Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ c6d58a2b

History | View | Annotate | Download (176.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements (REQ_MASTER); note that all
58
      commands require root permissions
59

60
  """
61
  HPATH = None
62
  HTYPE = None
63
  _OP_REQP = []
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overriden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.proc = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    self.__ssh = None
78

    
79
    for attr_name in self._OP_REQP:
80
      attr_val = getattr(op, attr_name, None)
81
      if attr_val is None:
82
        raise errors.OpPrereqError("Required parameter '%s' missing" %
83
                                   attr_name)
84

    
85
    if not cfg.IsCluster():
86
      raise errors.OpPrereqError("Cluster not initialized yet,"
87
                                 " use 'gnt-cluster init' first.")
88
    if self.REQ_MASTER:
89
      master = sstore.GetMasterNode()
90
      if master != utils.HostInfo().name:
91
        raise errors.OpPrereqError("Commands must be run on the master"
92
                                   " node %s" % master)
93

    
94
  def __GetSSH(self):
95
    """Returns the SshRunner object
96

97
    """
98
    if not self.__ssh:
99
      self.__ssh = ssh.SshRunner(self.sstore)
100
    return self.__ssh
101

    
102
  ssh = property(fget=__GetSSH)
103

    
104
  def CheckPrereq(self):
105
    """Check prerequisites for this LU.
106

107
    This method should check that the prerequisites for the execution
108
    of this LU are fulfilled. It can do internode communication, but
109
    it should be idempotent - no cluster or system changes are
110
    allowed.
111

112
    The method should raise errors.OpPrereqError in case something is
113
    not fulfilled. Its return value is ignored.
114

115
    This method should also update all the parameters of the opcode to
116
    their canonical form; e.g. a short node name must be fully
117
    expanded after this method has successfully completed (so that
118
    hooks, logging, etc. work correctly).
119

120
    """
121
    raise NotImplementedError
122

    
123
  def Exec(self, feedback_fn):
124
    """Execute the LU.
125

126
    This method should implement the actual work. It should raise
127
    errors.OpExecError for failures that are somewhat dealt with in
128
    code, or expected.
129

130
    """
131
    raise NotImplementedError
132

    
133
  def BuildHooksEnv(self):
134
    """Build hooks environment for this LU.
135

136
    This method should return a three-node tuple consisting of: a dict
137
    containing the environment that will be used for running the
138
    specific hook for this LU, a list of node names on which the hook
139
    should run before the execution, and a list of node names on which
140
    the hook should run after the execution.
141

142
    The keys of the dict must not have 'GANETI_' prefixed as this will
143
    be handled in the hooks runner. Also note additional keys will be
144
    added by the hooks runner. If the LU doesn't define any
145
    environment, an empty dict (and not None) should be returned.
146

147
    No nodes should be returned as an empty list (and not None).
148

149
    Note that if the HPATH for a LU class is None, this function will
150
    not be called.
151

152
    """
153
    raise NotImplementedError
154

    
155
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
156
    """Notify the LU about the results of its hooks.
157

158
    This method is called every time a hooks phase is executed, and notifies
159
    the Logical Unit about the hooks' result. The LU can then use it to alter
160
    its result based on the hooks.  By default the method does nothing and the
161
    previous result is passed back unchanged but any LU can define it if it
162
    wants to use the local cluster hook-scripts somehow.
163

164
    Args:
165
      phase: the hooks phase that has just been run
166
      hooks_results: the results of the multi-node hooks rpc call
167
      feedback_fn: function to send feedback back to the caller
168
      lu_result: the previous result this LU had, or None in the PRE phase.
169

170
    """
171
    return lu_result
172

    
173

    
174
class NoHooksLU(LogicalUnit):
175
  """Simple LU which runs no hooks.
176

177
  This LU is intended as a parent for other LogicalUnits which will
178
  run no hooks, in order to reduce duplicate code.
179

180
  """
181
  HPATH = None
182
  HTYPE = None
183

    
184

    
185
def _AddHostToEtcHosts(hostname):
186
  """Wrapper around utils.SetEtcHostsEntry.
187

188
  """
189
  hi = utils.HostInfo(name=hostname)
190
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
191

    
192

    
193
def _RemoveHostFromEtcHosts(hostname):
194
  """Wrapper around utils.RemoveEtcHostsEntry.
195

196
  """
197
  hi = utils.HostInfo(name=hostname)
198
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
199
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
200

    
201

    
202
def _GetWantedNodes(lu, nodes):
203
  """Returns list of checked and expanded node names.
204

205
  Args:
206
    nodes: List of nodes (strings) or None for all
207

208
  """
209
  if not isinstance(nodes, list):
210
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
211

    
212
  if nodes:
213
    wanted = []
214

    
215
    for name in nodes:
216
      node = lu.cfg.ExpandNodeName(name)
217
      if node is None:
218
        raise errors.OpPrereqError("No such node name '%s'" % name)
219
      wanted.append(node)
220

    
221
  else:
222
    wanted = lu.cfg.GetNodeList()
223
  return utils.NiceSort(wanted)
224

    
225

    
226
def _GetWantedInstances(lu, instances):
227
  """Returns list of checked and expanded instance names.
228

229
  Args:
230
    instances: List of instances (strings) or None for all
231

232
  """
233
  if not isinstance(instances, list):
234
    raise errors.OpPrereqError("Invalid argument type 'instances'")
235

    
236
  if instances:
237
    wanted = []
238

    
239
    for name in instances:
240
      instance = lu.cfg.ExpandInstanceName(name)
241
      if instance is None:
242
        raise errors.OpPrereqError("No such instance name '%s'" % name)
243
      wanted.append(instance)
244

    
245
  else:
246
    wanted = lu.cfg.GetInstanceList()
247
  return utils.NiceSort(wanted)
248

    
249

    
250
def _CheckOutputFields(static, dynamic, selected):
251
  """Checks whether all selected fields are valid.
252

253
  Args:
254
    static: Static fields
255
    dynamic: Dynamic fields
256

257
  """
258
  static_fields = frozenset(static)
259
  dynamic_fields = frozenset(dynamic)
260

    
261
  all_fields = static_fields | dynamic_fields
262

    
263
  if not all_fields.issuperset(selected):
264
    raise errors.OpPrereqError("Unknown output fields selected: %s"
265
                               % ",".join(frozenset(selected).
266
                                          difference(all_fields)))
267

    
268

    
269
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
270
                          memory, vcpus, nics):
271
  """Builds instance related env variables for hooks from single variables.
272

273
  Args:
274
    secondary_nodes: List of secondary nodes as strings
275
  """
276
  env = {
277
    "OP_TARGET": name,
278
    "INSTANCE_NAME": name,
279
    "INSTANCE_PRIMARY": primary_node,
280
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
281
    "INSTANCE_OS_TYPE": os_type,
282
    "INSTANCE_STATUS": status,
283
    "INSTANCE_MEMORY": memory,
284
    "INSTANCE_VCPUS": vcpus,
285
  }
286

    
287
  if nics:
288
    nic_count = len(nics)
289
    for idx, (ip, bridge, mac) in enumerate(nics):
290
      if ip is None:
291
        ip = ""
292
      env["INSTANCE_NIC%d_IP" % idx] = ip
293
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
294
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
295
  else:
296
    nic_count = 0
297

    
298
  env["INSTANCE_NIC_COUNT"] = nic_count
299

    
300
  return env
301

    
302

    
303
def _BuildInstanceHookEnvByObject(instance, override=None):
304
  """Builds instance related env variables for hooks from an object.
305

306
  Args:
307
    instance: objects.Instance object of instance
308
    override: dict of values to override
309
  """
310
  args = {
311
    'name': instance.name,
312
    'primary_node': instance.primary_node,
313
    'secondary_nodes': instance.secondary_nodes,
314
    'os_type': instance.os,
315
    'status': instance.os,
316
    'memory': instance.memory,
317
    'vcpus': instance.vcpus,
318
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
319
  }
320
  if override:
321
    args.update(override)
322
  return _BuildInstanceHookEnv(**args)
323

    
324

    
325
def _HasValidVG(vglist, vgname):
326
  """Checks if the volume group list is valid.
327

328
  A non-None return value means there's an error, and the return value
329
  is the error message.
330

331
  """
332
  vgsize = vglist.get(vgname, None)
333
  if vgsize is None:
334
    return "volume group '%s' missing" % vgname
335
  elif vgsize < 20480:
336
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
337
            (vgname, vgsize))
338
  return None
339

    
340

    
341
def _InitSSHSetup(node):
342
  """Setup the SSH configuration for the cluster.
343

344

345
  This generates a dsa keypair for root, adds the pub key to the
346
  permitted hosts and adds the hostkey to its own known hosts.
347

348
  Args:
349
    node: the name of this host as a fqdn
350

351
  """
352
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
353

    
354
  for name in priv_key, pub_key:
355
    if os.path.exists(name):
356
      utils.CreateBackup(name)
357
    utils.RemoveFile(name)
358

    
359
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
360
                         "-f", priv_key,
361
                         "-q", "-N", ""])
362
  if result.failed:
363
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
364
                             result.output)
365

    
366
  f = open(pub_key, 'r')
367
  try:
368
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
369
  finally:
370
    f.close()
371

    
372

    
373
def _InitGanetiServerSetup(ss):
374
  """Setup the necessary configuration for the initial node daemon.
375

376
  This creates the nodepass file containing the shared password for
377
  the cluster and also generates the SSL certificate.
378

379
  """
380
  # Create pseudo random password
381
  randpass = sha.new(os.urandom(64)).hexdigest()
382
  # and write it into sstore
383
  ss.SetKey(ss.SS_NODED_PASS, randpass)
384

    
385
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
386
                         "-days", str(365*5), "-nodes", "-x509",
387
                         "-keyout", constants.SSL_CERT_FILE,
388
                         "-out", constants.SSL_CERT_FILE, "-batch"])
389
  if result.failed:
390
    raise errors.OpExecError("could not generate server ssl cert, command"
391
                             " %s had exitcode %s and error message %s" %
392
                             (result.cmd, result.exit_code, result.output))
393

    
394
  os.chmod(constants.SSL_CERT_FILE, 0400)
395

    
396
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
397

    
398
  if result.failed:
399
    raise errors.OpExecError("Could not start the node daemon, command %s"
400
                             " had exitcode %s and error %s" %
401
                             (result.cmd, result.exit_code, result.output))
402

    
403

    
404
def _CheckInstanceBridgesExist(instance):
405
  """Check that the brigdes needed by an instance exist.
406

407
  """
408
  # check bridges existance
409
  brlist = [nic.bridge for nic in instance.nics]
410
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
411
    raise errors.OpPrereqError("one or more target bridges %s does not"
412
                               " exist on destination node '%s'" %
413
                               (brlist, instance.primary_node))
414

    
415

    
416
class LUInitCluster(LogicalUnit):
417
  """Initialise the cluster.
418

419
  """
420
  HPATH = "cluster-init"
421
  HTYPE = constants.HTYPE_CLUSTER
422
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
423
              "def_bridge", "master_netdev", "file_storage_dir"]
424
  REQ_CLUSTER = False
425

    
426
  def BuildHooksEnv(self):
427
    """Build hooks env.
428

429
    Notes: Since we don't require a cluster, we must manually add
430
    ourselves in the post-run node list.
431

432
    """
433
    env = {"OP_TARGET": self.op.cluster_name}
434
    return env, [], [self.hostname.name]
435

    
436
  def CheckPrereq(self):
437
    """Verify that the passed name is a valid one.
438

439
    """
440
    if config.ConfigWriter.IsCluster():
441
      raise errors.OpPrereqError("Cluster is already initialised")
442

    
443
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
444
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
445
        raise errors.OpPrereqError("Please prepare the cluster VNC"
446
                                   "password file %s" %
447
                                   constants.VNC_PASSWORD_FILE)
448

    
449
    self.hostname = hostname = utils.HostInfo()
450

    
451
    if hostname.ip.startswith("127."):
452
      raise errors.OpPrereqError("This host's IP resolves to the private"
453
                                 " range (%s). Please fix DNS or %s." %
454
                                 (hostname.ip, constants.ETC_HOSTS))
455

    
456
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
457
                         source=constants.LOCALHOST_IP_ADDRESS):
458
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
459
                                 " to %s,\nbut this ip address does not"
460
                                 " belong to this host."
461
                                 " Aborting." % hostname.ip)
462

    
463
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
464

    
465
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
466
                     timeout=5):
467
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
468

    
469
    secondary_ip = getattr(self.op, "secondary_ip", None)
470
    if secondary_ip and not utils.IsValidIP(secondary_ip):
471
      raise errors.OpPrereqError("Invalid secondary ip given")
472
    if (secondary_ip and
473
        secondary_ip != hostname.ip and
474
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
475
                           source=constants.LOCALHOST_IP_ADDRESS))):
476
      raise errors.OpPrereqError("You gave %s as secondary IP,"
477
                                 " but it does not belong to this host." %
478
                                 secondary_ip)
479
    self.secondary_ip = secondary_ip
480

    
481
    if not hasattr(self.op, "vg_name"):
482
      self.op.vg_name = None
483
    # if vg_name not None, checks if volume group is valid
484
    if self.op.vg_name:
485
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
486
      if vgstatus:
487
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
488
                                   " you are not using lvm" % vgstatus)
489

    
490
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
491

    
492
    if not os.path.isabs(self.op.file_storage_dir):
493
      raise errors.OpPrereqError("The file storage directory you have is"
494
                                 " not an absolute path.")
495

    
496
    if not os.path.exists(self.op.file_storage_dir):
497
      try:
498
        os.makedirs(self.op.file_storage_dir, 0750)
499
      except OSError, err:
500
        raise errors.OpPrereqError("Cannot create file storage directory"
501
                                   " '%s': %s" %
502
                                   (self.op.file_storage_dir, err))
503

    
504
    if not os.path.isdir(self.op.file_storage_dir):
505
      raise errors.OpPrereqError("The file storage directory '%s' is not"
506
                                 " a directory." % self.op.file_storage_dir)
507

    
508
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
509
                    self.op.mac_prefix):
510
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
511
                                 self.op.mac_prefix)
512

    
513
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
514
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
515
                                 self.op.hypervisor_type)
516

    
517
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
518
    if result.failed:
519
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
520
                                 (self.op.master_netdev,
521
                                  result.output.strip()))
522

    
523
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
524
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
525
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
526
                                 " executable." % constants.NODE_INITD_SCRIPT)
527

    
528
  def Exec(self, feedback_fn):
529
    """Initialize the cluster.
530

531
    """
532
    clustername = self.clustername
533
    hostname = self.hostname
534

    
535
    # set up the simple store
536
    self.sstore = ss = ssconf.SimpleStore()
537
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
538
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
539
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
540
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
541
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
542
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
543
    ss.SetKey(ss.SS_CONFIG_VERSION, constants.CONFIG_VERSION)
544

    
545
    # set up the inter-node password and certificate
546
    _InitGanetiServerSetup(ss)
547

    
548
    # start the master ip
549
    rpc.call_node_start_master(hostname.name)
550

    
551
    # set up ssh config and /etc/hosts
552
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
553
    try:
554
      sshline = f.read()
555
    finally:
556
      f.close()
557
    sshkey = sshline.split(" ")[1]
558

    
559
    _AddHostToEtcHosts(hostname.name)
560
    _InitSSHSetup(hostname.name)
561

    
562
    # init of cluster config file
563
    self.cfg = cfgw = config.ConfigWriter()
564
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
565
                    sshkey, self.op.mac_prefix,
566
                    self.op.vg_name, self.op.def_bridge)
567

    
568
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
569

    
570

    
571
class LUDestroyCluster(NoHooksLU):
572
  """Logical unit for destroying the cluster.
573

574
  """
575
  _OP_REQP = []
576

    
577
  def CheckPrereq(self):
578
    """Check prerequisites.
579

580
    This checks whether the cluster is empty.
581

582
    Any errors are signalled by raising errors.OpPrereqError.
583

584
    """
585
    master = self.sstore.GetMasterNode()
586

    
587
    nodelist = self.cfg.GetNodeList()
588
    if len(nodelist) != 1 or nodelist[0] != master:
589
      raise errors.OpPrereqError("There are still %d node(s) in"
590
                                 " this cluster." % (len(nodelist) - 1))
591
    instancelist = self.cfg.GetInstanceList()
592
    if instancelist:
593
      raise errors.OpPrereqError("There are still %d instance(s) in"
594
                                 " this cluster." % len(instancelist))
595

    
596
  def Exec(self, feedback_fn):
597
    """Destroys the cluster.
598

599
    """
600
    master = self.sstore.GetMasterNode()
601
    if not rpc.call_node_stop_master(master):
602
      raise errors.OpExecError("Could not disable the master role")
603
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
604
    utils.CreateBackup(priv_key)
605
    utils.CreateBackup(pub_key)
606
    rpc.call_node_leave_cluster(master)
607

    
608

    
609
class LUVerifyCluster(LogicalUnit):
610
  """Verifies the cluster status.
611

612
  """
613
  HPATH = "cluster-verify"
614
  HTYPE = constants.HTYPE_CLUSTER
615
  _OP_REQP = ["skip_checks"]
616

    
617
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
618
                  remote_version, feedback_fn):
619
    """Run multiple tests against a node.
620

621
    Test list:
622
      - compares ganeti version
623
      - checks vg existance and size > 20G
624
      - checks config file checksum
625
      - checks ssh to other nodes
626

627
    Args:
628
      node: name of the node to check
629
      file_list: required list of files
630
      local_cksum: dictionary of local files and their checksums
631

632
    """
633
    # compares ganeti version
634
    local_version = constants.PROTOCOL_VERSION
635
    if not remote_version:
636
      feedback_fn("  - ERROR: connection to %s failed" % (node))
637
      return True
638

    
639
    if local_version != remote_version:
640
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
641
                      (local_version, node, remote_version))
642
      return True
643

    
644
    # checks vg existance and size > 20G
645

    
646
    bad = False
647
    if not vglist:
648
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
649
                      (node,))
650
      bad = True
651
    else:
652
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
653
      if vgstatus:
654
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
655
        bad = True
656

    
657
    # checks config file checksum
658
    # checks ssh to any
659

    
660
    if 'filelist' not in node_result:
661
      bad = True
662
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
663
    else:
664
      remote_cksum = node_result['filelist']
665
      for file_name in file_list:
666
        if file_name not in remote_cksum:
667
          bad = True
668
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
669
        elif remote_cksum[file_name] != local_cksum[file_name]:
670
          bad = True
671
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
672

    
673
    if 'nodelist' not in node_result:
674
      bad = True
675
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
676
    else:
677
      if node_result['nodelist']:
678
        bad = True
679
        for node in node_result['nodelist']:
680
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
681
                          (node, node_result['nodelist'][node]))
682
    if 'node-net-test' not in node_result:
683
      bad = True
684
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
685
    else:
686
      if node_result['node-net-test']:
687
        bad = True
688
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
689
        for node in nlist:
690
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
691
                          (node, node_result['node-net-test'][node]))
692

    
693
    hyp_result = node_result.get('hypervisor', None)
694
    if hyp_result is not None:
695
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
696
    return bad
697

    
698
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
699
                      node_instance, feedback_fn):
700
    """Verify an instance.
701

702
    This function checks to see if the required block devices are
703
    available on the instance's node.
704

705
    """
706
    bad = False
707

    
708
    node_current = instanceconfig.primary_node
709

    
710
    node_vol_should = {}
711
    instanceconfig.MapLVsByNode(node_vol_should)
712

    
713
    for node in node_vol_should:
714
      for volume in node_vol_should[node]:
715
        if node not in node_vol_is or volume not in node_vol_is[node]:
716
          feedback_fn("  - ERROR: volume %s missing on node %s" %
717
                          (volume, node))
718
          bad = True
719

    
720
    if not instanceconfig.status == 'down':
721
      if (node_current not in node_instance or
722
          not instance in node_instance[node_current]):
723
        feedback_fn("  - ERROR: instance %s not running on node %s" %
724
                        (instance, node_current))
725
        bad = True
726

    
727
    for node in node_instance:
728
      if (not node == node_current):
729
        if instance in node_instance[node]:
730
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
731
                          (instance, node))
732
          bad = True
733

    
734
    return bad
735

    
736
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
737
    """Verify if there are any unknown volumes in the cluster.
738

739
    The .os, .swap and backup volumes are ignored. All other volumes are
740
    reported as unknown.
741

742
    """
743
    bad = False
744

    
745
    for node in node_vol_is:
746
      for volume in node_vol_is[node]:
747
        if node not in node_vol_should or volume not in node_vol_should[node]:
748
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
749
                      (volume, node))
750
          bad = True
751
    return bad
752

    
753
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
754
    """Verify the list of running instances.
755

756
    This checks what instances are running but unknown to the cluster.
757

758
    """
759
    bad = False
760
    for node in node_instance:
761
      for runninginstance in node_instance[node]:
762
        if runninginstance not in instancelist:
763
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
764
                          (runninginstance, node))
765
          bad = True
766
    return bad
767

    
768
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
769
    """Verify N+1 Memory Resilience.
770

771
    Check that if one single node dies we can still start all the instances it
772
    was primary for.
773

774
    """
775
    bad = False
776

    
777
    for node, nodeinfo in node_info.iteritems():
778
      # This code checks that every node which is now listed as secondary has
779
      # enough memory to host all instances it is supposed to should a single
780
      # other node in the cluster fail.
781
      # FIXME: not ready for failover to an arbitrary node
782
      # FIXME: does not support file-backed instances
783
      # WARNING: we currently take into account down instances as well as up
784
      # ones, considering that even if they're down someone might want to start
785
      # them even in the event of a node failure.
786
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
787
        needed_mem = 0
788
        for instance in instances:
789
          needed_mem += instance_cfg[instance].memory
790
        if nodeinfo['mfree'] < needed_mem:
791
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
792
                      " failovers should node %s fail" % (node, prinode))
793
          bad = True
794
    return bad
795

    
796
  def CheckPrereq(self):
797
    """Check prerequisites.
798

799
    Transform the list of checks we're going to skip into a set and check that
800
    all its members are valid.
801

802
    """
803
    self.skip_set = frozenset(self.op.skip_checks)
804
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
805
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
806

    
807
  def BuildHooksEnv(self):
808
    """Build hooks env.
809

810
    Cluster-Verify hooks just rone in the post phase and their failure makes
811
    the output be logged in the verify output and the verification to fail.
812

813
    """
814
    all_nodes = self.cfg.GetNodeList()
815
    # TODO: populate the environment with useful information for verify hooks
816
    env = {}
817
    return env, [], all_nodes
818

    
819
  def Exec(self, feedback_fn):
820
    """Verify integrity of cluster, performing various test on nodes.
821

822
    """
823
    bad = False
824
    feedback_fn("* Verifying global settings")
825
    for msg in self.cfg.VerifyConfig():
826
      feedback_fn("  - ERROR: %s" % msg)
827

    
828
    vg_name = self.cfg.GetVGName()
829
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
830
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
831
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
832
    i_non_redundant = [] # Non redundant instances
833
    node_volume = {}
834
    node_instance = {}
835
    node_info = {}
836
    instance_cfg = {}
837

    
838
    # FIXME: verify OS list
839
    # do local checksums
840
    file_names = list(self.sstore.GetFileList())
841
    file_names.append(constants.SSL_CERT_FILE)
842
    file_names.append(constants.CLUSTER_CONF_FILE)
843
    local_checksums = utils.FingerprintFiles(file_names)
844

    
845
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
846
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
847
    all_instanceinfo = rpc.call_instance_list(nodelist)
848
    all_vglist = rpc.call_vg_list(nodelist)
849
    node_verify_param = {
850
      'filelist': file_names,
851
      'nodelist': nodelist,
852
      'hypervisor': None,
853
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
854
                        for node in nodeinfo]
855
      }
856
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
857
    all_rversion = rpc.call_version(nodelist)
858
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
859

    
860
    for node in nodelist:
861
      feedback_fn("* Verifying node %s" % node)
862
      result = self._VerifyNode(node, file_names, local_checksums,
863
                                all_vglist[node], all_nvinfo[node],
864
                                all_rversion[node], feedback_fn)
865
      bad = bad or result
866

    
867
      # node_volume
868
      volumeinfo = all_volumeinfo[node]
869

    
870
      if isinstance(volumeinfo, basestring):
871
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
872
                    (node, volumeinfo[-400:].encode('string_escape')))
873
        bad = True
874
        node_volume[node] = {}
875
      elif not isinstance(volumeinfo, dict):
876
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
877
        bad = True
878
        continue
879
      else:
880
        node_volume[node] = volumeinfo
881

    
882
      # node_instance
883
      nodeinstance = all_instanceinfo[node]
884
      if type(nodeinstance) != list:
885
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
886
        bad = True
887
        continue
888

    
889
      node_instance[node] = nodeinstance
890

    
891
      # node_info
892
      nodeinfo = all_ninfo[node]
893
      if not isinstance(nodeinfo, dict):
894
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
895
        bad = True
896
        continue
897

    
898
      try:
899
        node_info[node] = {
900
          "mfree": int(nodeinfo['memory_free']),
901
          "dfree": int(nodeinfo['vg_free']),
902
          "pinst": [],
903
          "sinst": [],
904
          # dictionary holding all instances this node is secondary for,
905
          # grouped by their primary node. Each key is a cluster node, and each
906
          # value is a list of instances which have the key as primary and the
907
          # current node as secondary.  this is handy to calculate N+1 memory
908
          # availability if you can only failover from a primary to its
909
          # secondary.
910
          "sinst-by-pnode": {},
911
        }
912
      except ValueError:
913
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
914
        bad = True
915
        continue
916

    
917
    node_vol_should = {}
918

    
919
    for instance in instancelist:
920
      feedback_fn("* Verifying instance %s" % instance)
921
      inst_config = self.cfg.GetInstanceInfo(instance)
922
      result =  self._VerifyInstance(instance, inst_config, node_volume,
923
                                     node_instance, feedback_fn)
924
      bad = bad or result
925

    
926
      inst_config.MapLVsByNode(node_vol_should)
927

    
928
      instance_cfg[instance] = inst_config
929

    
930
      pnode = inst_config.primary_node
931
      if pnode in node_info:
932
        node_info[pnode]['pinst'].append(instance)
933
      else:
934
        feedback_fn("  - ERROR: instance %s, connection to primary node"
935
                    " %s failed" % (instance, pnode))
936
        bad = True
937

    
938
      # If the instance is non-redundant we cannot survive losing its primary
939
      # node, so we are not N+1 compliant. On the other hand we have no disk
940
      # templates with more than one secondary so that situation is not well
941
      # supported either.
942
      # FIXME: does not support file-backed instances
943
      if len(inst_config.secondary_nodes) == 0:
944
        i_non_redundant.append(instance)
945
      elif len(inst_config.secondary_nodes) > 1:
946
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
947
                    % instance)
948

    
949
      for snode in inst_config.secondary_nodes:
950
        if snode in node_info:
951
          node_info[snode]['sinst'].append(instance)
952
          if pnode not in node_info[snode]['sinst-by-pnode']:
953
            node_info[snode]['sinst-by-pnode'][pnode] = []
954
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
955
        else:
956
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
957
                      " %s failed" % (instance, snode))
958

    
959
    feedback_fn("* Verifying orphan volumes")
960
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
961
                                       feedback_fn)
962
    bad = bad or result
963

    
964
    feedback_fn("* Verifying remaining instances")
965
    result = self._VerifyOrphanInstances(instancelist, node_instance,
966
                                         feedback_fn)
967
    bad = bad or result
968

    
969
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
970
      feedback_fn("* Verifying N+1 Memory redundancy")
971
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
972
      bad = bad or result
973

    
974
    feedback_fn("* Other Notes")
975
    if i_non_redundant:
976
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
977
                  % len(i_non_redundant))
978

    
979
    return int(bad)
980

    
981
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
982
    """Analize the post-hooks' result, handle it, and send some
983
    nicely-formatted feedback back to the user.
984

985
    Args:
986
      phase: the hooks phase that has just been run
987
      hooks_results: the results of the multi-node hooks rpc call
988
      feedback_fn: function to send feedback back to the caller
989
      lu_result: previous Exec result
990

991
    """
992
    # We only really run POST phase hooks, and are only interested in their results
993
    if phase == constants.HOOKS_PHASE_POST:
994
      # Used to change hooks' output to proper indentation
995
      indent_re = re.compile('^', re.M)
996
      feedback_fn("* Hooks Results")
997
      if not hooks_results:
998
        feedback_fn("  - ERROR: general communication failure")
999
        lu_result = 1
1000
      else:
1001
        for node_name in hooks_results:
1002
          show_node_header = True
1003
          res = hooks_results[node_name]
1004
          if res is False or not isinstance(res, list):
1005
            feedback_fn("    Communication failure")
1006
            lu_result = 1
1007
            continue
1008
          for script, hkr, output in res:
1009
            if hkr == constants.HKR_FAIL:
1010
              # The node header is only shown once, if there are
1011
              # failing hooks on that node
1012
              if show_node_header:
1013
                feedback_fn("  Node %s:" % node_name)
1014
                show_node_header = False
1015
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1016
              output = indent_re.sub('      ', output)
1017
              feedback_fn("%s" % output)
1018
              lu_result = 1
1019

    
1020
      return lu_result
1021

    
1022

    
1023
class LUVerifyDisks(NoHooksLU):
1024
  """Verifies the cluster disks status.
1025

1026
  """
1027
  _OP_REQP = []
1028

    
1029
  def CheckPrereq(self):
1030
    """Check prerequisites.
1031

1032
    This has no prerequisites.
1033

1034
    """
1035
    pass
1036

    
1037
  def Exec(self, feedback_fn):
1038
    """Verify integrity of cluster disks.
1039

1040
    """
1041
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1042

    
1043
    vg_name = self.cfg.GetVGName()
1044
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1045
    instances = [self.cfg.GetInstanceInfo(name)
1046
                 for name in self.cfg.GetInstanceList()]
1047

    
1048
    nv_dict = {}
1049
    for inst in instances:
1050
      inst_lvs = {}
1051
      if (inst.status != "up" or
1052
          inst.disk_template not in constants.DTS_NET_MIRROR):
1053
        continue
1054
      inst.MapLVsByNode(inst_lvs)
1055
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1056
      for node, vol_list in inst_lvs.iteritems():
1057
        for vol in vol_list:
1058
          nv_dict[(node, vol)] = inst
1059

    
1060
    if not nv_dict:
1061
      return result
1062

    
1063
    node_lvs = rpc.call_volume_list(nodes, vg_name)
1064

    
1065
    to_act = set()
1066
    for node in nodes:
1067
      # node_volume
1068
      lvs = node_lvs[node]
1069

    
1070
      if isinstance(lvs, basestring):
1071
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1072
        res_nlvm[node] = lvs
1073
      elif not isinstance(lvs, dict):
1074
        logger.Info("connection to node %s failed or invalid data returned" %
1075
                    (node,))
1076
        res_nodes.append(node)
1077
        continue
1078

    
1079
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1080
        inst = nv_dict.pop((node, lv_name), None)
1081
        if (not lv_online and inst is not None
1082
            and inst.name not in res_instances):
1083
          res_instances.append(inst.name)
1084

    
1085
    # any leftover items in nv_dict are missing LVs, let's arrange the
1086
    # data better
1087
    for key, inst in nv_dict.iteritems():
1088
      if inst.name not in res_missing:
1089
        res_missing[inst.name] = []
1090
      res_missing[inst.name].append(key)
1091

    
1092
    return result
1093

    
1094

    
1095
class LURenameCluster(LogicalUnit):
1096
  """Rename the cluster.
1097

1098
  """
1099
  HPATH = "cluster-rename"
1100
  HTYPE = constants.HTYPE_CLUSTER
1101
  _OP_REQP = ["name"]
1102

    
1103
  def BuildHooksEnv(self):
1104
    """Build hooks env.
1105

1106
    """
1107
    env = {
1108
      "OP_TARGET": self.sstore.GetClusterName(),
1109
      "NEW_NAME": self.op.name,
1110
      }
1111
    mn = self.sstore.GetMasterNode()
1112
    return env, [mn], [mn]
1113

    
1114
  def CheckPrereq(self):
1115
    """Verify that the passed name is a valid one.
1116

1117
    """
1118
    hostname = utils.HostInfo(self.op.name)
1119

    
1120
    new_name = hostname.name
1121
    self.ip = new_ip = hostname.ip
1122
    old_name = self.sstore.GetClusterName()
1123
    old_ip = self.sstore.GetMasterIP()
1124
    if new_name == old_name and new_ip == old_ip:
1125
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1126
                                 " cluster has changed")
1127
    if new_ip != old_ip:
1128
      result = utils.RunCmd(["fping", "-q", new_ip])
1129
      if not result.failed:
1130
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1131
                                   " reachable on the network. Aborting." %
1132
                                   new_ip)
1133

    
1134
    self.op.name = new_name
1135

    
1136
  def Exec(self, feedback_fn):
1137
    """Rename the cluster.
1138

1139
    """
1140
    clustername = self.op.name
1141
    ip = self.ip
1142
    ss = self.sstore
1143

    
1144
    # shutdown the master IP
1145
    master = ss.GetMasterNode()
1146
    if not rpc.call_node_stop_master(master):
1147
      raise errors.OpExecError("Could not disable the master role")
1148

    
1149
    try:
1150
      # modify the sstore
1151
      ss.SetKey(ss.SS_MASTER_IP, ip)
1152
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1153

    
1154
      # Distribute updated ss config to all nodes
1155
      myself = self.cfg.GetNodeInfo(master)
1156
      dist_nodes = self.cfg.GetNodeList()
1157
      if myself.name in dist_nodes:
1158
        dist_nodes.remove(myself.name)
1159

    
1160
      logger.Debug("Copying updated ssconf data to all nodes")
1161
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1162
        fname = ss.KeyToFilename(keyname)
1163
        result = rpc.call_upload_file(dist_nodes, fname)
1164
        for to_node in dist_nodes:
1165
          if not result[to_node]:
1166
            logger.Error("copy of file %s to node %s failed" %
1167
                         (fname, to_node))
1168
    finally:
1169
      if not rpc.call_node_start_master(master):
1170
        logger.Error("Could not re-enable the master role on the master,"
1171
                     " please restart manually.")
1172

    
1173

    
1174
def _RecursiveCheckIfLVMBased(disk):
1175
  """Check if the given disk or its children are lvm-based.
1176

1177
  Args:
1178
    disk: ganeti.objects.Disk object
1179

1180
  Returns:
1181
    boolean indicating whether a LD_LV dev_type was found or not
1182

1183
  """
1184
  if disk.children:
1185
    for chdisk in disk.children:
1186
      if _RecursiveCheckIfLVMBased(chdisk):
1187
        return True
1188
  return disk.dev_type == constants.LD_LV
1189

    
1190

    
1191
class LUSetClusterParams(LogicalUnit):
1192
  """Change the parameters of the cluster.
1193

1194
  """
1195
  HPATH = "cluster-modify"
1196
  HTYPE = constants.HTYPE_CLUSTER
1197
  _OP_REQP = []
1198

    
1199
  def BuildHooksEnv(self):
1200
    """Build hooks env.
1201

1202
    """
1203
    env = {
1204
      "OP_TARGET": self.sstore.GetClusterName(),
1205
      "NEW_VG_NAME": self.op.vg_name,
1206
      }
1207
    mn = self.sstore.GetMasterNode()
1208
    return env, [mn], [mn]
1209

    
1210
  def CheckPrereq(self):
1211
    """Check prerequisites.
1212

1213
    This checks whether the given params don't conflict and
1214
    if the given volume group is valid.
1215

1216
    """
1217
    if not self.op.vg_name:
1218
      instances = [self.cfg.GetInstanceInfo(name)
1219
                   for name in self.cfg.GetInstanceList()]
1220
      for inst in instances:
1221
        for disk in inst.disks:
1222
          if _RecursiveCheckIfLVMBased(disk):
1223
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1224
                                       " lvm-based instances exist")
1225

    
1226
    # if vg_name not None, checks given volume group on all nodes
1227
    if self.op.vg_name:
1228
      node_list = self.cfg.GetNodeList()
1229
      vglist = rpc.call_vg_list(node_list)
1230
      for node in node_list:
1231
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1232
        if vgstatus:
1233
          raise errors.OpPrereqError("Error on node '%s': %s" %
1234
                                     (node, vgstatus))
1235

    
1236
  def Exec(self, feedback_fn):
1237
    """Change the parameters of the cluster.
1238

1239
    """
1240
    if self.op.vg_name != self.cfg.GetVGName():
1241
      self.cfg.SetVGName(self.op.vg_name)
1242
    else:
1243
      feedback_fn("Cluster LVM configuration already in desired"
1244
                  " state, not changing")
1245

    
1246

    
1247
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1248
  """Sleep and poll for an instance's disk to sync.
1249

1250
  """
1251
  if not instance.disks:
1252
    return True
1253

    
1254
  if not oneshot:
1255
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1256

    
1257
  node = instance.primary_node
1258

    
1259
  for dev in instance.disks:
1260
    cfgw.SetDiskID(dev, node)
1261

    
1262
  retries = 0
1263
  while True:
1264
    max_time = 0
1265
    done = True
1266
    cumul_degraded = False
1267
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1268
    if not rstats:
1269
      proc.LogWarning("Can't get any data from node %s" % node)
1270
      retries += 1
1271
      if retries >= 10:
1272
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1273
                                 " aborting." % node)
1274
      time.sleep(6)
1275
      continue
1276
    retries = 0
1277
    for i in range(len(rstats)):
1278
      mstat = rstats[i]
1279
      if mstat is None:
1280
        proc.LogWarning("Can't compute data for node %s/%s" %
1281
                        (node, instance.disks[i].iv_name))
1282
        continue
1283
      # we ignore the ldisk parameter
1284
      perc_done, est_time, is_degraded, _ = mstat
1285
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1286
      if perc_done is not None:
1287
        done = False
1288
        if est_time is not None:
1289
          rem_time = "%d estimated seconds remaining" % est_time
1290
          max_time = est_time
1291
        else:
1292
          rem_time = "no time estimate"
1293
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1294
                     (instance.disks[i].iv_name, perc_done, rem_time))
1295
    if done or oneshot:
1296
      break
1297

    
1298
    if unlock:
1299
      #utils.Unlock('cmd')
1300
      pass
1301
    try:
1302
      time.sleep(min(60, max_time))
1303
    finally:
1304
      if unlock:
1305
        #utils.Lock('cmd')
1306
        pass
1307

    
1308
  if done:
1309
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1310
  return not cumul_degraded
1311

    
1312

    
1313
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1314
  """Check that mirrors are not degraded.
1315

1316
  The ldisk parameter, if True, will change the test from the
1317
  is_degraded attribute (which represents overall non-ok status for
1318
  the device(s)) to the ldisk (representing the local storage status).
1319

1320
  """
1321
  cfgw.SetDiskID(dev, node)
1322
  if ldisk:
1323
    idx = 6
1324
  else:
1325
    idx = 5
1326

    
1327
  result = True
1328
  if on_primary or dev.AssembleOnSecondary():
1329
    rstats = rpc.call_blockdev_find(node, dev)
1330
    if not rstats:
1331
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1332
      result = False
1333
    else:
1334
      result = result and (not rstats[idx])
1335
  if dev.children:
1336
    for child in dev.children:
1337
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1338

    
1339
  return result
1340

    
1341

    
1342
class LUDiagnoseOS(NoHooksLU):
1343
  """Logical unit for OS diagnose/query.
1344

1345
  """
1346
  _OP_REQP = ["output_fields", "names"]
1347

    
1348
  def CheckPrereq(self):
1349
    """Check prerequisites.
1350

1351
    This always succeeds, since this is a pure query LU.
1352

1353
    """
1354
    if self.op.names:
1355
      raise errors.OpPrereqError("Selective OS query not supported")
1356

    
1357
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1358
    _CheckOutputFields(static=[],
1359
                       dynamic=self.dynamic_fields,
1360
                       selected=self.op.output_fields)
1361

    
1362
  @staticmethod
1363
  def _DiagnoseByOS(node_list, rlist):
1364
    """Remaps a per-node return list into an a per-os per-node dictionary
1365

1366
      Args:
1367
        node_list: a list with the names of all nodes
1368
        rlist: a map with node names as keys and OS objects as values
1369

1370
      Returns:
1371
        map: a map with osnames as keys and as value another map, with
1372
             nodes as
1373
             keys and list of OS objects as values
1374
             e.g. {"debian-etch": {"node1": [<object>,...],
1375
                                   "node2": [<object>,]}
1376
                  }
1377

1378
    """
1379
    all_os = {}
1380
    for node_name, nr in rlist.iteritems():
1381
      if not nr:
1382
        continue
1383
      for os_obj in nr:
1384
        if os_obj.name not in all_os:
1385
          # build a list of nodes for this os containing empty lists
1386
          # for each node in node_list
1387
          all_os[os_obj.name] = {}
1388
          for nname in node_list:
1389
            all_os[os_obj.name][nname] = []
1390
        all_os[os_obj.name][node_name].append(os_obj)
1391
    return all_os
1392

    
1393
  def Exec(self, feedback_fn):
1394
    """Compute the list of OSes.
1395

1396
    """
1397
    node_list = self.cfg.GetNodeList()
1398
    node_data = rpc.call_os_diagnose(node_list)
1399
    if node_data == False:
1400
      raise errors.OpExecError("Can't gather the list of OSes")
1401
    pol = self._DiagnoseByOS(node_list, node_data)
1402
    output = []
1403
    for os_name, os_data in pol.iteritems():
1404
      row = []
1405
      for field in self.op.output_fields:
1406
        if field == "name":
1407
          val = os_name
1408
        elif field == "valid":
1409
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1410
        elif field == "node_status":
1411
          val = {}
1412
          for node_name, nos_list in os_data.iteritems():
1413
            val[node_name] = [(v.status, v.path) for v in nos_list]
1414
        else:
1415
          raise errors.ParameterError(field)
1416
        row.append(val)
1417
      output.append(row)
1418

    
1419
    return output
1420

    
1421

    
1422
class LURemoveNode(LogicalUnit):
1423
  """Logical unit for removing a node.
1424

1425
  """
1426
  HPATH = "node-remove"
1427
  HTYPE = constants.HTYPE_NODE
1428
  _OP_REQP = ["node_name"]
1429

    
1430
  def BuildHooksEnv(self):
1431
    """Build hooks env.
1432

1433
    This doesn't run on the target node in the pre phase as a failed
1434
    node would not allows itself to run.
1435

1436
    """
1437
    env = {
1438
      "OP_TARGET": self.op.node_name,
1439
      "NODE_NAME": self.op.node_name,
1440
      }
1441
    all_nodes = self.cfg.GetNodeList()
1442
    all_nodes.remove(self.op.node_name)
1443
    return env, all_nodes, all_nodes
1444

    
1445
  def CheckPrereq(self):
1446
    """Check prerequisites.
1447

1448
    This checks:
1449
     - the node exists in the configuration
1450
     - it does not have primary or secondary instances
1451
     - it's not the master
1452

1453
    Any errors are signalled by raising errors.OpPrereqError.
1454

1455
    """
1456
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1457
    if node is None:
1458
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1459

    
1460
    instance_list = self.cfg.GetInstanceList()
1461

    
1462
    masternode = self.sstore.GetMasterNode()
1463
    if node.name == masternode:
1464
      raise errors.OpPrereqError("Node is the master node,"
1465
                                 " you need to failover first.")
1466

    
1467
    for instance_name in instance_list:
1468
      instance = self.cfg.GetInstanceInfo(instance_name)
1469
      if node.name == instance.primary_node:
1470
        raise errors.OpPrereqError("Instance %s still running on the node,"
1471
                                   " please remove first." % instance_name)
1472
      if node.name in instance.secondary_nodes:
1473
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1474
                                   " please remove first." % instance_name)
1475
    self.op.node_name = node.name
1476
    self.node = node
1477

    
1478
  def Exec(self, feedback_fn):
1479
    """Removes the node from the cluster.
1480

1481
    """
1482
    node = self.node
1483
    logger.Info("stopping the node daemon and removing configs from node %s" %
1484
                node.name)
1485

    
1486
    rpc.call_node_leave_cluster(node.name)
1487

    
1488
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1489

    
1490
    logger.Info("Removing node %s from config" % node.name)
1491

    
1492
    self.cfg.RemoveNode(node.name)
1493

    
1494
    _RemoveHostFromEtcHosts(node.name)
1495

    
1496

    
1497
class LUQueryNodes(NoHooksLU):
1498
  """Logical unit for querying nodes.
1499

1500
  """
1501
  _OP_REQP = ["output_fields", "names"]
1502

    
1503
  def CheckPrereq(self):
1504
    """Check prerequisites.
1505

1506
    This checks that the fields required are valid output fields.
1507

1508
    """
1509
    self.dynamic_fields = frozenset([
1510
      "dtotal", "dfree",
1511
      "mtotal", "mnode", "mfree",
1512
      "bootid",
1513
      "ctotal",
1514
      ])
1515

    
1516
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1517
                               "pinst_list", "sinst_list",
1518
                               "pip", "sip"],
1519
                       dynamic=self.dynamic_fields,
1520
                       selected=self.op.output_fields)
1521

    
1522
    self.wanted = _GetWantedNodes(self, self.op.names)
1523

    
1524
  def Exec(self, feedback_fn):
1525
    """Computes the list of nodes and their attributes.
1526

1527
    """
1528
    nodenames = self.wanted
1529
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1530

    
1531
    # begin data gathering
1532

    
1533
    if self.dynamic_fields.intersection(self.op.output_fields):
1534
      live_data = {}
1535
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1536
      for name in nodenames:
1537
        nodeinfo = node_data.get(name, None)
1538
        if nodeinfo:
1539
          live_data[name] = {
1540
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1541
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1542
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1543
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1544
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1545
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1546
            "bootid": nodeinfo['bootid'],
1547
            }
1548
        else:
1549
          live_data[name] = {}
1550
    else:
1551
      live_data = dict.fromkeys(nodenames, {})
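      # note that dict.fromkeys makes every node share the *same* empty
      # dict; that is harmless here because the placeholder is only read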
1552

    
1553
    node_to_primary = dict([(name, set()) for name in nodenames])
1554
    node_to_secondary = dict([(name, set()) for name in nodenames])
1555

    
1556
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1557
                             "sinst_cnt", "sinst_list"))
1558
    if inst_fields & frozenset(self.op.output_fields):
1559
      instancelist = self.cfg.GetInstanceList()
1560

    
1561
      for instance_name in instancelist:
1562
        inst = self.cfg.GetInstanceInfo(instance_name)
1563
        if inst.primary_node in node_to_primary:
1564
          node_to_primary[inst.primary_node].add(inst.name)
1565
        for secnode in inst.secondary_nodes:
1566
          if secnode in node_to_secondary:
1567
            node_to_secondary[secnode].add(inst.name)
1568

    
1569
    # end data gathering
1570

    
1571
    output = []
1572
    for node in nodelist:
1573
      node_output = []
1574
      for field in self.op.output_fields:
1575
        if field == "name":
1576
          val = node.name
1577
        elif field == "pinst_list":
1578
          val = list(node_to_primary[node.name])
1579
        elif field == "sinst_list":
1580
          val = list(node_to_secondary[node.name])
1581
        elif field == "pinst_cnt":
1582
          val = len(node_to_primary[node.name])
1583
        elif field == "sinst_cnt":
1584
          val = len(node_to_secondary[node.name])
1585
        elif field == "pip":
1586
          val = node.primary_ip
1587
        elif field == "sip":
1588
          val = node.secondary_ip
1589
        elif field in self.dynamic_fields:
1590
          val = live_data[node.name].get(field, None)
1591
        else:
1592
          raise errors.ParameterError(field)
1593
        node_output.append(val)
1594
      output.append(node_output)
1595

    
1596
    return output
1597

    
1598

    
1599
class LUQueryNodeVolumes(NoHooksLU):
1600
  """Logical unit for getting volumes on node(s).
1601

1602
  """
1603
  _OP_REQP = ["nodes", "output_fields"]
1604

    
1605
  def CheckPrereq(self):
1606
    """Check prerequisites.
1607

1608
    This checks that the fields required are valid output fields.
1609

1610
    """
1611
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1612

    
1613
    _CheckOutputFields(static=["node"],
1614
                       dynamic=["phys", "vg", "name", "size", "instance"],
1615
                       selected=self.op.output_fields)
1616

    
1617

    
1618
  def Exec(self, feedback_fn):
1619
    """Computes the list of nodes and their attributes.
1620

1621
    """
1622
    nodenames = self.nodes
1623
    volumes = rpc.call_node_volumes(nodenames)
1624

    
1625
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1626
             in self.cfg.GetInstanceList()]
1627

    
1628
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1629

    
1630
    output = []
1631
    for node in nodenames:
1632
      if node not in volumes or not volumes[node]:
1633
        continue
1634

    
1635
      node_vols = volumes[node][:]
1636
      node_vols.sort(key=lambda vol: vol['dev'])
1637

    
1638
      for vol in node_vols:
1639
        node_output = []
1640
        for field in self.op.output_fields:
1641
          if field == "node":
1642
            val = node
1643
          elif field == "phys":
1644
            val = vol['dev']
1645
          elif field == "vg":
1646
            val = vol['vg']
1647
          elif field == "name":
1648
            val = vol['name']
1649
          elif field == "size":
1650
            val = int(float(vol['size']))
1651
          elif field == "instance":
1652
            for inst in ilist:
1653
              if node not in lv_by_node[inst]:
1654
                continue
1655
              if vol['name'] in lv_by_node[inst][node]:
1656
                val = inst.name
1657
                break
1658
            else:
1659
              val = '-'
1660
          else:
1661
            raise errors.ParameterError(field)
1662
          node_output.append(str(val))
1663

    
1664
        output.append(node_output)
1665

    
1666
    return output
1667

    
1668

    
1669
class LUAddNode(LogicalUnit):
1670
  """Logical unit for adding node to the cluster.
1671

1672
  """
1673
  HPATH = "node-add"
1674
  HTYPE = constants.HTYPE_NODE
1675
  _OP_REQP = ["node_name"]
1676

    
1677
  def BuildHooksEnv(self):
1678
    """Build hooks env.
1679

1680
    This will run on all nodes before, and on all nodes + the new node after.
1681

1682
    """
1683
    env = {
1684
      "OP_TARGET": self.op.node_name,
1685
      "NODE_NAME": self.op.node_name,
1686
      "NODE_PIP": self.op.primary_ip,
1687
      "NODE_SIP": self.op.secondary_ip,
1688
      }
1689
    nodes_0 = self.cfg.GetNodeList()
1690
    nodes_1 = nodes_0 + [self.op.node_name, ]
1691
    return env, nodes_0, nodes_1
1692

    
1693
  def CheckPrereq(self):
1694
    """Check prerequisites.
1695

1696
    This checks:
1697
     - the new node is not already in the config
1698
     - it is resolvable
1699
     - its parameters (single/dual homed) match the cluster
1700

1701
    Any errors are signalled by raising errors.OpPrereqError.
1702

1703
    """
1704
    node_name = self.op.node_name
1705
    cfg = self.cfg
1706

    
1707
    dns_data = utils.HostInfo(node_name)
1708

    
1709
    node = dns_data.name
1710
    primary_ip = self.op.primary_ip = dns_data.ip
1711
    secondary_ip = getattr(self.op, "secondary_ip", None)
1712
    if secondary_ip is None:
1713
      secondary_ip = primary_ip
1714
    if not utils.IsValidIP(secondary_ip):
1715
      raise errors.OpPrereqError("Invalid secondary IP given")
1716
    self.op.secondary_ip = secondary_ip
1717

    
1718
    node_list = cfg.GetNodeList()
1719
    if not self.op.readd and node in node_list:
1720
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1721
                                 node)
1722
    elif self.op.readd and node not in node_list:
1723
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1724

    
1725
    for existing_node_name in node_list:
1726
      existing_node = cfg.GetNodeInfo(existing_node_name)
1727

    
1728
      if self.op.readd and node == existing_node_name:
1729
        if (existing_node.primary_ip != primary_ip or
1730
            existing_node.secondary_ip != secondary_ip):
1731
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1732
                                     " address configuration as before")
1733
        continue
1734

    
1735
      if (existing_node.primary_ip == primary_ip or
1736
          existing_node.secondary_ip == primary_ip or
1737
          existing_node.primary_ip == secondary_ip or
1738
          existing_node.secondary_ip == secondary_ip):
1739
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1740
                                   " existing node %s" % existing_node.name)
1741

    
1742
    # check that the type of the node (single versus dual homed) is the
1743
    # same as for the master
1744
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1745
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1746
    newbie_singlehomed = secondary_ip == primary_ip
1747
    if master_singlehomed != newbie_singlehomed:
1748
      if master_singlehomed:
1749
        raise errors.OpPrereqError("The master has no private ip but the"
1750
                                   " new node has one")
1751
      else:
1752
        raise errors.OpPrereqError("The master has a private ip but the"
1753
                                   " new node doesn't have one")
1754

    
1755
    # check reachability
1756
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1757
      raise errors.OpPrereqError("Node not reachable by ping")
1758

    
1759
    if not newbie_singlehomed:
1760
      # check reachability from my secondary ip to newbie's secondary ip
1761
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1762
                           source=myself.secondary_ip):
1763
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1764
                                   " based ping to noded port")
1765

    
1766
    self.new_node = objects.Node(name=node,
1767
                                 primary_ip=primary_ip,
1768
                                 secondary_ip=secondary_ip)
1769

    
1770
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1771
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1772
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1773
                                   constants.VNC_PASSWORD_FILE)
1774

    
1775
  def Exec(self, feedback_fn):
1776
    """Adds the new node to the cluster.
1777

1778
    """
1779
    new_node = self.new_node
1780
    node = new_node.name
1781

    
1782
    # set up inter-node password and certificate and restarts the node daemon
1783
    gntpass = self.sstore.GetNodeDaemonPassword()
1784
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1785
      raise errors.OpExecError("ganeti password corruption detected")
1786
    f = open(constants.SSL_CERT_FILE)
1787
    try:
1788
      gntpem = f.read(8192)
1789
    finally:
1790
      f.close()
1791
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1792
    # so we use this to detect an invalid certificate; as long as the
1793
    # cert doesn't contain this, the here-document will be correctly
1794
    # parsed by the shell sequence below
1795
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1796
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1797
    if not gntpem.endswith("\n"):
1798
      raise errors.OpExecError("PEM must end with newline")
1799
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1800

    
1801
    # and then connect with ssh to set password and start ganeti-noded
1802
    # note that all the below variables are sanitized at this point,
1803
    # either by being constants or by the checks above
1804
    ss = self.sstore
1805
    mycommand = ("umask 077 && "
1806
                 "echo '%s' > '%s' && "
1807
                 "cat > '%s' << '!EOF.' && \n"
1808
                 "%s!EOF.\n%s restart" %
1809
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1810
                  constants.SSL_CERT_FILE, gntpem,
1811
                  constants.NODE_INITD_SCRIPT))
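    # for illustration only (placeholders, not the real values), the
    # rendered command looks roughly like:
    #   umask 077 && echo '<noded password>' > '<noded password file>' &&
    #   cat > '<ssl cert file>' << '!EOF.'
    #   <PEM data>!EOF.
    #   <node init script> restart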
1812

    
1813
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1814
    if result.failed:
1815
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1816
                               " output: %s" %
1817
                               (node, result.fail_reason, result.output))
1818

    
1819
    # check connectivity
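    # (the node daemon was just restarted via the ssh command above; give
    # it a few seconds to come up before querying its version)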
1820
    time.sleep(4)
1821

    
1822
    result = rpc.call_version([node])[node]
1823
    if result:
1824
      if constants.PROTOCOL_VERSION == result:
1825
        logger.Info("communication to node %s fine, sw version %s match" %
1826
                    (node, result))
1827
      else:
1828
        raise errors.OpExecError("Version mismatch master version %s,"
1829
                                 " node version %s" %
1830
                                 (constants.PROTOCOL_VERSION, result))
1831
    else:
1832
      raise errors.OpExecError("Cannot get version from the new node")
1833

    
1834
    # setup ssh on node
1835
    logger.Info("copy ssh key to node %s" % node)
1836
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1837
    keyarray = []
1838
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1839
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1840
                priv_key, pub_key]
1841

    
1842
    for i in keyfiles:
1843
      f = open(i, 'r')
1844
      try:
1845
        keyarray.append(f.read())
1846
      finally:
1847
        f.close()
1848

    
1849
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1850
                               keyarray[3], keyarray[4], keyarray[5])
1851

    
1852
    if not result:
1853
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1854

    
1855
    # Add node to our /etc/hosts, and add key to known_hosts
1856
    _AddHostToEtcHosts(new_node.name)
1857

    
1858
    if new_node.secondary_ip != new_node.primary_ip:
1859
      if not rpc.call_node_tcp_ping(new_node.name,
1860
                                    constants.LOCALHOST_IP_ADDRESS,
1861
                                    new_node.secondary_ip,
1862
                                    constants.DEFAULT_NODED_PORT,
1863
                                    10, False):
1864
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1865
                                 " you gave (%s). Please fix and re-run this"
1866
                                 " command." % new_node.secondary_ip)
1867

    
1868
    success, msg = self.ssh.VerifyNodeHostname(node)
1869
    if not success:
1870
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1871
                               " than the one the resolver gives: %s."
1872
                               " Please fix and re-run this command." %
1873
                               (node, msg))
1874

    
1875
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1876
    # including the node just added
1877
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1878
    dist_nodes = self.cfg.GetNodeList()
1879
    if not self.op.readd:
1880
      dist_nodes.append(node)
1881
    if myself.name in dist_nodes:
1882
      dist_nodes.remove(myself.name)
1883

    
1884
    logger.Debug("Copying hosts and known_hosts to all nodes")
1885
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1886
      result = rpc.call_upload_file(dist_nodes, fname)
1887
      for to_node in dist_nodes:
1888
        if not result[to_node]:
1889
          logger.Error("copy of file %s to node %s failed" %
1890
                       (fname, to_node))
1891

    
1892
    to_copy = ss.GetFileList()
1893
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1894
      to_copy.append(constants.VNC_PASSWORD_FILE)
1895
    for fname in to_copy:
1896
      if not self.ssh.CopyFileToNode(node, fname):
1897
        logger.Error("could not copy file %s to node %s" % (fname, node))
1898

    
1899
    if not self.op.readd:
1900
      logger.Info("adding node %s to cluster.conf" % node)
1901
      self.cfg.AddNode(new_node)
1902

    
1903

    
1904
class LUMasterFailover(LogicalUnit):
1905
  """Failover the master node to the current node.
1906

1907
  This is a special LU in that it must run on a non-master node.
1908

1909
  """
1910
  HPATH = "master-failover"
1911
  HTYPE = constants.HTYPE_CLUSTER
1912
  REQ_MASTER = False
1913
  _OP_REQP = []
1914

    
1915
  def BuildHooksEnv(self):
1916
    """Build hooks env.
1917

1918
    This will run on the new master only in the pre phase, and on all
1919
    the nodes in the post phase.
1920

1921
    """
1922
    env = {
1923
      "OP_TARGET": self.new_master,
1924
      "NEW_MASTER": self.new_master,
1925
      "OLD_MASTER": self.old_master,
1926
      }
1927
    return env, [self.new_master], self.cfg.GetNodeList()
1928

    
1929
  def CheckPrereq(self):
1930
    """Check prerequisites.
1931

1932
    This checks that we are not already the master.
1933

1934
    """
1935
    self.new_master = utils.HostInfo().name
1936
    self.old_master = self.sstore.GetMasterNode()
1937

    
1938
    if self.old_master == self.new_master:
1939
      raise errors.OpPrereqError("This commands must be run on the node"
1940
                                 " where you want the new master to be."
1941
                                 " %s is already the master" %
1942
                                 self.old_master)
1943

    
1944
  def Exec(self, feedback_fn):
1945
    """Failover the master node.
1946

1947
    This command, when run on a non-master node, will cause the current
1948
    master to cease being master, and the non-master to become new
1949
    master.
1950

1951
    """
1952
    #TODO: do not rely on gethostname returning the FQDN
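    # the failover is a three-step sequence: stop the master role on the
    # old master, repoint the simple store master key at ourselves and
    # push it to all nodes, then start the master role locally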
1953
    logger.Info("setting master to %s, old master: %s" %
1954
                (self.new_master, self.old_master))
1955

    
1956
    if not rpc.call_node_stop_master(self.old_master):
1957
      logger.Error("could disable the master role on the old master"
1958
                   " %s, please disable manually" % self.old_master)
1959

    
1960
    ss = self.sstore
1961
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1962
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1963
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1964
      logger.Error("could not distribute the new simple store master file"
1965
                   " to the other nodes, please check.")
1966

    
1967
    if not rpc.call_node_start_master(self.new_master):
1968
      logger.Error("could not start the master role on the new master"
1969
                   " %s, please check" % self.new_master)
1970
      feedback_fn("Error in activating the master IP on the new master,"
1971
                  " please fix manually.")
1972

    
1973

    
1974

    
1975
class LUQueryClusterInfo(NoHooksLU):
1976
  """Query cluster configuration.
1977

1978
  """
1979
  _OP_REQP = []
1980
  REQ_MASTER = False
1981

    
1982
  def CheckPrereq(self):
1983
    """No prerequsites needed for this LU.
1984

1985
    """
1986
    pass
1987

    
1988
  def Exec(self, feedback_fn):
1989
    """Return cluster config.
1990

1991
    """
1992
    result = {
1993
      "name": self.sstore.GetClusterName(),
1994
      "software_version": constants.RELEASE_VERSION,
1995
      "protocol_version": constants.PROTOCOL_VERSION,
1996
      "config_version": constants.CONFIG_VERSION,
1997
      "os_api_version": constants.OS_API_VERSION,
1998
      "export_version": constants.EXPORT_VERSION,
1999
      "master": self.sstore.GetMasterNode(),
2000
      "architecture": (platform.architecture()[0], platform.machine()),
2001
      "hypervisor_type": self.sstore.GetHypervisorType(),
2002
      }
2003

    
2004
    return result
2005

    
2006

    
2007
class LUClusterCopyFile(NoHooksLU):
2008
  """Copy file to cluster.
2009

2010
  """
2011
  _OP_REQP = ["nodes", "filename"]
2012

    
2013
  def CheckPrereq(self):
2014
    """Check prerequisites.
2015

2016
    It should check that the named file exists and that the given list
2017
    of nodes is valid.
2018

2019
    """
2020
    if not os.path.exists(self.op.filename):
2021
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2022

    
2023
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2024

    
2025
  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file named in self.op.filename is copied to every node in
    self.nodes (as computed in CheckPrereq), skipping the node the
    command runs on (normally the master).

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
2044

    
2045

    
2046
class LUDumpClusterConfig(NoHooksLU):
2047
  """Return a text-representation of the cluster-config.
2048

2049
  """
2050
  _OP_REQP = []
2051

    
2052
  def CheckPrereq(self):
2053
    """No prerequisites.
2054

2055
    """
2056
    pass
2057

    
2058
  def Exec(self, feedback_fn):
2059
    """Dump a representation of the cluster config to the standard output.
2060

2061
    """
2062
    return self.cfg.DumpConfig()
2063

    
2064

    
2065
class LURunClusterCommand(NoHooksLU):
2066
  """Run a command on some nodes.
2067

2068
  """
2069
  _OP_REQP = ["command", "nodes"]
2070

    
2071
  def CheckPrereq(self):
2072
    """Check prerequisites.
2073

2074
    It checks that the given list of nodes is valid.
2075

2076
    """
2077
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2078

    
2079
  def Exec(self, feedback_fn):
2080
    """Run a command on some nodes.
2081

2082
    """
2083
    # put the master at the end of the nodes list
2084
    master_node = self.sstore.GetMasterNode()
2085
    if master_node in self.nodes:
2086
      self.nodes.remove(master_node)
2087
      self.nodes.append(master_node)
2088

    
2089
    data = []
2090
    for node in self.nodes:
2091
      result = self.ssh.Run(node, "root", self.op.command)
2092
      data.append((node, result.output, result.exit_code))
2093

    
2094
    return data
2095

    
2096

    
2097
class LUActivateInstanceDisks(NoHooksLU):
2098
  """Bring up an instance's disks.
2099

2100
  """
2101
  _OP_REQP = ["instance_name"]
2102

    
2103
  def CheckPrereq(self):
2104
    """Check prerequisites.
2105

2106
    This checks that the instance is in the cluster.
2107

2108
    """
2109
    instance = self.cfg.GetInstanceInfo(
2110
      self.cfg.ExpandInstanceName(self.op.instance_name))
2111
    if instance is None:
2112
      raise errors.OpPrereqError("Instance '%s' not known" %
2113
                                 self.op.instance_name)
2114
    self.instance = instance
2115

    
2116

    
2117
  def Exec(self, feedback_fn):
2118
    """Activate the disks.
2119

2120
    """
2121
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2122
    if not disks_ok:
2123
      raise errors.OpExecError("Cannot activate block devices")
2124

    
2125
    return disks_info
2126

    
2127

    
2128
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2129
  """Prepare the block devices for an instance.
2130

2131
  This sets up the block devices on all nodes.
2132

2133
  Args:
2134
    instance: a ganeti.objects.Instance object
2135
    ignore_secondaries: if true, errors on secondary nodes won't result
2136
                        in an error return from the function
2137

2138
  Returns:
2139
    false if the operation failed
2140
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
2142
  """
2143
  device_info = []
2144
  disks_ok = True
2145
  iname = instance.name
2146
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it
2149

    
2150
  # The proper fix would be to wait (with some limits) until the
2151
  # connection has been made and drbd transitions from WFConnection
2152
  # into any other network-connected state (Connected, SyncTarget,
2153
  # SyncSource, etc.)
2154

    
2155
  # 1st pass, assemble on all nodes in secondary mode
2156
  for inst_disk in instance.disks:
2157
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2158
      cfg.SetDiskID(node_disk, node)
2159
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2160
      if not result:
2161
        logger.Error("could not prepare block device %s on node %s"
2162
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2163
        if not ignore_secondaries:
2164
          disks_ok = False
2165

    
2166
  # FIXME: race condition on drbd migration to primary
2167

    
2168
  # 2nd pass, do only the primary node
2169
  for inst_disk in instance.disks:
2170
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2171
      if node != instance.primary_node:
2172
        continue
2173
      cfg.SetDiskID(node_disk, node)
2174
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2175
      if not result:
2176
        logger.Error("could not prepare block device %s on node %s"
2177
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2178
        disks_ok = False
2179
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2180

    
2181
  # leave the disks configured for the primary node
2182
  # this is a workaround that would be fixed better by
2183
  # improving the logical/physical id handling
2184
  for disk in instance.disks:
2185
    cfg.SetDiskID(disk, instance.primary_node)
2186

    
2187
  return disks_ok, device_info
2188

    
2189

    
2190
def _StartInstanceDisks(cfg, instance, force):
2191
  """Start the disks of an instance.
2192

2193
  """
2194
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2195
                                           ignore_secondaries=force)
2196
  if not disks_ok:
2197
    _ShutdownInstanceDisks(instance, cfg)
2198
    if force is not None and not force:
2199
      logger.Error("If the message above refers to a secondary node,"
2200
                   " you can retry the operation using '--force'.")
2201
    raise errors.OpExecError("Disk consistency error")
2202

    
2203

    
2204
class LUDeactivateInstanceDisks(NoHooksLU):
2205
  """Shutdown an instance's disks.
2206

2207
  """
2208
  _OP_REQP = ["instance_name"]
2209

    
2210
  def CheckPrereq(self):
2211
    """Check prerequisites.
2212

2213
    This checks that the instance is in the cluster.
2214

2215
    """
2216
    instance = self.cfg.GetInstanceInfo(
2217
      self.cfg.ExpandInstanceName(self.op.instance_name))
2218
    if instance is None:
2219
      raise errors.OpPrereqError("Instance '%s' not known" %
2220
                                 self.op.instance_name)
2221
    self.instance = instance
2222

    
2223
  def Exec(self, feedback_fn):
2224
    """Deactivate the disks
2225

2226
    """
2227
    instance = self.instance
2228
    ins_l = rpc.call_instance_list([instance.primary_node])
2229
    ins_l = ins_l[instance.primary_node]
2230
    if not isinstance(ins_l, list):
2231
      raise errors.OpExecError("Can't contact node '%s'" %
2232
                               instance.primary_node)
2233

    
2234
    if self.instance.name in ins_l:
2235
      raise errors.OpExecError("Instance is running, can't shutdown"
2236
                               " block devices.")
2237

    
2238
    _ShutdownInstanceDisks(instance, self.cfg)
2239

    
2240

    
2241
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
2260

    
2261

    
2262
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2263
  """Checks if a node has enough free memory.
2264

2265
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2269

2270
  Args:
2271
    - cfg: a ConfigWriter instance
2272
    - node: the node name
2273
    - reason: string to use in the error message
2274
    - requested: the amount of memory in MiB
2275

2276
  """
2277
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2278
  if not nodeinfo or not isinstance(nodeinfo, dict):
2279
    raise errors.OpPrereqError("Could not contact node %s for resource"
2280
                             " information" % (node,))
2281

    
2282
  free_mem = nodeinfo[node].get('memory_free')
2283
  if not isinstance(free_mem, int):
2284
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2285
                             " was '%s'" % (node, free_mem))
2286
  if requested > free_mem:
2287
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2288
                             " needed %s MiB, available %s MiB" %
2289
                             (node, reason, requested, free_mem))
2290

    
2291

    
2292
class LUStartupInstance(LogicalUnit):
2293
  """Starts an instance.
2294

2295
  """
2296
  HPATH = "instance-start"
2297
  HTYPE = constants.HTYPE_INSTANCE
2298
  _OP_REQP = ["instance_name", "force"]
2299

    
2300
  def BuildHooksEnv(self):
2301
    """Build hooks env.
2302

2303
    This runs on master, primary and secondary nodes of the instance.
2304

2305
    """
2306
    env = {
2307
      "FORCE": self.op.force,
2308
      }
2309
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2310
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2311
          list(self.instance.secondary_nodes))
2312
    return env, nl, nl
2313

    
2314
  def CheckPrereq(self):
2315
    """Check prerequisites.
2316

2317
    This checks that the instance is in the cluster.
2318

2319
    """
2320
    instance = self.cfg.GetInstanceInfo(
2321
      self.cfg.ExpandInstanceName(self.op.instance_name))
2322
    if instance is None:
2323
      raise errors.OpPrereqError("Instance '%s' not known" %
2324
                                 self.op.instance_name)
2325

    
2326
    # check bridge existence
2327
    _CheckInstanceBridgesExist(instance)
2328

    
2329
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2330
                         "starting instance %s" % instance.name,
2331
                         instance.memory)
2332

    
2333
    self.instance = instance
2334
    self.op.instance_name = instance.name
2335

    
2336
  def Exec(self, feedback_fn):
2337
    """Start the instance.
2338

2339
    """
2340
    instance = self.instance
2341
    force = self.op.force
2342
    extra_args = getattr(self.op, "extra_args", "")
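    # the instance is marked 'up' in the configuration before it is
    # actually started; presumably so the recorded admin state matches
    # the intent even if the start below fails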
2343

    
2344
    self.cfg.MarkInstanceUp(instance.name)
2345

    
2346
    node_current = instance.primary_node
2347

    
2348
    _StartInstanceDisks(self.cfg, instance, force)
2349

    
2350
    if not rpc.call_instance_start(node_current, instance, extra_args):
2351
      _ShutdownInstanceDisks(instance, self.cfg)
2352
      raise errors.OpExecError("Could not start instance")
2353

    
2354

    
2355
class LURebootInstance(LogicalUnit):
2356
  """Reboot an instance.
2357

2358
  """
2359
  HPATH = "instance-reboot"
2360
  HTYPE = constants.HTYPE_INSTANCE
2361
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2362

    
2363
  def BuildHooksEnv(self):
2364
    """Build hooks env.
2365

2366
    This runs on master, primary and secondary nodes of the instance.
2367

2368
    """
2369
    env = {
2370
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2371
      }
2372
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2373
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2374
          list(self.instance.secondary_nodes))
2375
    return env, nl, nl
2376

    
2377
  def CheckPrereq(self):
2378
    """Check prerequisites.
2379

2380
    This checks that the instance is in the cluster.
2381

2382
    """
2383
    instance = self.cfg.GetInstanceInfo(
2384
      self.cfg.ExpandInstanceName(self.op.instance_name))
2385
    if instance is None:
2386
      raise errors.OpPrereqError("Instance '%s' not known" %
2387
                                 self.op.instance_name)
2388

    
2389
    # check bridge existence
2390
    _CheckInstanceBridgesExist(instance)
2391

    
2392
    self.instance = instance
2393
    self.op.instance_name = instance.name
2394

    
2395
  def Exec(self, feedback_fn):
2396
    """Reboot the instance.
2397

2398
    """
2399
    instance = self.instance
2400
    ignore_secondaries = self.op.ignore_secondaries
2401
    reboot_type = self.op.reboot_type
2402
    extra_args = getattr(self.op, "extra_args", "")
2403

    
2404
    node_current = instance.primary_node
2405

    
2406
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2407
                           constants.INSTANCE_REBOOT_HARD,
2408
                           constants.INSTANCE_REBOOT_FULL]:
2409
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2410
                                  (constants.INSTANCE_REBOOT_SOFT,
2411
                                   constants.INSTANCE_REBOOT_HARD,
2412
                                   constants.INSTANCE_REBOOT_FULL))
2413

    
2414
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2415
                       constants.INSTANCE_REBOOT_HARD]:
2416
      if not rpc.call_instance_reboot(node_current, instance,
2417
                                      reboot_type, extra_args):
2418
        raise errors.OpExecError("Could not reboot instance")
2419
    else:
2420
      if not rpc.call_instance_shutdown(node_current, instance):
2421
        raise errors.OpExecError("could not shutdown instance for full reboot")
2422
      _ShutdownInstanceDisks(instance, self.cfg)
2423
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2424
      if not rpc.call_instance_start(node_current, instance, extra_args):
2425
        _ShutdownInstanceDisks(instance, self.cfg)
2426
        raise errors.OpExecError("Could not start instance for full reboot")
2427

    
2428
    self.cfg.MarkInstanceUp(instance.name)
2429

    
2430

    
2431
class LUShutdownInstance(LogicalUnit):
2432
  """Shutdown an instance.
2433

2434
  """
2435
  HPATH = "instance-stop"
2436
  HTYPE = constants.HTYPE_INSTANCE
2437
  _OP_REQP = ["instance_name"]
2438

    
2439
  def BuildHooksEnv(self):
2440
    """Build hooks env.
2441

2442
    This runs on master, primary and secondary nodes of the instance.
2443

2444
    """
2445
    env = _BuildInstanceHookEnvByObject(self.instance)
2446
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2447
          list(self.instance.secondary_nodes))
2448
    return env, nl, nl
2449

    
2450
  def CheckPrereq(self):
2451
    """Check prerequisites.
2452

2453
    This checks that the instance is in the cluster.
2454

2455
    """
2456
    instance = self.cfg.GetInstanceInfo(
2457
      self.cfg.ExpandInstanceName(self.op.instance_name))
2458
    if instance is None:
2459
      raise errors.OpPrereqError("Instance '%s' not known" %
2460
                                 self.op.instance_name)
2461
    self.instance = instance
2462

    
2463
  def Exec(self, feedback_fn):
2464
    """Shutdown the instance.
2465

2466
    """
2467
    instance = self.instance
2468
    node_current = instance.primary_node
2469
    self.cfg.MarkInstanceDown(instance.name)
2470
    if not rpc.call_instance_shutdown(node_current, instance):
2471
      logger.Error("could not shutdown instance")
2472

    
2473
    _ShutdownInstanceDisks(instance, self.cfg)
2474

    
2475

    
2476
class LUReinstallInstance(LogicalUnit):
2477
  """Reinstall an instance.
2478

2479
  """
2480
  HPATH = "instance-reinstall"
2481
  HTYPE = constants.HTYPE_INSTANCE
2482
  _OP_REQP = ["instance_name"]
2483

    
2484
  def BuildHooksEnv(self):
2485
    """Build hooks env.
2486

2487
    This runs on master, primary and secondary nodes of the instance.
2488

2489
    """
2490
    env = _BuildInstanceHookEnvByObject(self.instance)
2491
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2492
          list(self.instance.secondary_nodes))
2493
    return env, nl, nl
2494

    
2495
  def CheckPrereq(self):
2496
    """Check prerequisites.
2497

2498
    This checks that the instance is in the cluster and is not running.
2499

2500
    """
2501
    instance = self.cfg.GetInstanceInfo(
2502
      self.cfg.ExpandInstanceName(self.op.instance_name))
2503
    if instance is None:
2504
      raise errors.OpPrereqError("Instance '%s' not known" %
2505
                                 self.op.instance_name)
2506
    if instance.disk_template == constants.DT_DISKLESS:
2507
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2508
                                 self.op.instance_name)
2509
    if instance.status != "down":
2510
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2511
                                 self.op.instance_name)
2512
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2513
    if remote_info:
2514
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2515
                                 (self.op.instance_name,
2516
                                  instance.primary_node))
2517

    
2518
    self.op.os_type = getattr(self.op, "os_type", None)
2519
    if self.op.os_type is not None:
2520
      # OS verification
2521
      pnode = self.cfg.GetNodeInfo(
2522
        self.cfg.ExpandNodeName(instance.primary_node))
2523
      if pnode is None:
2524
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2525
                                   self.op.pnode)
2526
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2527
      if not os_obj:
2528
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2529
                                   " primary node"  % self.op.os_type)
2530

    
2531
    self.instance = instance
2532

    
2533
  def Exec(self, feedback_fn):
2534
    """Reinstall the instance.
2535

2536
    """
2537
    inst = self.instance
2538

    
2539
    if self.op.os_type is not None:
2540
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2541
      inst.os = self.op.os_type
2542
      self.cfg.AddInstance(inst)
2543

    
2544
    _StartInstanceDisks(self.cfg, inst, None)
2545
    try:
2546
      feedback_fn("Running the instance OS create scripts...")
2547
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2548
        raise errors.OpExecError("Could not install OS for instance %s"
2549
                                 " on node %s" %
2550
                                 (inst.name, inst.primary_node))
2551
    finally:
2552
      _ShutdownInstanceDisks(inst, self.cfg)
2553

    
2554

    
2555
class LURenameInstance(LogicalUnit):
2556
  """Rename an instance.
2557

2558
  """
2559
  HPATH = "instance-rename"
2560
  HTYPE = constants.HTYPE_INSTANCE
2561
  _OP_REQP = ["instance_name", "new_name"]
2562

    
2563
  def BuildHooksEnv(self):
2564
    """Build hooks env.
2565

2566
    This runs on master, primary and secondary nodes of the instance.
2567

2568
    """
2569
    env = _BuildInstanceHookEnvByObject(self.instance)
2570
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2571
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2572
          list(self.instance.secondary_nodes))
2573
    return env, nl, nl
2574

    
2575
  def CheckPrereq(self):
2576
    """Check prerequisites.
2577

2578
    This checks that the instance is in the cluster and is not running.
2579

2580
    """
2581
    instance = self.cfg.GetInstanceInfo(
2582
      self.cfg.ExpandInstanceName(self.op.instance_name))
2583
    if instance is None:
2584
      raise errors.OpPrereqError("Instance '%s' not known" %
2585
                                 self.op.instance_name)
2586
    if instance.status != "down":
2587
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2588
                                 self.op.instance_name)
2589
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2590
    if remote_info:
2591
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2592
                                 (self.op.instance_name,
2593
                                  instance.primary_node))
2594
    self.instance = instance
2595

    
2596
    # new name verification
2597
    name_info = utils.HostInfo(self.op.new_name)
2598

    
2599
    self.op.new_name = new_name = name_info.name
2600
    instance_list = self.cfg.GetInstanceList()
2601
    if new_name in instance_list:
2602
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2603
                                 new_name)
2604

    
2605
    if not getattr(self.op, "ignore_ip", False):
2606
      command = ["fping", "-q", name_info.ip]
2607
      result = utils.RunCmd(command)
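      # fping exits with status 0 when the target answers, so a command
      # that did *not* fail means the new name's IP is already in use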
2608
      if not result.failed:
2609
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2610
                                   (name_info.ip, new_name))
2611

    
2612

    
2613
  def Exec(self, feedback_fn):
2614
    """Reinstall the instance.
2615

2616
    """
2617
    inst = self.instance
2618
    old_name = inst.name
2619

    
2620
    if inst.disk_template == constants.DT_FILE:
2621
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2622

    
2623
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2624

    
2625
    # re-read the instance from the configuration after rename
2626
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2627

    
2628
    if inst.disk_template == constants.DT_FILE:
2629
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2630
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2631
                                                old_file_storage_dir,
2632
                                                new_file_storage_dir)
2633

    
2634
      if not result:
2635
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2636
                                 " directory '%s' to '%s' (but the instance"
2637
                                 " has been renamed in Ganeti)" % (
2638
                                 inst.primary_node, old_file_storage_dir,
2639
                                 new_file_storage_dir))
2640

    
2641
      if not result[0]:
2642
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2643
                                 " (but the instance has been renamed in"
2644
                                 " Ganeti)" % (old_file_storage_dir,
2645
                                               new_file_storage_dir))
2646

    
2647
    _StartInstanceDisks(self.cfg, inst, None)
2648
    try:
2649
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2650
                                          "sda", "sdb"):
2651
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2652
               " instance has been renamed in Ganeti)" %
2653
               (inst.name, inst.primary_node))
2654
        logger.Error(msg)
2655
    finally:
2656
      _ShutdownInstanceDisks(inst, self.cfg)
2657

    
2658

    
2659
class LURemoveInstance(LogicalUnit):
2660
  """Remove an instance.
2661

2662
  """
2663
  HPATH = "instance-remove"
2664
  HTYPE = constants.HTYPE_INSTANCE
2665
  _OP_REQP = ["instance_name", "ignore_failures"]
2666

    
2667
  def BuildHooksEnv(self):
2668
    """Build hooks env.
2669

2670
    This runs on master, primary and secondary nodes of the instance.
2671

2672
    """
2673
    env = _BuildInstanceHookEnvByObject(self.instance)
2674
    nl = [self.sstore.GetMasterNode()]
2675
    return env, nl, nl
2676

    
2677
  def CheckPrereq(self):
2678
    """Check prerequisites.
2679

2680
    This checks that the instance is in the cluster.
2681

2682
    """
2683
    instance = self.cfg.GetInstanceInfo(
2684
      self.cfg.ExpandInstanceName(self.op.instance_name))
2685
    if instance is None:
2686
      raise errors.OpPrereqError("Instance '%s' not known" %
2687
                                 self.op.instance_name)
2688
    self.instance = instance
2689

    
2690
  def Exec(self, feedback_fn):
2691
    """Remove the instance.
2692

2693
    """
2694
    instance = self.instance
2695
    logger.Info("shutting down instance %s on node %s" %
2696
                (instance.name, instance.primary_node))
2697

    
2698
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2699
      if self.op.ignore_failures:
2700
        feedback_fn("Warning: can't shutdown instance")
2701
      else:
2702
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2703
                                 (instance.name, instance.primary_node))
2704

    
2705
    logger.Info("removing block devices for instance %s" % instance.name)
2706

    
2707
    if not _RemoveDisks(instance, self.cfg):
2708
      if self.op.ignore_failures:
2709
        feedback_fn("Warning: can't remove instance's disks")
2710
      else:
2711
        raise errors.OpExecError("Can't remove instance's disks")
2712

    
2713
    logger.Info("removing instance %s out of cluster config" % instance.name)
2714

    
2715
    self.cfg.RemoveInstance(instance.name)
2716

    
2717

    
2718
class LUQueryInstances(NoHooksLU):
2719
  """Logical unit for querying instances.
2720

2721
  """
2722
  _OP_REQP = ["output_fields", "names"]
2723

    
2724
  def CheckPrereq(self):
2725
    """Check prerequisites.
2726

2727
    This checks that the fields required are valid output fields.
2728

2729
    """
2730
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2731
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2732
                               "admin_state", "admin_ram",
2733
                               "disk_template", "ip", "mac", "bridge",
2734
                               "sda_size", "sdb_size", "vcpus"],
2735
                       dynamic=self.dynamic_fields,
2736
                       selected=self.op.output_fields)
2737

    
2738
    self.wanted = _GetWantedInstances(self, self.op.names)
2739

    
2740
  def Exec(self, feedback_fn):
2741
    """Computes the list of nodes and their attributes.
2742

2743
    """
2744
    instance_names = self.wanted
2745
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2746
                     in instance_names]
2747

    
2748
    # begin data gathering
2749

    
2750
    nodes = frozenset([inst.primary_node for inst in instance_list])
2751

    
2752
    bad_nodes = []
2753
    if self.dynamic_fields.intersection(self.op.output_fields):
2754
      live_data = {}
2755
      node_data = rpc.call_all_instances_info(nodes)
2756
      for name in nodes:
2757
        result = node_data[name]
2758
        if result:
2759
          live_data.update(result)
2760
        elif result == False:
2761
          bad_nodes.append(name)
2762
        # else no instance is alive
2763
    else:
2764
      live_data = dict([(name, {}) for name in instance_names])
2765

    
2766
    # end data gathering
2767

    
2768
    output = []
2769
    for instance in instance_list:
2770
      iout = []
2771
      for field in self.op.output_fields:
2772
        if field == "name":
2773
          val = instance.name
2774
        elif field == "os":
2775
          val = instance.os
2776
        elif field == "pnode":
2777
          val = instance.primary_node
2778
        elif field == "snodes":
2779
          val = list(instance.secondary_nodes)
2780
        elif field == "admin_state":
2781
          val = (instance.status != "down")
2782
        elif field == "oper_state":
2783
          if instance.primary_node in bad_nodes:
2784
            val = None
2785
          else:
2786
            val = bool(live_data.get(instance.name))
2787
        elif field == "status":
2788
          if instance.primary_node in bad_nodes:
2789
            val = "ERROR_nodedown"
2790
          else:
2791
            running = bool(live_data.get(instance.name))
2792
            if running:
2793
              if instance.status != "down":
2794
                val = "running"
2795
              else:
2796
                val = "ERROR_up"
2797
            else:
2798
              if instance.status != "down":
2799
                val = "ERROR_down"
2800
              else:
2801
                val = "ADMIN_down"
2802
        elif field == "admin_ram":
2803
          val = instance.memory
2804
        elif field == "oper_ram":
2805
          if instance.primary_node in bad_nodes:
2806
            val = None
2807
          elif instance.name in live_data:
2808
            val = live_data[instance.name].get("memory", "?")
2809
          else:
2810
            val = "-"
2811
        elif field == "disk_template":
2812
          val = instance.disk_template
2813
        elif field == "ip":
2814
          val = instance.nics[0].ip
2815
        elif field == "bridge":
2816
          val = instance.nics[0].bridge
2817
        elif field == "mac":
2818
          val = instance.nics[0].mac
2819
        elif field == "sda_size" or field == "sdb_size":
2820
          disk = instance.FindDisk(field[:3])
2821
          if disk is None:
2822
            val = None
2823
          else:
2824
            val = disk.size
2825
        elif field == "vcpus":
2826
          val = instance.vcpus
2827
        else:
2828
          raise errors.ParameterError(field)
2829
        iout.append(val)
2830
      output.append(iout)
2831

    
2832
    return output
2833

    
2834

    
2835
class LUFailoverInstance(LogicalUnit):
2836
  """Failover an instance.
2837

2838
  """
2839
  HPATH = "instance-failover"
2840
  HTYPE = constants.HTYPE_INSTANCE
2841
  _OP_REQP = ["instance_name", "ignore_consistency"]
2842

    
2843
  def BuildHooksEnv(self):
2844
    """Build hooks env.
2845

2846
    This runs on master, primary and secondary nodes of the instance.
2847

2848
    """
2849
    env = {
2850
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2851
      }
2852
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2853
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2854
    return env, nl, nl
2855

    
2856
  def CheckPrereq(self):
2857
    """Check prerequisites.
2858

2859
    This checks that the instance is in the cluster.
2860

2861
    """
2862
    instance = self.cfg.GetInstanceInfo(
2863
      self.cfg.ExpandInstanceName(self.op.instance_name))
2864
    if instance is None:
2865
      raise errors.OpPrereqError("Instance '%s' not known" %
2866
                                 self.op.instance_name)
2867

    
2868
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2869
      raise errors.OpPrereqError("Instance's disk layout is not"
2870
                                 " network mirrored, cannot failover.")
2871

    
2872
    secondary_nodes = instance.secondary_nodes
2873
    if not secondary_nodes:
2874
      raise errors.ProgrammerError("no secondary node but using "
2875
                                   "a mirrored disk template")
2876

    
2877
    target_node = secondary_nodes[0]
2878
    # check memory requirements on the secondary node
2879
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2880
                         instance.name, instance.memory)
2881

    
2882
    # check bridge existence
2883
    brlist = [nic.bridge for nic in instance.nics]
2884
    if not rpc.call_bridges_exist(target_node, brlist):
2885
      raise errors.OpPrereqError("One or more target bridges %s does not"
2886
                                 " exist on destination node '%s'" %
2887
                                 (brlist, target_node))
2888

    
2889
    self.instance = instance
2890

    
2891
  def Exec(self, feedback_fn):
2892
    """Failover an instance.
2893

2894
    The failover is done by shutting it down on its present node and
2895
    starting it on the secondary.
2896

2897
    """
2898
    instance = self.instance
2899

    
2900
    source_node = instance.primary_node
2901
    target_node = instance.secondary_nodes[0]
2902

    
2903
    feedback_fn("* checking disk consistency between source and target")
2904
    for dev in instance.disks:
2905
      # for drbd, these are drbd over lvm
2906
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2907
        if instance.status == "up" and not self.op.ignore_consistency:
2908
          raise errors.OpExecError("Disk %s is degraded on target node,"
2909
                                   " aborting failover." % dev.iv_name)
2910

    
2911
    feedback_fn("* shutting down instance on source node")
2912
    logger.Info("Shutting down instance %s on node %s" %
2913
                (instance.name, source_node))
2914

    
2915
    if not rpc.call_instance_shutdown(source_node, instance):
2916
      if self.op.ignore_consistency:
2917
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2918
                     " anyway. Please make sure node %s is down"  %
2919
                     (instance.name, source_node, source_node))
2920
      else:
2921
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2922
                                 (instance.name, source_node))
2923

    
2924
    feedback_fn("* deactivating the instance's disks on source node")
2925
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2926
      raise errors.OpExecError("Can't shut down the instance's disks.")
2927

    
2928
    instance.primary_node = target_node
2929
    # distribute new instance config to the other nodes
2930
    self.cfg.AddInstance(instance)
2931

    
2932
    # Only start the instance if it's marked as up
2933
    if instance.status == "up":
2934
      feedback_fn("* activating the instance's disks on target node")
2935
      logger.Info("Starting instance %s on node %s" %
2936
                  (instance.name, target_node))
2937

    
2938
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2939
                                               ignore_secondaries=True)
2940
      if not disks_ok:
2941
        _ShutdownInstanceDisks(instance, self.cfg)
2942
        raise errors.OpExecError("Can't activate the instance's disks")
2943

    
2944
      feedback_fn("* starting the instance on the target node")
2945
      if not rpc.call_instance_start(target_node, instance, None):
2946
        _ShutdownInstanceDisks(instance, self.cfg)
2947
        raise errors.OpExecError("Could not start instance %s on node %s." %
2948
                                 (instance.name, target_node))
2949

    
2950

    
2951
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2952
  """Create a tree of block devices on the primary node.
2953

2954
  This always creates all devices.
2955

2956
  """
2957
  if device.children:
2958
    for child in device.children:
2959
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2960
        return False
2961

    
2962
  cfg.SetDiskID(device, node)
2963
  new_id = rpc.call_blockdev_create(node, device, device.size,
2964
                                    instance.name, True, info)
2965
  if not new_id:
2966
    return False
2967
  if device.physical_id is None:
2968
    device.physical_id = new_id
2969
  return True
2970

    
2971

    
2972
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2973
  """Create a tree of block devices on a secondary node.
2974

2975
  If this device type has to be created on secondaries, create it and
2976
  all its children.
2977

2978
  If not, just recurse to children keeping the same 'force' value.
2979

2980
  """
2981
  if device.CreateOnSecondary():
2982
    force = True
2983
  if device.children:
2984
    for child in device.children:
2985
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2986
                                        child, force, info):
2987
        return False
2988

    
2989
  if not force:
2990
    return True
2991
  cfg.SetDiskID(device, node)
2992
  new_id = rpc.call_blockdev_create(node, device, device.size,
2993
                                    instance.name, False, info)
2994
  if not new_id:
2995
    return False
2996
  if device.physical_id is None:
2997
    device.physical_id = new_id
2998
  return True
2999

    
3000

    
3001
def _GenerateUniqueNames(cfg, exts):
3002
  """Generate a suitable LV name.
3003

3004
  This will generate a logical volume name for the given instance.
3005

3006
  """
3007
  results = []
3008
  for val in exts:
3009
    new_id = cfg.GenerateUniqueID()
3010
    results.append("%s%s" % (new_id, val))
3011
  return results
3012

    
3013

    
3014
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3015
  """Generate a drbd device complete with its children.
3016

3017
  """
3018
  port = cfg.AllocatePort()
3019
  vgname = cfg.GetVGName()
3020
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3021
                          logical_id=(vgname, names[0]))
3022
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3023
                          logical_id=(vgname, names[1]))
3024
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3025
                          logical_id = (primary, secondary, port),
3026
                          children = [dev_data, dev_meta])
3027
  return drbd_dev
3028

    
3029

    
3030
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3031
  """Generate a drbd8 device complete with its children.
3032

3033
  """
3034
  port = cfg.AllocatePort()
3035
  vgname = cfg.GetVGName()
3036
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3037
                          logical_id=(vgname, names[0]))
3038
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3039
                          logical_id=(vgname, names[1]))
3040
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3041
                          logical_id = (primary, secondary, port),
3042
                          children = [dev_data, dev_meta],
3043
                          iv_name=iv_name)
3044
  return drbd_dev
3045

    
3046

    
3047
def _GenerateDiskTemplate(cfg, template_name,
3048
                          instance_name, primary_node,
3049
                          secondary_nodes, disk_sz, swap_sz,
3050
                          file_storage_dir, file_driver):
3051
  """Generate the entire disk layout for a given template type.
3052

3053
  """
3054
  #TODO: compute space requirements
3055

    
3056
  vgname = cfg.GetVGName()
3057
  if template_name == constants.DT_DISKLESS:
3058
    disks = []
3059
  elif template_name == constants.DT_PLAIN:
3060
    if len(secondary_nodes) != 0:
3061
      raise errors.ProgrammerError("Wrong template configuration")
3062

    
3063
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3064
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3065
                           logical_id=(vgname, names[0]),
3066
                           iv_name = "sda")
3067
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3068
                           logical_id=(vgname, names[1]),
3069
                           iv_name = "sdb")
3070
    disks = [sda_dev, sdb_dev]
3071
  elif template_name == constants.DT_DRBD8:
3072
    if len(secondary_nodes) != 1:
3073
      raise errors.ProgrammerError("Wrong template configuration")
3074
    remote_node = secondary_nodes[0]
3075
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3076
                                       ".sdb_data", ".sdb_meta"])
3077
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3078
                                         disk_sz, names[0:2], "sda")
3079
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3080
                                         swap_sz, names[2:4], "sdb")
3081
    disks = [drbd_sda_dev, drbd_sdb_dev]
3082
  elif template_name == constants.DT_FILE:
3083
    if len(secondary_nodes) != 0:
3084
      raise errors.ProgrammerError("Wrong template configuration")
3085

    
3086
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3087
                                iv_name="sda", logical_id=(file_driver,
3088
                                "%s/sda" % file_storage_dir))
3089
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3090
                                iv_name="sdb", logical_id=(file_driver,
3091
                                "%s/sdb" % file_storage_dir))
3092
    disks = [file_sda_dev, file_sdb_dev]
3093
  else:
3094
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3095
  return disks
3096

    
3097

    
3098
def _GetInstanceInfoText(instance):
3099
  """Compute that text that should be added to the disk's metadata.
3100

3101
  """
3102
  return "originstname+%s" % instance.name
3103

    
3104

    
3105
def _CreateDisks(cfg, instance):
3106
  """Create all disks for an instance.
3107

3108
  This abstracts away some work from AddInstance.
3109

3110
  Args:
3111
    instance: the instance object
3112

3113
  Returns:
3114
    True or False showing the success of the creation process
3115

3116
  """
3117
  info = _GetInstanceInfoText(instance)
3118

    
3119
  if instance.disk_template == constants.DT_FILE:
3120
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3121
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3122
                                              file_storage_dir)
3123

    
3124
    if not result:
3125
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3126
      return False
3127

    
3128
    if not result[0]:
3129
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3130
      return False
3131

    
3132
  for device in instance.disks:
3133
    logger.Info("creating volume %s for instance %s" %
3134
                (device.iv_name, instance.name))
3135
    #HARDCODE
3136
    for secondary_node in instance.secondary_nodes:
3137
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3138
                                        device, False, info):
3139
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3140
                     (device.iv_name, device, secondary_node))
3141
        return False
3142
    #HARDCODE
3143
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3144
                                    instance, device, info):
3145
      logger.Error("failed to create volume %s on primary!" %
3146
                   device.iv_name)
3147
      return False
3148

    
3149
  return True
3150

    
3151

    
3152
def _RemoveDisks(instance, cfg):
3153
  """Remove all disks for an instance.
3154

3155
  This abstracts away some work from `AddInstance()` and
3156
  `RemoveInstance()`. Note that in case some of the devices couldn't
3157
  be removed, the removal will continue with the other ones (compare
3158
  with `_CreateDisks()`).
3159

3160
  Args:
3161
    instance: the instance object
3162

3163
  Returns:
3164
    True or False showing the success of the removal proces
3165

3166
  """
3167
  logger.Info("removing block devices for instance %s" % instance.name)
3168

    
3169
  result = True
3170
  for device in instance.disks:
3171
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3172
      cfg.SetDiskID(disk, node)
3173
      if not rpc.call_blockdev_remove(node, disk):
3174
        logger.Error("could not remove block device %s on node %s,"
3175
                     " continuing anyway" %
3176
                     (device.iv_name, node))
3177
        result = False
3178

    
3179
  if instance.disk_template == constants.DT_FILE:
3180
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3181
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3182
                                            file_storage_dir):
3183
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3184
      result = False
3185

    
3186
  return result
3187

    
3188

    
3189
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3190
  """Compute disk size requirements in the volume group
3191

3192
  This is currently hard-coded for the two-drive layout.
3193

3194
  """
3195
  # Required free disk space as a function of disk and swap space
3196
  req_size_dict = {
3197
    constants.DT_DISKLESS: None,
3198
    constants.DT_PLAIN: disk_size + swap_size,
3199
    # 256 MB are added for drbd metadata, 128MB for each drbd device
3200
    constants.DT_DRBD8: disk_size + swap_size + 256,
3201
    constants.DT_FILE: None,
3202
  }
3203

    
3204
  if disk_template not in req_size_dict:
3205
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3206
                                 " is unknown" %  disk_template)
3207

    
3208
  return req_size_dict[disk_template]
3209

    
3210

    
3211
class LUCreateInstance(LogicalUnit):
3212
  """Create an instance.
3213

3214
  """
3215
  HPATH = "instance-add"
3216
  HTYPE = constants.HTYPE_INSTANCE
3217
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3218
              "disk_template", "swap_size", "mode", "start", "vcpus",
3219
              "wait_for_sync", "ip_check", "mac"]
3220

    
3221
  def _RunAllocator(self):
3222
    """Run the allocator based on input opcode.
3223

3224
    """
3225
    disks = [{"size": self.op.disk_size, "mode": "w"},
3226
             {"size": self.op.swap_size, "mode": "w"}]
3227
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3228
             "bridge": self.op.bridge}]
3229
    ial = IAllocator(self.cfg, self.sstore,
3230
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3231
                     name=self.op.instance_name,
3232
                     disk_template=self.op.disk_template,
3233
                     tags=[],
3234
                     os=self.op.os_type,
3235
                     vcpus=self.op.vcpus,
3236
                     mem_size=self.op.mem_size,
3237
                     disks=disks,
3238
                     nics=nics,
3239
                     )
3240

    
3241
    ial.Run(self.op.iallocator)
3242

    
3243
    if not ial.success:
3244
      raise errors.OpPrereqError("Can't compute nodes using"
3245
                                 " iallocator '%s': %s" % (self.op.iallocator,
3246
                                                           ial.info))
3247
    if len(ial.nodes) != ial.required_nodes:
3248
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3249
                                 " of nodes (%s), required %s" %
3250
                                 (len(ial.nodes), ial.required_nodes))
3251
    self.op.pnode = ial.nodes[0]
3252
    logger.ToStdout("Selected nodes for the instance: %s" %
3253
                    (", ".join(ial.nodes),))
3254
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3255
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3256
    if ial.required_nodes == 2:
3257
      self.op.snode = ial.nodes[1]
3258

    
3259
  def BuildHooksEnv(self):
3260
    """Build hooks env.
3261

3262
    This runs on master, primary and secondary nodes of the instance.
3263

3264
    """
3265
    env = {
3266
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3267
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3268
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3269
      "INSTANCE_ADD_MODE": self.op.mode,
3270
      }
3271
    if self.op.mode == constants.INSTANCE_IMPORT:
3272
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3273
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3274
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3275

    
3276
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3277
      primary_node=self.op.pnode,
3278
      secondary_nodes=self.secondaries,
3279
      status=self.instance_status,
3280
      os_type=self.op.os_type,
3281
      memory=self.op.mem_size,
3282
      vcpus=self.op.vcpus,
3283
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3284
    ))
3285

    
3286
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3287
          self.secondaries)
3288
    return env, nl, nl
3289

    
3290

    
3291
  def CheckPrereq(self):
3292
    """Check prerequisites.
3293

3294
    """
3295
    # set optional parameters to none if they don't exist
3296
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3297
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3298
                 "vnc_bind_address"]:
3299
      if not hasattr(self.op, attr):
3300
        setattr(self.op, attr, None)
3301

    
3302
    if self.op.mode not in (constants.INSTANCE_CREATE,
3303
                            constants.INSTANCE_IMPORT):
3304
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3305
                                 self.op.mode)
3306

    
3307
    if (not self.cfg.GetVGName() and
3308
        self.op.disk_template not in constants.DTS_NOT_LVM):
3309
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3310
                                 " instances")
3311

    
3312
    if self.op.mode == constants.INSTANCE_IMPORT:
3313
      src_node = getattr(self.op, "src_node", None)
3314
      src_path = getattr(self.op, "src_path", None)
3315
      if src_node is None or src_path is None:
3316
        raise errors.OpPrereqError("Importing an instance requires source"
3317
                                   " node and path options")
3318
      src_node_full = self.cfg.ExpandNodeName(src_node)
3319
      if src_node_full is None:
3320
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3321
      self.op.src_node = src_node = src_node_full
3322

    
3323
      if not os.path.isabs(src_path):
3324
        raise errors.OpPrereqError("The source path must be absolute")
3325

    
3326
      export_info = rpc.call_export_info(src_node, src_path)
3327

    
3328
      if not export_info:
3329
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3330

    
3331
      if not export_info.has_section(constants.INISECT_EXP):
3332
        raise errors.ProgrammerError("Corrupted export config")
3333

    
3334
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3335
      if (int(ei_version) != constants.EXPORT_VERSION):
3336
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3337
                                   (ei_version, constants.EXPORT_VERSION))
3338

    
3339
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3340
        raise errors.OpPrereqError("Can't import instance with more than"
3341
                                   " one data disk")
3342

    
3343
      # FIXME: are the old os-es, disk sizes, etc. useful?
3344
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3345
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3346
                                                         'disk0_dump'))
3347
      self.src_image = diskimage
3348
    else: # INSTANCE_CREATE
3349
      if getattr(self.op, "os_type", None) is None:
3350
        raise errors.OpPrereqError("No guest OS specified")
3351

    
3352
    #### instance parameters check
3353

    
3354
    # disk template and mirror node verification
3355
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3356
      raise errors.OpPrereqError("Invalid disk template name")
3357

    
3358
    # instance name verification
3359
    hostname1 = utils.HostInfo(self.op.instance_name)
3360

    
3361
    self.op.instance_name = instance_name = hostname1.name
3362
    instance_list = self.cfg.GetInstanceList()
3363
    if instance_name in instance_list:
3364
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3365
                                 instance_name)
3366

    
3367
    # ip validity checks
3368
    ip = getattr(self.op, "ip", None)
3369
    if ip is None or ip.lower() == "none":
3370
      inst_ip = None
3371
    elif ip.lower() == "auto":
3372
      inst_ip = hostname1.ip
3373
    else:
3374
      if not utils.IsValidIP(ip):
3375
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3376
                                   " like a valid IP" % ip)
3377
      inst_ip = ip
3378
    self.inst_ip = self.op.ip = inst_ip
3379

    
3380
    if self.op.start and not self.op.ip_check:
3381
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3382
                                 " adding an instance in start mode")
3383

    
3384
    if self.op.ip_check:
3385
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3386
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3387
                                   (hostname1.ip, instance_name))
3388

    
3389
    # MAC address verification
3390
    if self.op.mac != "auto":
3391
      if not utils.IsValidMac(self.op.mac.lower()):
3392
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3393
                                   self.op.mac)
3394

    
3395
    # bridge verification
3396
    bridge = getattr(self.op, "bridge", None)
3397
    if bridge is None:
3398
      self.op.bridge = self.cfg.GetDefBridge()
3399
    else:
3400
      self.op.bridge = bridge
3401

    
3402
    # boot order verification
3403
    if self.op.hvm_boot_order is not None:
3404
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3405
        raise errors.OpPrereqError("invalid boot order specified,"
3406
                                   " must be one or more of [acdn]")
3407
    # file storage checks
3408
    if (self.op.file_driver and
3409
        not self.op.file_driver in constants.FILE_DRIVER):
3410
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3411
                                 self.op.file_driver)
3412

    
3413
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3414
      raise errors.OpPrereqError("File storage directory not a relative"
3415
                                 " path")
3416
    #### allocator run
3417

    
3418
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3419
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3420
                                 " node must be given")
3421

    
3422
    if self.op.iallocator is not None:
3423
      self._RunAllocator()
3424

    
3425
    #### node related checks
3426

    
3427
    # check primary node
3428
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3429
    if pnode is None:
3430
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3431
                                 self.op.pnode)
3432
    self.op.pnode = pnode.name
3433
    self.pnode = pnode
3434
    self.secondaries = []
3435

    
3436
    # mirror node verification
3437
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3438
      if getattr(self.op, "snode", None) is None:
3439
        raise errors.OpPrereqError("The networked disk templates need"
3440
                                   " a mirror node")
3441

    
3442
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3443
      if snode_name is None:
3444
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3445
                                   self.op.snode)
3446
      elif snode_name == pnode.name:
3447
        raise errors.OpPrereqError("The secondary node cannot be"
3448
                                   " the primary node.")
3449
      self.secondaries.append(snode_name)
3450

    
3451
    req_size = _ComputeDiskSize(self.op.disk_template,
3452
                                self.op.disk_size, self.op.swap_size)
3453

    
3454
    # Check lv size requirements
3455
    if req_size is not None:
3456
      nodenames = [pnode.name] + self.secondaries
3457
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3458
      for node in nodenames:
3459
        info = nodeinfo.get(node, None)
3460
        if not info:
3461
          raise errors.OpPrereqError("Cannot get current information"
3462
                                     " from node '%s'" % nodeinfo)
3463
        vg_free = info.get('vg_free', None)
3464
        if not isinstance(vg_free, int):
3465
          raise errors.OpPrereqError("Can't compute free disk space on"
3466
                                     " node %s" % node)
3467
        if req_size > info['vg_free']:
3468
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3469
                                     " %d MB available, %d MB required" %
3470
                                     (node, info['vg_free'], req_size))
3471

    
3472
    # os verification
3473
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3474
    if not os_obj:
3475
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3476
                                 " primary node"  % self.op.os_type)
3477

    
3478
    if self.op.kernel_path == constants.VALUE_NONE:
3479
      raise errors.OpPrereqError("Can't set instance kernel to none")
3480

    
3481

    
3482
    # bridge check on primary node
3483
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3484
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3485
                                 " destination node '%s'" %
3486
                                 (self.op.bridge, pnode.name))
3487

    
3488
    # memory check on primary node
3489
    if self.op.start:
3490
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3491
                           "creating instance %s" % self.op.instance_name,
3492
                           self.op.mem_size)
3493

    
3494
    # hvm_cdrom_image_path verification
3495
    if self.op.hvm_cdrom_image_path is not None:
3496
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3497
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3498
                                   " be an absolute path or None, not %s" %
3499
                                   self.op.hvm_cdrom_image_path)
3500
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3501
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3502
                                   " regular file or a symlink pointing to"
3503
                                   " an existing regular file, not %s" %
3504
                                   self.op.hvm_cdrom_image_path)
3505

    
3506
    # vnc_bind_address verification
3507
    if self.op.vnc_bind_address is not None:
3508
      if not utils.IsValidIP(self.op.vnc_bind_address):
3509
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3510
                                   " like a valid IP address" %
3511
                                   self.op.vnc_bind_address)
3512

    
3513
    if self.op.start:
3514
      self.instance_status = 'up'
3515
    else:
3516
      self.instance_status = 'down'
3517

    
3518
  def Exec(self, feedback_fn):
3519
    """Create and add the instance to the cluster.
3520

3521
    """
3522
    instance = self.op.instance_name
3523
    pnode_name = self.pnode.name
3524

    
3525
    if self.op.mac == "auto":
3526
      mac_address = self.cfg.GenerateMAC()
3527
    else:
3528
      mac_address = self.op.mac
3529

    
3530
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3531
    if self.inst_ip is not None:
3532
      nic.ip = self.inst_ip
3533

    
3534
    ht_kind = self.sstore.GetHypervisorType()
3535
    if ht_kind in constants.HTS_REQ_PORT:
3536
      network_port = self.cfg.AllocatePort()
3537
    else:
3538
      network_port = None
3539

    
3540
    if self.op.vnc_bind_address is None:
3541
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3542

    
3543
    # this is needed because os.path.join does not accept None arguments
3544
    if self.op.file_storage_dir is None:
3545
      string_file_storage_dir = ""
3546
    else:
3547
      string_file_storage_dir = self.op.file_storage_dir
3548

    
3549
    # build the full file storage dir path
3550
    file_storage_dir = os.path.normpath(os.path.join(
3551
                                        self.sstore.GetFileStorageDir(),
3552
                                        string_file_storage_dir, instance))
3553

    
3554

    
3555
    disks = _GenerateDiskTemplate(self.cfg,
3556
                                  self.op.disk_template,
3557
                                  instance, pnode_name,
3558
                                  self.secondaries, self.op.disk_size,
3559
                                  self.op.swap_size,
3560
                                  file_storage_dir,
3561
                                  self.op.file_driver)
3562

    
3563
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3564
                            primary_node=pnode_name,
3565
                            memory=self.op.mem_size,
3566
                            vcpus=self.op.vcpus,
3567
                            nics=[nic], disks=disks,
3568
                            disk_template=self.op.disk_template,
3569
                            status=self.instance_status,
3570
                            network_port=network_port,
3571
                            kernel_path=self.op.kernel_path,
3572
                            initrd_path=self.op.initrd_path,
3573
                            hvm_boot_order=self.op.hvm_boot_order,
3574
                            hvm_acpi=self.op.hvm_acpi,
3575
                            hvm_pae=self.op.hvm_pae,
3576
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3577
                            vnc_bind_address=self.op.vnc_bind_address,
3578
                            )
3579

    
3580
    feedback_fn("* creating instance disks...")
3581
    if not _CreateDisks(self.cfg, iobj):
3582
      _RemoveDisks(iobj, self.cfg)
3583
      raise errors.OpExecError("Device creation failed, reverting...")
3584

    
3585
    feedback_fn("adding instance %s to cluster config" % instance)
3586

    
3587
    self.cfg.AddInstance(iobj)
3588

    
3589
    if self.op.wait_for_sync:
3590
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3591
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3592
      # make sure the disks are not degraded (still sync-ing is ok)
3593
      time.sleep(15)
3594
      feedback_fn("* checking mirrors status")
3595
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3596
    else:
3597
      disk_abort = False
3598

    
3599
    if disk_abort:
3600
      _RemoveDisks(iobj, self.cfg)
3601
      self.cfg.RemoveInstance(iobj.name)
3602
      raise errors.OpExecError("There are some degraded disks for"
3603
                               " this instance")
3604

    
3605
    feedback_fn("creating os for instance %s on node %s" %
3606
                (instance, pnode_name))
3607

    
3608
    if iobj.disk_template != constants.DT_DISKLESS:
3609
      if self.op.mode == constants.INSTANCE_CREATE:
3610
        feedback_fn("* running the instance OS create scripts...")
3611
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3612
          raise errors.OpExecError("could not add os for instance %s"
3613
                                   " on node %s" %
3614
                                   (instance, pnode_name))
3615

    
3616
      elif self.op.mode == constants.INSTANCE_IMPORT:
3617
        feedback_fn("* running the instance OS import scripts...")
3618
        src_node = self.op.src_node
3619
        src_image = self.src_image
3620
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3621
                                                src_node, src_image):
3622
          raise errors.OpExecError("Could not import os for instance"
3623
                                   " %s on node %s" %
3624
                                   (instance, pnode_name))
3625
      else:
3626
        # also checked in the prereq part
3627
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3628
                                     % self.op.mode)
3629

    
3630
    if self.op.start:
3631
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3632
      feedback_fn("* starting instance...")
3633
      if not rpc.call_instance_start(pnode_name, iobj, None):
3634
        raise errors.OpExecError("Could not start instance")
3635

    
3636

    
3637
class LUConnectConsole(NoHooksLU):
3638
  """Connect to an instance's console.
3639

3640
  This is somewhat special in that it returns the command line that
3641
  you need to run on the master node in order to connect to the
3642
  console.
3643

3644
  """
3645
  _OP_REQP = ["instance_name"]
3646

    
3647
  def CheckPrereq(self):
3648
    """Check prerequisites.
3649

3650
    This checks that the instance is in the cluster.
3651

3652
    """
3653
    instance = self.cfg.GetInstanceInfo(
3654
      self.cfg.ExpandInstanceName(self.op.instance_name))
3655
    if instance is None:
3656
      raise errors.OpPrereqError("Instance '%s' not known" %
3657
                                 self.op.instance_name)
3658
    self.instance = instance
3659

    
3660
  def Exec(self, feedback_fn):
3661
    """Connect to the console of an instance
3662

3663
    """
3664
    instance = self.instance
3665
    node = instance.primary_node
3666

    
3667
    node_insts = rpc.call_instance_list([node])[node]
3668
    if node_insts is False:
3669
      raise errors.OpExecError("Can't connect to node %s." % node)
3670

    
3671
    if instance.name not in node_insts:
3672
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3673

    
3674
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3675

    
3676
    hyper = hypervisor.GetHypervisor()
3677
    console_cmd = hyper.GetShellCommandForConsole(instance)
3678

    
3679
    # build ssh cmdline
3680
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3681

    
3682

    
3683
class LUReplaceDisks(LogicalUnit):
3684
  """Replace the disks of an instance.
3685

3686
  """
3687
  HPATH = "mirrors-replace"
3688
  HTYPE = constants.HTYPE_INSTANCE
3689
  _OP_REQP = ["instance_name", "mode", "disks"]
3690

    
3691
  def _RunAllocator(self):
3692
    """Compute a new secondary node using an IAllocator.
3693

3694
    """
3695
    ial = IAllocator(self.cfg, self.sstore,
3696
                     mode=constants.IALLOCATOR_MODE_RELOC,
3697
                     name=self.op.instance_name,
3698
                     relocate_from=[self.sec_node])
3699

    
3700
    ial.Run(self.op.iallocator)
3701

    
3702
    if not ial.success:
3703
      raise errors.OpPrereqError("Can't compute nodes using"
3704
                                 " iallocator '%s': %s" % (self.op.iallocator,
3705
                                                           ial.info))
3706
    if len(ial.nodes) != ial.required_nodes:
3707
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3708
                                 " of nodes (%s), required %s" %
3709
                                 (len(ial.nodes), ial.required_nodes))
3710
    self.op.remote_node = ial.nodes[0]
3711
    logger.ToStdout("Selected new secondary for the instance: %s" %
3712
                    self.op.remote_node)
3713

    
3714
  def BuildHooksEnv(self):
3715
    """Build hooks env.
3716

3717
    This runs on the master, the primary and all the secondaries.
3718

3719
    """
3720
    env = {
3721
      "MODE": self.op.mode,
3722
      "NEW_SECONDARY": self.op.remote_node,
3723
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3724
      }
3725
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3726
    nl = [
3727
      self.sstore.GetMasterNode(),
3728
      self.instance.primary_node,
3729
      ]
3730
    if self.op.remote_node is not None:
3731
      nl.append(self.op.remote_node)
3732
    return env, nl, nl
3733

    
3734
  def CheckPrereq(self):
3735
    """Check prerequisites.
3736

3737
    This checks that the instance is in the cluster.
3738

3739
    """
3740
    if not hasattr(self.op, "remote_node"):
3741
      self.op.remote_node = None
3742

    
3743
    instance = self.cfg.GetInstanceInfo(
3744
      self.cfg.ExpandInstanceName(self.op.instance_name))
3745
    if instance is None:
3746
      raise errors.OpPrereqError("Instance '%s' not known" %
3747
                                 self.op.instance_name)
3748
    self.instance = instance
3749
    self.op.instance_name = instance.name
3750

    
3751
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3752
      raise errors.OpPrereqError("Instance's disk layout is not"
3753
                                 " network mirrored.")
3754

    
3755
    if len(instance.secondary_nodes) != 1:
3756
      raise errors.OpPrereqError("The instance has a strange layout,"
3757
                                 " expected one secondary but found %d" %
3758
                                 len(instance.secondary_nodes))
3759

    
3760
    self.sec_node = instance.secondary_nodes[0]
3761

    
3762
    ia_name = getattr(self.op, "iallocator", None)
3763
    if ia_name is not None:
3764
      if self.op.remote_node is not None:
3765
        raise errors.OpPrereqError("Give either the iallocator or the new"
3766
                                   " secondary, not both")
3767
      self.op.remote_node = self._RunAllocator()
3768

    
3769
    remote_node = self.op.remote_node
3770
    if remote_node is not None:
3771
      remote_node = self.cfg.ExpandNodeName(remote_node)
3772
      if remote_node is None:
3773
        raise errors.OpPrereqError("Node '%s' not known" %
3774
                                   self.op.remote_node)
3775
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3776
    else:
3777
      self.remote_node_info = None
3778
    if remote_node == instance.primary_node:
3779
      raise errors.OpPrereqError("The specified node is the primary node of"
3780
                                 " the instance.")
3781
    elif remote_node == self.sec_node:
3782
      if self.op.mode == constants.REPLACE_DISK_SEC:
3783
        # this is for DRBD8, where we can't execute the same mode of
3784
        # replacement as for drbd7 (no different port allocated)
3785
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3786
                                   " replacement")
3787
    if instance.disk_template == constants.DT_DRBD8:
3788
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3789
          remote_node is not None):
3790
        # switch to replace secondary mode
3791
        self.op.mode = constants.REPLACE_DISK_SEC
3792

    
3793
      if self.op.mode == constants.REPLACE_DISK_ALL:
3794
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3795
                                   " secondary disk replacement, not"
3796
                                   " both at once")
3797
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3798
        if remote_node is not None:
3799
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3800
                                     " the secondary while doing a primary"
3801
                                     " node disk replacement")
3802
        self.tgt_node = instance.primary_node
3803
        self.oth_node = instance.secondary_nodes[0]
3804
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3805
        self.new_node = remote_node # this can be None, in which case
3806
                                    # we don't change the secondary
3807
        self.tgt_node = instance.secondary_nodes[0]
3808
        self.oth_node = instance.primary_node
3809
      else:
3810
        raise errors.ProgrammerError("Unhandled disk replace mode")
3811

    
3812
    for name in self.op.disks:
3813
      if instance.FindDisk(name) is None:
3814
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3815
                                   (name, instance.name))
3816
    self.op.remote_node = remote_node
3817

    
3818
  def _ExecD8DiskOnly(self, feedback_fn):
3819
    """Replace a disk on the primary or secondary for dbrd8.
3820

3821
    The algorithm for replace is quite complicated:
3822
      - for each disk to be replaced:
3823
        - create new LVs on the target node with unique names
3824
        - detach old LVs from the drbd device
3825
        - rename old LVs to name_replaced.<time_t>
3826
        - rename new LVs to old LVs
3827
        - attach the new LVs (with the old names now) to the drbd device
3828
      - wait for sync across all devices
3829
      - for each modified disk:
3830
        - remove old LVs (which have the name name_replaces.<time_t>)
3831

3832
    Failures are not very well handled.
3833

3834
    """
3835
    steps_total = 6
3836
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3837
    instance = self.instance
3838
    iv_names = {}
3839
    vgname = self.cfg.GetVGName()
3840
    # start of work
3841
    cfg = self.cfg
3842
    tgt_node = self.tgt_node
3843
    oth_node = self.oth_node
3844

    
3845
    # Step: check device activation
3846
    self.proc.LogStep(1, steps_total, "check device existence")
3847
    info("checking volume groups")
3848
    my_vg = cfg.GetVGName()
3849
    results = rpc.call_vg_list([oth_node, tgt_node])
3850
    if not results:
3851
      raise errors.OpExecError("Can't list volume groups on the nodes")
3852
    for node in oth_node, tgt_node:
3853
      res = results.get(node, False)
3854
      if not res or my_vg not in res:
3855
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3856
                                 (my_vg, node))
3857
    for dev in instance.disks:
3858
      if not dev.iv_name in self.op.disks:
3859
        continue
3860
      for node in tgt_node, oth_node:
3861
        info("checking %s on %s" % (dev.iv_name, node))
3862
        cfg.SetDiskID(dev, node)
3863
        if not rpc.call_blockdev_find(node, dev):
3864
          raise errors.OpExecError("Can't find device %s on node %s" %
3865
                                   (dev.iv_name, node))
3866

    
3867
    # Step: check other node consistency
3868
    self.proc.LogStep(2, steps_total, "check peer consistency")
3869
    for dev in instance.disks:
3870
      if not dev.iv_name in self.op.disks:
3871
        continue
3872
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3873
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3874
                                   oth_node==instance.primary_node):
3875
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3876
                                 " to replace disks on this node (%s)" %
3877
                                 (oth_node, tgt_node))
3878

    
3879
    # Step: create new storage
3880
    self.proc.LogStep(3, steps_total, "allocate new storage")
3881
    for dev in instance.disks:
3882
      if not dev.iv_name in self.op.disks:
3883
        continue
3884
      size = dev.size
3885
      cfg.SetDiskID(dev, tgt_node)
3886
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3887
      names = _GenerateUniqueNames(cfg, lv_names)
3888
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3889
                             logical_id=(vgname, names[0]))
3890
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3891
                             logical_id=(vgname, names[1]))
3892
      new_lvs = [lv_data, lv_meta]
3893
      old_lvs = dev.children
3894
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3895
      info("creating new local storage on %s for %s" %
3896
           (tgt_node, dev.iv_name))
3897
      # since we *always* want to create this LV, we use the
3898
      # _Create...OnPrimary (which forces the creation), even if we
3899
      # are talking about the secondary node
3900
      for new_lv in new_lvs:
3901
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3902
                                        _GetInstanceInfoText(instance)):
3903
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3904
                                   " node '%s'" %
3905
                                   (new_lv.logical_id[1], tgt_node))
3906

    
3907
    # Step: for each lv, detach+rename*2+attach
3908
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3909
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3910
      info("detaching %s drbd from local storage" % dev.iv_name)
3911
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3912
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3913
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3914
      #dev.children = []
3915
      #cfg.Update(instance)
3916

    
3917
      # ok, we created the new LVs, so now we know we have the needed
3918
      # storage; as such, we proceed on the target node to rename
3919
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3920
      # using the assumption that logical_id == physical_id (which in
3921
      # turn is the unique_id on that node)
3922

    
3923
      # FIXME(iustin): use a better name for the replaced LVs
3924
      temp_suffix = int(time.time())
3925
      ren_fn = lambda d, suff: (d.physical_id[0],
3926
                                d.physical_id[1] + "_replaced-%s" % suff)
3927
      # build the rename list based on what LVs exist on the node
3928
      rlist = []
3929
      for to_ren in old_lvs:
3930
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3931
        if find_res is not None: # device exists
3932
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3933

    
3934
      info("renaming the old LVs on the target node")
3935
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3936
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3937
      # now we rename the new LVs to the old LVs
3938
      info("renaming the new LVs on the target node")
3939
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3940
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3941
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3942

    
3943
      for old, new in zip(old_lvs, new_lvs):
3944
        new.logical_id = old.logical_id
3945
        cfg.SetDiskID(new, tgt_node)
3946

    
3947
      for disk in old_lvs:
3948
        disk.logical_id = ren_fn(disk, temp_suffix)
3949
        cfg.SetDiskID(disk, tgt_node)
3950

    
3951
      # now that the new lvs have the old name, we can add them to the device
3952
      info("adding new mirror component on %s" % tgt_node)
3953
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3954
        for new_lv in new_lvs:
3955
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3956
            warning("Can't rollback device %s", hint="manually cleanup unused"
3957
                    " logical volumes")
3958
        raise errors.OpExecError("Can't add local storage to drbd")
3959

    
3960
      dev.children = new_lvs
3961
      cfg.Update(instance)
3962

    
3963
    # Step: wait for sync
3964

    
3965
    # this can fail as the old devices are degraded and _WaitForSync
3966
    # does a combined result over all disks, so we don't check its
3967
    # return value
3968
    self.proc.LogStep(5, steps_total, "sync devices")
3969
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3970

    
3971
    # so check manually all the devices
3972
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3973
      cfg.SetDiskID(dev, instance.primary_node)
3974
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3975
      if is_degr:
3976
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3977

    
3978
    # Step: remove old storage
3979
    self.proc.LogStep(6, steps_total, "removing old storage")
3980
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3981
      info("remove logical volumes for %s" % name)
3982
      for lv in old_lvs:
3983
        cfg.SetDiskID(lv, tgt_node)
3984
        if not rpc.call_blockdev_remove(tgt_node, lv):
3985
          warning("Can't remove old LV", hint="manually remove unused LVs")
3986
          continue
3987

    
3988
  def _ExecD8Secondary(self, feedback_fn):
3989
    """Replace the secondary node for drbd8.
3990

3991
    The algorithm for replace is quite complicated:
3992
      - for all disks of the instance:
3993
        - create new LVs on the new node with same names
3994
        - shutdown the drbd device on the old secondary
3995
        - disconnect the drbd network on the primary
3996
        - create the drbd device on the new secondary
3997
        - network attach the drbd on the primary, using an artifice:
3998
          the drbd code for Attach() will connect to the network if it
3999
          finds a device which is connected to the good local disks but
4000
          not network enabled
4001
      - wait for sync across all devices
4002
      - remove all disks from the old secondary
4003

4004
    Failures are not very well handled.
4005

4006
    """
4007
    steps_total = 6
4008
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4009
    instance = self.instance
4010
    iv_names = {}
4011
    vgname = self.cfg.GetVGName()
4012
    # start of work
4013
    cfg = self.cfg
4014
    old_node = self.tgt_node
4015
    new_node = self.new_node
4016
    pri_node = instance.primary_node
4017

    
4018
    # Step: check device activation
4019
    self.proc.LogStep(1, steps_total, "check device existence")
4020
    info("checking volume groups")
4021
    my_vg = cfg.GetVGName()
4022
    results = rpc.call_vg_list([pri_node, new_node])
4023
    if not results:
4024
      raise errors.OpExecError("Can't list volume groups on the nodes")
4025
    for node in pri_node, new_node:
4026
      res = results.get(node, False)
4027
      if not res or my_vg not in res:
4028
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4029
                                 (my_vg, node))
4030
    for dev in instance.disks:
4031
      if not dev.iv_name in self.op.disks:
4032
        continue
4033
      info("checking %s on %s" % (dev.iv_name, pri_node))
4034
      cfg.SetDiskID(dev, pri_node)
4035
      if not rpc.call_blockdev_find(pri_node, dev):
4036
        raise errors.OpExecError("Can't find device %s on node %s" %
4037
                                 (dev.iv_name, pri_node))
4038

    
4039
    # Step: check other node consistency
4040
    self.proc.LogStep(2, steps_total, "check peer consistency")
4041
    for dev in instance.disks:
4042
      if not dev.iv_name in self.op.disks:
4043
        continue
4044
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4045
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4046
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4047
                                 " unsafe to replace the secondary" %
4048
                                 pri_node)
4049

    
4050
    # Step: create new storage
4051
    self.proc.LogStep(3, steps_total, "allocate new storage")
4052
    for dev in instance.disks:
4053
      size = dev.size
4054
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4055
      # since we *always* want to create this LV, we use the
4056
      # _Create...OnPrimary (which forces the creation), even if we
4057
      # are talking about the secondary node
4058
      for new_lv in dev.children:
4059
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4060
                                        _GetInstanceInfoText(instance)):
4061
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4062
                                   " node '%s'" %
4063
                                   (new_lv.logical_id[1], new_node))
4064

    
4065
      iv_names[dev.iv_name] = (dev, dev.children)
4066

    
4067
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4068
    for dev in instance.disks:
4069
      size = dev.size
4070
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4071
      # create new devices on new_node
4072
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4073
                              logical_id=(pri_node, new_node,
4074
                                          dev.logical_id[2]),
4075
                              children=dev.children)
4076
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4077
                                        new_drbd, False,
4078
                                      _GetInstanceInfoText(instance)):
4079
        raise errors.OpExecError("Failed to create new DRBD on"
4080
                                 " node '%s'" % new_node)
4081

    
4082
    for dev in instance.disks:
4083
      # we have new devices, shutdown the drbd on the old secondary
4084
      info("shutting down drbd for %s on old node" % dev.iv_name)
4085
      cfg.SetDiskID(dev, old_node)
4086
      if not rpc.call_blockdev_shutdown(old_node, dev):
4087
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4088
                hint="Please cleanup this device manually as soon as possible")
4089

    
4090
    info("detaching primary drbds from the network (=> standalone)")
4091
    done = 0
4092
    for dev in instance.disks:
4093
      cfg.SetDiskID(dev, pri_node)
4094
      # set the physical (unique in bdev terms) id to None, meaning
4095
      # detach from network
4096
      dev.physical_id = (None,) * len(dev.physical_id)
4097
      # and 'find' the device, which will 'fix' it to match the
4098
      # standalone state
4099
      if rpc.call_blockdev_find(pri_node, dev):
4100
        done += 1
4101
      else:
4102
        warning("Failed to detach drbd %s from network, unusual case" %
4103
                dev.iv_name)
4104

    
4105
    if not done:
4106
      # no detaches succeeded (very unlikely)
4107
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4108

    
4109
    # if we managed to detach at least one, we update all the disks of
4110
    # the instance to point to the new secondary
4111
    info("updating instance configuration")
4112
    for dev in instance.disks:
4113
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4114
      cfg.SetDiskID(dev, pri_node)
4115
    cfg.Update(instance)
4116

    
4117
    # and now perform the drbd attach
4118
    info("attaching primary drbds to new secondary (standalone => connected)")
4119
    failures = []
4120
    for dev in instance.disks:
4121
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4122
      # since the attach is smart, it's enough to 'find' the device,
4123
      # it will automatically activate the network, if the physical_id
4124
      # is correct
4125
      cfg.SetDiskID(dev, pri_node)
4126
      if not rpc.call_blockdev_find(pri_node, dev):
4127
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4128
                "please do a gnt-instance info to see the status of disks")
4129

    
4130
    # this can fail as the old devices are degraded and _WaitForSync
4131
    # does a combined result over all disks, so we don't check its
4132
    # return value
4133
    self.proc.LogStep(5, steps_total, "sync devices")
4134
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4135

    
4136
    # so check manually all the devices
4137
    for name, (dev, old_lvs) in iv_names.iteritems():
4138
      cfg.SetDiskID(dev, pri_node)
4139
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4140
      if is_degr:
4141
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4142

    
4143
    self.proc.LogStep(6, steps_total, "removing old storage")
4144
    for name, (dev, old_lvs) in iv_names.iteritems():
4145
      info("remove logical volumes for %s" % name)
4146
      for lv in old_lvs:
4147
        cfg.SetDiskID(lv, old_node)
4148
        if not rpc.call_blockdev_remove(old_node, lv):
4149
          warning("Can't remove LV on old secondary",
4150
                  hint="Cleanup stale volumes by hand")
4151

    
4152
  def Exec(self, feedback_fn):
4153
    """Execute disk replacement.
4154

4155
    This dispatches the disk replacement to the appropriate handler.
4156

4157
    """
4158
    instance = self.instance
4159
    if instance.disk_template == constants.DT_DRBD8:
4160
      if self.op.remote_node is None:
4161
        fn = self._ExecD8DiskOnly
4162
      else:
4163
        fn = self._ExecD8Secondary
4164
    else:
4165
      raise errors.ProgrammerError("Unhandled disk replacement case")
4166
    return fn(feedback_fn)
4167

    
4168

    
4169
class LUQueryInstanceData(NoHooksLU):
4170
  """Query runtime instance data.
4171

4172
  """
4173
  _OP_REQP = ["instances"]
4174

    
4175
  def CheckPrereq(self):
4176
    """Check prerequisites.
4177

4178
    This only checks the optional instance list against the existing names.
4179

4180
    """
4181
    if not isinstance(self.op.instances, list):
4182
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4183
    if self.op.instances:
4184
      self.wanted_instances = []
4185
      names = self.op.instances
4186
      for name in names:
4187
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4188
        if instance is None:
4189
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4190
        self.wanted_instances.append(instance)
4191
    else:
4192
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4193
                               in self.cfg.GetInstanceList()]
4194
    return
4195

    
4196

    
4197
  def _ComputeDiskStatus(self, instance, snode, dev):
4198
    """Compute block device status.
4199

4200
    """
4201
    self.cfg.SetDiskID(dev, instance.primary_node)
4202
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4203
    if dev.dev_type in constants.LDS_DRBD:
4204
      # we change the snode then (otherwise we use the one passed in)
4205
      if dev.logical_id[0] == instance.primary_node:
4206
        snode = dev.logical_id[1]
4207
      else:
4208
        snode = dev.logical_id[0]
4209

    
4210
    if snode:
4211
      self.cfg.SetDiskID(dev, snode)
4212
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4213
    else:
4214
      dev_sstatus = None
4215

    
4216
    if dev.children:
4217
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4218
                      for child in dev.children]
4219
    else:
4220
      dev_children = []
4221

    
4222
    data = {
4223
      "iv_name": dev.iv_name,
4224
      "dev_type": dev.dev_type,
4225
      "logical_id": dev.logical_id,
4226
      "physical_id": dev.physical_id,
4227
      "pstatus": dev_pstatus,
4228
      "sstatus": dev_sstatus,
4229
      "children": dev_children,
4230
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path

      if htkind in constants.HTS_REQ_PORT:
        idict["vnc_bind_address"] = instance.vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
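    # result collects (parameter, new value) pairs describing what was
    # changed, e.g. [("mem", 512), ("bridge", "xen-br0")] (values here are
    # illustrative only)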
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []
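    # snap_disks will collect one objects.Disk per snapshotted volume; each
    # snapshot is exported to the target node and removed again further down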

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or
      _RELO_KEYS class attributes are required)
    - four buffer attributes (in_data, in_text, out_data, out_text),
      that represent the input (to the external script) in text and
      data structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }
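    # the "nodes" and "instances" keys are filled in below; the mode-specific
    # "request" key is added later by _AddNewInstance/_AddRelocateInstance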

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
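    # A rough illustration of the resulting request (all values below are
    # made up for the example):
    #   {"type": "allocate", "name": "instance1.example.com",
    #    "disk_template": <the requested template>, "tags": [],
    #    "os": "debian-etch", "vcpus": 1, "memory": 256,
    #    "disks": [{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "w"}],
    #    "disk_space_total": <output of _ComputeDiskSize>,
    #    "nics": [{"mac": "aa:00:00:00:00:01", "ip": None,
    #              "bridge": "xen-br0"}],
    #    "required_nodes": 2}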
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and
    mode of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result