#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
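
# Illustrative sketch only, not part of the original module: a minimal
# logical unit honouring the LogicalUnit contract described above could look
# like this (the class name and the opcode field are hypothetical):
#
#   class LUExampleNoop(NoHooksLU):
#     """Example no-op LU."""
#     _OP_REQP = ["instance_name"]
#
#     def CheckPrereq(self):
#       # validate/canonicalise self.op here; raise errors.OpPrereqError
#       # if something is wrong
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do for %s" % self.op.instance_name)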


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
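
# Illustrative usage sketch (not part of the original code): query LUs in
# this module validate the user-requested columns in CheckPrereq roughly as
# follows, mirroring LUQueryNodeVolumes further down:
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)
#
# Any field in 'selected' that is neither static nor dynamic raises
# errors.OpPrereqError.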


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
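
# Illustrative sketch with hypothetical values (not from the original code):
# for an instance "inst1.example.com" with one NIC, the two helpers above
# would produce an environment roughly like:
#
#   {"OP_TARGET": "inst1.example.com",
#    "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com",
#    "INSTANCE_SECONDARIES": "node2.example.com",
#    "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512,
#    "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#    "INSTANCE_NIC_COUNT": 1}
#
# The hooks runner adds its own keys and the "GANETI_" prefix afterwards.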


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the loopback"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
    ss.SetKey(ss.SS_CONFIG_VERSION, constants.CONFIG_VERSION)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
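
  # Illustrative sketch with hypothetical data (not from the original code):
  # the node_info mapping consumed above is built in Exec below; for a node
  # "node2" that is secondary for two instances whose primary is "node1",
  # the relevant entry would look roughly like:
  #
  #   node_info["node2"] = {
  #     "mfree": 2048, "dfree": 102400, "pinst": [], "sinst": ["i1", "i2"],
  #     "sinst-by-pnode": {"node1": ["i1", "i2"]},
  #   }
  #
  # The N+1 check then verifies that mfree on "node2" covers the memory of
  # "i1" and "i2", should "node1" fail.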

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether an LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
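
# Illustrative usage sketch (an assumption, not part of the original code):
# a logical unit that has just created or activated an instance's disks could
# wait for the mirrors to sync and treat a False result as degraded disks:
#
#   disks_ok = _WaitForSync(self.cfg, instance, self.proc)
#   if not disks_ok:
#     raise errors.OpExecError("disks of instance %s are degraded" %
#                              instance.name)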


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and lists of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

1394
  def Exec(self, feedback_fn):
1395
    """Compute the list of OSes.
1396

1397
    """
1398
    node_list = self.cfg.GetNodeList()
1399
    node_data = rpc.call_os_diagnose(node_list)
1400
    if node_data == False:
1401
      raise errors.OpExecError("Can't gather the list of OSes")
1402
    pol = self._DiagnoseByOS(node_list, node_data)
1403
    output = []
1404
    for os_name, os_data in pol.iteritems():
1405
      row = []
1406
      for field in self.op.output_fields:
1407
        if field == "name":
1408
          val = os_name
1409
        elif field == "valid":
1410
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1411
        elif field == "node_status":
1412
          val = {}
1413
          for node_name, nos_list in os_data.iteritems():
1414
            val[node_name] = [(v.status, v.path) for v in nos_list]
1415
        else:
1416
          raise errors.ParameterError(field)
1417
        row.append(val)
1418
      output.append(row)
1419

    
1420
    return output
1421

    
1422

    
1423
class LURemoveNode(LogicalUnit):
1424
  """Logical unit for removing a node.
1425

1426
  """
1427
  HPATH = "node-remove"
1428
  HTYPE = constants.HTYPE_NODE
1429
  _OP_REQP = ["node_name"]
1430

    
1431
  def BuildHooksEnv(self):
1432
    """Build hooks env.
1433

1434
    This doesn't run on the target node in the pre phase as a failed
1435
    node would not allows itself to run.
1436

1437
    """
1438
    env = {
1439
      "OP_TARGET": self.op.node_name,
1440
      "NODE_NAME": self.op.node_name,
1441
      }
1442
    all_nodes = self.cfg.GetNodeList()
1443
    all_nodes.remove(self.op.node_name)
1444
    return env, all_nodes, all_nodes
1445

    
1446
  def CheckPrereq(self):
1447
    """Check prerequisites.
1448

1449
    This checks:
1450
     - the node exists in the configuration
1451
     - it does not have primary or secondary instances
1452
     - it's not the master
1453

1454
    Any errors are signalled by raising errors.OpPrereqError.
1455

1456
    """
1457
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1458
    if node is None:
1459
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1460

    
1461
    instance_list = self.cfg.GetInstanceList()
1462

    
1463
    masternode = self.sstore.GetMasterNode()
1464
    if node.name == masternode:
1465
      raise errors.OpPrereqError("Node is the master node,"
1466
                                 " you need to failover first.")
1467

    
1468
    for instance_name in instance_list:
1469
      instance = self.cfg.GetInstanceInfo(instance_name)
1470
      if node.name == instance.primary_node:
1471
        raise errors.OpPrereqError("Instance %s still running on the node,"
1472
                                   " please remove first." % instance_name)
1473
      if node.name in instance.secondary_nodes:
1474
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1475
                                   " please remove first." % instance_name)
1476
    self.op.node_name = node.name
1477
    self.node = node
1478

    
1479
  def Exec(self, feedback_fn):
1480
    """Removes the node from the cluster.
1481

1482
    """
1483
    node = self.node
1484
    logger.Info("stopping the node daemon and removing configs from node %s" %
1485
                node.name)
1486

    
1487
    rpc.call_node_leave_cluster(node.name)
1488

    
1489
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1490

    
1491
    logger.Info("Removing node %s from config" % node.name)
1492

    
1493
    self.cfg.RemoveNode(node.name)
1494

    
1495
    _RemoveHostFromEtcHosts(node.name)
1496

    
1497

    
1498
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


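# Overview of the add-node flow implemented below: CheckPrereq resolves the
# new name and sanity-checks the IP configuration against the cluster; Exec
# then pushes the node daemon password and SSL certificate over ssh,
# restarts ganeti-noded, verifies the protocol version, copies the ssh host
# keys, updates /etc/hosts and known_hosts everywhere, and finally adds the
# node to the cluster configuration.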
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
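    # The remote command built below looks roughly like this (illustrative,
    # with placeholders instead of the real password and PEM contents):
    #   umask 077 && echo '<pass>' > '<noded password file>' && \
    #   cat > '<ssl cert file>' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   <node init script> restart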
    mycommand = ("umask 077 && "
1807
                 "echo '%s' > '%s' && "
1808
                 "cat > '%s' << '!EOF.' && \n"
1809
                 "%s!EOF.\n%s restart" %
1810
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1811
                  constants.SSL_CERT_FILE, gntpem,
1812
                  constants.NODE_INITD_SCRIPT))
1813

    
1814
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1815
    if result.failed:
1816
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1817
                               " output: %s" %
1818
                               (node, result.fail_reason, result.output))
1819

    
1820
    # check connectivity
1821
    time.sleep(4)
1822

    
1823
    result = rpc.call_version([node])[node]
1824
    if result:
1825
      if constants.PROTOCOL_VERSION == result:
1826
        logger.Info("communication to node %s fine, sw version %s match" %
1827
                    (node, result))
1828
      else:
1829
        raise errors.OpExecError("Version mismatch master version %s,"
1830
                                 " node version %s" %
1831
                                 (constants.PROTOCOL_VERSION, result))
1832
    else:
1833
      raise errors.OpExecError("Cannot get version from the new node")
1834

    
1835
    # setup ssh on node
1836
    logger.Info("copy ssh key to node %s" % node)
1837
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1838
    keyarray = []
1839
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1840
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1841
                priv_key, pub_key]
1842

    
1843
    for i in keyfiles:
1844
      f = open(i, 'r')
1845
      try:
1846
        keyarray.append(f.read())
1847
      finally:
1848
        f.close()
1849

    
1850
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1851
                               keyarray[3], keyarray[4], keyarray[5])
1852

    
1853
    if not result:
1854
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1855

    
1856
    # Add node to our /etc/hosts, and add key to known_hosts
1857
    _AddHostToEtcHosts(new_node.name)
1858

    
1859
    if new_node.secondary_ip != new_node.primary_ip:
1860
      if not rpc.call_node_tcp_ping(new_node.name,
1861
                                    constants.LOCALHOST_IP_ADDRESS,
1862
                                    new_node.secondary_ip,
1863
                                    constants.DEFAULT_NODED_PORT,
1864
                                    10, False):
1865
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1866
                                 " you gave (%s). Please fix and re-run this"
1867
                                 " command." % new_node.secondary_ip)
1868

    
1869
    success, msg = self.ssh.VerifyNodeHostname(node)
1870
    if not success:
1871
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1872
                               " than the one the resolver gives: %s."
1873
                               " Please fix and re-run this command." %
1874
                               (node, msg))
1875

    
1876
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1877
    # including the node just added
1878
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1879
    dist_nodes = self.cfg.GetNodeList()
1880
    if not self.op.readd:
1881
      dist_nodes.append(node)
1882
    if myself.name in dist_nodes:
1883
      dist_nodes.remove(myself.name)
1884

    
1885
    logger.Debug("Copying hosts and known_hosts to all nodes")
1886
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1887
      result = rpc.call_upload_file(dist_nodes, fname)
1888
      for to_node in dist_nodes:
1889
        if not result[to_node]:
1890
          logger.Error("copy of file %s to node %s failed" %
1891
                       (fname, to_node))
1892

    
1893
    to_copy = ss.GetFileList()
1894
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1895
      to_copy.append(constants.VNC_PASSWORD_FILE)
1896
    for fname in to_copy:
1897
      if not self.ssh.CopyFileToNode(node, fname):
1898
        logger.Error("could not copy file %s to node %s" % (fname, node))
1899

    
1900
    if not self.op.readd:
1901
      logger.Info("adding node %s to cluster.conf" % node)
1902
      self.cfg.AddNode(new_node)
1903

    
1904

    
1905
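# Master failover below is three steps: stop the master role on the old
# master, rewrite and redistribute the simple-store "master node" key, then
# start the master role on the new master; failures along the way are
# logged (and reported via feedback_fn) rather than aborting midway.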
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file is given by self.op.filename; the target nodes are those
    computed in CheckPrereq from self.op.nodes (all nodes if that list
    is empty). The master itself is skipped.

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text representation of the cluster config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return a text representation of the cluster config.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


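# _AssembleInstanceDisks below returns (disks_ok, device_info); an
# illustrative device_info entry is
#   ("node1.example.com", "sda", <result of the blockdev_assemble call>)
# i.e. the primary node, the instance-visible disk name and the node-side
# device information (the hostname here is only an example).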
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: a ConfigWriter instance
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info): disks_ok is false if the operation
    failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (they are logged but do not cause a failure return).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


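# Typical use (this is the actual call made in LUStartupInstance.CheckPrereq
# further below):
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)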
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridge existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


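# Reboot handling below: SOFT and HARD reboots are delegated to the node
# daemon via rpc.call_instance_reboot, while a FULL reboot is emulated as
# shutdown, disk deactivation/reactivation and a fresh start.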
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridge existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs only on the master node.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


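# The "status" field computed below combines the configured state with the
# live data: "running", "ERROR_up" (running but marked down), "ERROR_down"
# (marked up but not running), "ADMIN_down", or "ERROR_nodedown" when the
# primary node cannot be contacted.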
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


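# Failover sequence implemented below: check disk consistency on the
# secondary, shut the instance down on the current primary, deactivate its
# disks, flip primary_node in the configuration, then (if the instance was
# marked up) reassemble the disks on the new primary and start it there.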
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2953
  """Create a tree of block devices on the primary node.
2954

2955
  This always creates all devices.
2956

2957
  """
2958
  if device.children:
2959
    for child in device.children:
2960
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2961
        return False
2962

    
2963
  cfg.SetDiskID(device, node)
2964
  new_id = rpc.call_blockdev_create(node, device, device.size,
2965
                                    instance.name, True, info)
2966
  if not new_id:
2967
    return False
2968
  if device.physical_id is None:
2969
    device.physical_id = new_id
2970
  return True
2971

    
2972

    
2973
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


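# Usage note (sketch): _CreateBlockDevOnPrimary unconditionally creates every
# device in the tree, while _CreateBlockDevOnSecondary only starts creating
# once a level whose CreateOnSecondary() returns True is reached and then
# forces creation for everything below it -- so a device tree that needs to
# exist on both nodes is fully materialised on the secondary, whereas device
# types that do not need a secondary copy are simply skipped there.

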
def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


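# Example (illustrative only, not part of the execution path): with a
# configuration object whose GenerateUniqueID() returns UUID-style strings,
#
#   names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
#
# would yield one unique LV name per requested suffix, e.g. something like
# ["0b62e342-<...>.sda_data", "0b62e342-<...>.sda_meta"].

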
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


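# Rough sketch of the tree built by _GenerateDRBD8Branch (sizes are assumed
# for illustration): for size=10240 and names=["x.sda_data", "x.sda_meta"]
# the returned object is a DRBD8 disk whose children are the data LV and a
# fixed 128 MB metadata LV:
#
#   LD_DRBD8(iv_name="sda", logical_id=(primary, secondary, port))
#     +- LD_LV(10240 MB, logical_id=(vgname, "x.sda_data"))
#     +- LD_LV(  128 MB, logical_id=(vgname, "x.sda_meta"))

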
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


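# Illustrative call (hypothetical values): for a drbd8-based instance the
# helper above would be invoked roughly as
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1", ["node2"],
#                                 10240, 4096, "", None)
#
# returning two DRBD8 trees (sda and sdb) built via _GenerateDRBD8Branch;
# file_storage_dir and file_driver are only used for the DT_FILE template.

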
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


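# For an instance named "inst1.example.com" (hypothetical name) the text
# above evaluates to "originstname+inst1.example.com"; it is passed as the
# `info` argument to the blockdev creation calls so the owning instance can
# be identified from the device metadata.

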
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


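# Worked example (hypothetical sizes): for DT_DRBD8 with disk_size=10240 MB
# and swap_size=4096 MB the requirement is 10240 + 4096 + 256 = 14592 MB per
# node, the extra 256 MB covering the two 128 MB DRBD metadata volumes;
# DT_DISKLESS and DT_FILE return None, which skips the volume group space
# check entirely.

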
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


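# Minimal sketch of how this LU is typically driven (assuming the 1.2-era
# opcode class name opcodes.OpCreateInstance; the keyword names follow
# _OP_REQP plus the optional attributes read via getattr, and all concrete
# values are hypothetical):
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mem_size=512, vcpus=1, disk_size=10240,
#                                 swap_size=4096,
#                                 disk_template=constants.DT_DRBD8,
#                                 mode=constants.INSTANCE_CREATE,
#                                 os_type="debian-etch", pnode="node1",
#                                 snode="node2", start=True, ip_check=True,
#                                 wait_for_sync=True, mac="auto")
#
# The master-side processor then dispatches this opcode to LUCreateInstance.

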
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


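# Note: LUConnectConsole does not open the console itself; the caller
# receives the command line built by self.ssh.BuildCmd and is expected to
# run it on the master node. Conceptually (exact form depends on
# SshRunner.BuildCmd and the hypervisor) it is something along the lines of
#
#   ssh -t root@<primary node> '<hypervisor console command>'

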
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.op.remote_node = self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
4165
  """Query runtime instance data.
4166

4167
  """
4168
  _OP_REQP = ["instances"]
4169

    
4170
  def CheckPrereq(self):
4171
    """Check prerequisites.
4172

4173
    This only checks the optional instance list against the existing names.
4174

4175
    """
4176
    if not isinstance(self.op.instances, list):
4177
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4178
    if self.op.instances:
4179
      self.wanted_instances = []
4180
      names = self.op.instances
4181
      for name in names:
4182
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4183
        if instance is None:
4184
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4185
        self.wanted_instances.append(instance)
4186
    else:
4187
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4188
                               in self.cfg.GetInstanceList()]
4189
    return
4190

    
4191

    
4192
  def _ComputeDiskStatus(self, instance, snode, dev):
4193
    """Compute block device status.
4194

4195
    """
4196
    self.cfg.SetDiskID(dev, instance.primary_node)
4197
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4198
    if dev.dev_type in constants.LDS_DRBD:
4199
      # we change the snode then (otherwise we use the one passed in)
4200
      if dev.logical_id[0] == instance.primary_node:
4201
        snode = dev.logical_id[1]
4202
      else:
4203
        snode = dev.logical_id[0]
4204

    
4205
    if snode:
4206
      self.cfg.SetDiskID(dev, snode)
4207
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4208
    else:
4209
      dev_sstatus = None
4210

    
4211
    if dev.children:
4212
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4213
                      for child in dev.children]
4214
    else:
4215
      dev_children = []
4216

    
4217
    data = {
4218
      "iv_name": dev.iv_name,
4219
      "dev_type": dev.dev_type,
4220
      "logical_id": dev.logical_id,
4221
      "physical_id": dev.physical_id,
4222
      "pstatus": dev_pstatus,
4223
      "sstatus": dev_sstatus,
4224
      "children": dev_children,
4225
      }
4226

    
4227
    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path

      if htkind in constants.HTS_REQ_PORT:
        idict["vnc_bind_address"] = instance.vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result
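
  # Illustrative sketch (added, not in the original source; names are made
  # up) of the mapping returned by Exec above, keyed by instance name:
  #   {"inst1.example.com": {"name": "inst1.example.com",
  #                          "config_state": "up", "run_state": "down",
  #                          "pnode": "node1", "snodes": [...],
  #                          "os": ..., "memory": ..., "vcpus": ...,
  #                          "nics": [(mac, ip, bridge), ...],
  #                          "disks": [<_ComputeDiskStatus dicts>]}}
  # The hypervisor-specific keys (kernel_path/initrd_path, hvm_*,
  # vnc_bind_address/network_port) are only added for matching hypervisors.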


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.AddInstance(instance)

    return result
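
  # Note added for clarity (not in the original source): Exec returns a list
  # of (parameter name, new value) pairs describing the applied changes, for
  # example [("mem", 512), ("bridge", "xen-br1")] (values are illustrative).
  # As the docstring above states, the changes only take effect at the next
  # restart of the instance.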


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
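
  # Note added for clarity (not in the original source): the return value is
  # the rpc.call_export_list result, i.e. a dict mapping each queried node
  # name to the list of exports found on it, e.g.
  # {"node1.example.com": ["inst1.example.com"]} (names are illustrative).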


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
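
  # Sketch added for clarity (not in the original source) of self.in_data as
  # built above:
  #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
  #    "hypervisor_type": ...,
  #    "nodes": {node_name: {"tags": [...], "total_memory": ...,
  #                          "reserved_memory": ..., "free_memory": ...,
  #                          "i_pri_memory": ..., "i_pri_up_memory": ...,
  #                          "total_disk": ..., "free_disk": ...,
  #                          "primary_ip": ..., "secondary_ip": ...,
  #                          "total_cpus": ...}},
  #    "instances": {instance_name: {"tags": [...], "should_run": ...,
  #                                  "vcpus": ..., "memory": ..., "os": ...,
  #                                  "nodes": [...], "nics": [...],
  #                                  "disks": [...], "disk_template": ...}}}
  # The "request" key is added later by _AddNewInstance/_AddRelocateInstance.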

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
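
  # Note added for clarity (not in the original source): the allocator script
  # is therefore expected to emit a serialized dict containing at least the
  # keys "success", "info" and "nodes", with "nodes" being a list, e.g.
  #   {"success": true, "info": "allocation successful", "nodes": ["node2"]}
  # (the values shown are illustrative only).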


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result