#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this is
    handled in the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any environment,
    an empty dict (and not None) should be returned.

    If there are no nodes, an empty list (and not None) should be
    returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result
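
# For illustration only: a minimal sketch of a LogicalUnit subclass following
# the rules in the class docstring above (redefine HPATH/HTYPE, implement
# CheckPrereq, Exec and BuildHooksEnv). The opcode field "message" and the
# class name are hypothetical, not part of this module.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["message"]
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.sstore.GetClusterName()}
#       mn = self.sstore.GetMasterNode()
#       return env, [mn], [mn]
#
#     def CheckPrereq(self):
#       if not self.op.message:
#         raise errors.OpPrereqError("Empty message given")
#
#     def Exec(self, feedback_fn):
#       feedback_fn(self.op.message)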


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
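
# For illustration (hypothetical field names): a call such as
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])
# passes silently, while selected=["name", "bogus"] raises
# errors.OpPrereqError("Unknown output fields selected: bogus").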


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
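
# For illustration, with made-up values, a call like
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"],
#                         "debian-etch", "up", 128, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")])
# would produce an env dict along the lines of:
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "192.0.2.10", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33", "INSTANCE_NIC_COUNT": 1}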


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
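
# For illustration, with hypothetical sizes (in MiB, as in the vglist maps
# obtained from utils.ListVolumeGroups or rpc.call_vg_list elsewhere in this
# module):
#   _HasValidVG({"xenvg": 409600}, "xenvg")  -> None (valid)
#   _HasValidVG({"xenvg": 10240}, "xenvg")   -> "volume group 'xenvg' too small ..."
#   _HasValidVG({"other": 409600}, "xenvg")  -> "volume group 'xenvg' missing"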


def _InitSSHSetup(node):
  """Set up the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Set up the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have given"
                                 " is not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
    ss.SetKey(ss.SS_CONFIG_VERSION, constants.CONFIG_VERSION)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all the instances it is supposed to, should a
      # single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether an LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
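
# For illustration: a disk whose dev_type is constants.LD_LV is reported as
# lvm-based directly, and a mirrored disk (e.g. a DRBD device) is reported as
# lvm-based as soon as any of its children, checked recursively, is an LV.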


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given parameters don't conflict and that the
    given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
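
# Note on the data handled above: each per-disk entry returned by
# rpc.call_blockdev_getmirrorstatus is either None or a 4-tuple, unpacked in
# _WaitForSync as (perc_done, est_time, is_degraded, ldisk); the last element
# (the local disk status) is deliberately ignored there.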


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
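
# Judging by the idx values used above, element 5 of the rpc.call_blockdev_find
# result is expected to be the is_degraded flag and element 6 the ldisk (local
# storage) status; this is an inference from the code, not a documented API.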


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and lists of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
1496

    
1497

    
1498
class LUQueryNodes(NoHooksLU):
1499
  """Logical unit for querying nodes.
1500

1501
  """
1502
  _OP_REQP = ["output_fields", "names"]
1503

    
1504
  def CheckPrereq(self):
1505
    """Check prerequisites.
1506

1507
    This checks that the fields required are valid output fields.
1508

1509
    """
1510
    self.dynamic_fields = frozenset([
1511
      "dtotal", "dfree",
1512
      "mtotal", "mnode", "mfree",
1513
      "bootid",
1514
      "ctotal",
1515
      ])
1516

    
1517
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1518
                               "pinst_list", "sinst_list",
1519
                               "pip", "sip"],
1520
                       dynamic=self.dynamic_fields,
1521
                       selected=self.op.output_fields)
1522

    
1523
    self.wanted = _GetWantedNodes(self, self.op.names)
1524

    
1525
  def Exec(self, feedback_fn):
1526
    """Computes the list of nodes and their attributes.
1527

1528
    """
1529
    nodenames = self.wanted
1530
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1531

    
1532
    # begin data gathering
1533

    
1534
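    # query the nodes for live data only if a dynamic field was requested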
    if self.dynamic_fields.intersection(self.op.output_fields):
1535
      live_data = {}
1536
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1537
      for name in nodenames:
1538
        nodeinfo = node_data.get(name, None)
1539
        if nodeinfo:
1540
          live_data[name] = {
1541
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1542
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1543
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1544
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1545
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1546
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1547
            "bootid": nodeinfo['bootid'],
1548
            }
1549
        else:
1550
          live_data[name] = {}
1551
    else:
1552
      live_data = dict.fromkeys(nodenames, {})
1553

    
1554
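    # per-node sets of primary and secondary instances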
    node_to_primary = dict([(name, set()) for name in nodenames])
1555
    node_to_secondary = dict([(name, set()) for name in nodenames])
1556

    
1557
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1558
                             "sinst_cnt", "sinst_list"))
1559
    if inst_fields & frozenset(self.op.output_fields):
1560
      instancelist = self.cfg.GetInstanceList()
1561

    
1562
      for instance_name in instancelist:
1563
        inst = self.cfg.GetInstanceInfo(instance_name)
1564
        if inst.primary_node in node_to_primary:
1565
          node_to_primary[inst.primary_node].add(inst.name)
1566
        for secnode in inst.secondary_nodes:
1567
          if secnode in node_to_secondary:
1568
            node_to_secondary[secnode].add(inst.name)
1569

    
1570
    # end data gathering
1571

    
1572
    output = []
1573
    for node in nodelist:
1574
      node_output = []
1575
      for field in self.op.output_fields:
1576
        if field == "name":
1577
          val = node.name
1578
        elif field == "pinst_list":
1579
          val = list(node_to_primary[node.name])
1580
        elif field == "sinst_list":
1581
          val = list(node_to_secondary[node.name])
1582
        elif field == "pinst_cnt":
1583
          val = len(node_to_primary[node.name])
1584
        elif field == "sinst_cnt":
1585
          val = len(node_to_secondary[node.name])
1586
        elif field == "pip":
1587
          val = node.primary_ip
1588
        elif field == "sip":
1589
          val = node.secondary_ip
1590
        elif field in self.dynamic_fields:
1591
          val = live_data[node.name].get(field, None)
1592
        else:
1593
          raise errors.ParameterError(field)
1594
        node_output.append(val)
1595
      output.append(node_output)
1596

    
1597
    return output
1598

    
1599

    
1600
class LUQueryNodeVolumes(NoHooksLU):
1601
  """Logical unit for getting volumes on node(s).
1602

1603
  """
1604
  _OP_REQP = ["nodes", "output_fields"]
1605

    
1606
  def CheckPrereq(self):
1607
    """Check prerequisites.
1608

1609
    This checks that the fields required are valid output fields.
1610

1611
    """
1612
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1613

    
1614
    _CheckOutputFields(static=["node"],
1615
                       dynamic=["phys", "vg", "name", "size", "instance"],
1616
                       selected=self.op.output_fields)
1617

    
1618

    
1619
  def Exec(self, feedback_fn):
1620
    """Computes the list of nodes and their attributes.
1621

1622
    """
1623
    nodenames = self.nodes
1624
    volumes = rpc.call_node_volumes(nodenames)
1625

    
1626
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1627
             in self.cfg.GetInstanceList()]
1628

    
1629
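    # map each instance to its logical volumes, grouped by node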
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1630

    
1631
    output = []
1632
    for node in nodenames:
1633
      if node not in volumes or not volumes[node]:
1634
        continue
1635

    
1636
      node_vols = volumes[node][:]
1637
      node_vols.sort(key=lambda vol: vol['dev'])
1638

    
1639
      for vol in node_vols:
1640
        node_output = []
1641
        for field in self.op.output_fields:
1642
          if field == "node":
1643
            val = node
1644
          elif field == "phys":
1645
            val = vol['dev']
1646
          elif field == "vg":
1647
            val = vol['vg']
1648
          elif field == "name":
1649
            val = vol['name']
1650
          elif field == "size":
1651
            val = int(float(vol['size']))
1652
          elif field == "instance":
1653
            for inst in ilist:
1654
              if node not in lv_by_node[inst]:
1655
                continue
1656
              if vol['name'] in lv_by_node[inst][node]:
1657
                val = inst.name
1658
                break
1659
            else:
1660
              val = '-'
1661
          else:
1662
            raise errors.ParameterError(field)
1663
          node_output.append(str(val))
1664

    
1665
        output.append(node_output)
1666

    
1667
    return output
1668

    
1669

    
1670
class LUAddNode(LogicalUnit):
1671
  """Logical unit for adding node to the cluster.
1672

1673
  """
1674
  HPATH = "node-add"
1675
  HTYPE = constants.HTYPE_NODE
1676
  _OP_REQP = ["node_name"]
1677

    
1678
  def BuildHooksEnv(self):
1679
    """Build hooks env.
1680

1681
    This will run on all nodes before, and on all nodes + the new node after.
1682

1683
    """
1684
    env = {
1685
      "OP_TARGET": self.op.node_name,
1686
      "NODE_NAME": self.op.node_name,
1687
      "NODE_PIP": self.op.primary_ip,
1688
      "NODE_SIP": self.op.secondary_ip,
1689
      }
1690
    nodes_0 = self.cfg.GetNodeList()
1691
    nodes_1 = nodes_0 + [self.op.node_name, ]
1692
    return env, nodes_0, nodes_1
1693

    
1694
  def CheckPrereq(self):
1695
    """Check prerequisites.
1696

1697
    This checks:
1698
     - the new node is not already in the config
1699
     - it is resolvable
1700
     - its parameters (single/dual homed) matches the cluster
1701

1702
    Any errors are signalled by raising errors.OpPrereqError.
1703

1704
    """
1705
    node_name = self.op.node_name
1706
    cfg = self.cfg
1707

    
1708
    dns_data = utils.HostInfo(node_name)
1709

    
1710
    node = dns_data.name
1711
    primary_ip = self.op.primary_ip = dns_data.ip
1712
    secondary_ip = getattr(self.op, "secondary_ip", None)
1713
    if secondary_ip is None:
1714
      secondary_ip = primary_ip
1715
    if not utils.IsValidIP(secondary_ip):
1716
      raise errors.OpPrereqError("Invalid secondary IP given")
1717
    self.op.secondary_ip = secondary_ip
1718

    
1719
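    # new nodes must not be in the config; re-added ones must be there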
    node_list = cfg.GetNodeList()
1720
    if not self.op.readd and node in node_list:
1721
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1722
                                 node)
1723
    elif self.op.readd and node not in node_list:
1724
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1725

    
1726
    for existing_node_name in node_list:
1727
      existing_node = cfg.GetNodeInfo(existing_node_name)
1728

    
1729
      if self.op.readd and node == existing_node_name:
1730
        if (existing_node.primary_ip != primary_ip or
1731
            existing_node.secondary_ip != secondary_ip):
1732
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1733
                                     " address configuration as before")
1734
        continue
1735

    
1736
      if (existing_node.primary_ip == primary_ip or
1737
          existing_node.secondary_ip == primary_ip or
1738
          existing_node.primary_ip == secondary_ip or
1739
          existing_node.secondary_ip == secondary_ip):
1740
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1741
                                   " existing node %s" % existing_node.name)
1742

    
1743
    # check that the type of the node (single versus dual homed) is the
1744
    # same as for the master
1745
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1746
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1747
    newbie_singlehomed = secondary_ip == primary_ip
1748
    if master_singlehomed != newbie_singlehomed:
1749
      if master_singlehomed:
1750
        raise errors.OpPrereqError("The master has no private ip but the"
1751
                                   " new node has one")
1752
      else:
1753
        raise errors.OpPrereqError("The master has a private ip but the"
1754
                                   " new node doesn't have one")
1755

    
1756
    # checks reachability
1757
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1758
      raise errors.OpPrereqError("Node not reachable by ping")
1759

    
1760
    if not newbie_singlehomed:
1761
      # check reachability from my secondary ip to newbie's secondary ip
1762
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1763
                           source=myself.secondary_ip):
1764
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1765
                                   " based ping to noded port")
1766

    
1767
    self.new_node = objects.Node(name=node,
1768
                                 primary_ip=primary_ip,
1769
                                 secondary_ip=secondary_ip)
1770

    
1771
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1772
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1773
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1774
                                   constants.VNC_PASSWORD_FILE)
1775

    
1776
  def Exec(self, feedback_fn):
1777
    """Adds the new node to the cluster.
1778

1779
    """
1780
    new_node = self.new_node
1781
    node = new_node.name
1782

    
1783
    # set up inter-node password and certificate and restarts the node daemon
1784
    gntpass = self.sstore.GetNodeDaemonPassword()
1785
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1786
      raise errors.OpExecError("ganeti password corruption detected")
1787
    f = open(constants.SSL_CERT_FILE)
1788
    try:
1789
      gntpem = f.read(8192)
1790
    finally:
1791
      f.close()
1792
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1793
    # so we use this to detect an invalid certificate; as long as the
1794
    # cert doesn't contain this, the here-document will be correctly
1795
    # parsed by the shell sequence below
1796
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1797
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1798
    if not gntpem.endswith("\n"):
1799
      raise errors.OpExecError("PEM must end with newline")
1800
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1801

    
1802
    # and then connect with ssh to set password and start ganeti-noded
1803
    # note that all the below variables are sanitized at this point,
1804
    # either by being constants or by the checks above
1805
    ss = self.sstore
1806
    mycommand = ("umask 077 && "
1807
                 "echo '%s' > '%s' && "
1808
                 "cat > '%s' << '!EOF.' && \n"
1809
                 "%s!EOF.\n%s restart" %
1810
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1811
                  constants.SSL_CERT_FILE, gntpem,
1812
                  constants.NODE_INITD_SCRIPT))
1813

    
1814
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1815
    if result.failed:
1816
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1817
                               " output: %s" %
1818
                               (node, result.fail_reason, result.output))
1819

    
1820
    # check connectivity
1821
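    # give the freshly restarted node daemon a few seconds to come up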
    time.sleep(4)
1822

    
1823
    result = rpc.call_version([node])[node]
1824
    if result:
1825
      if constants.PROTOCOL_VERSION == result:
1826
        logger.Info("communication to node %s fine, sw version %s match" %
1827
                    (node, result))
1828
      else:
1829
        raise errors.OpExecError("Version mismatch master version %s,"
1830
                                 " node version %s" %
1831
                                 (constants.PROTOCOL_VERSION, result))
1832
    else:
1833
      raise errors.OpExecError("Cannot get version from the new node")
1834

    
1835
    # setup ssh on node
1836
    logger.Info("copy ssh key to node %s" % node)
1837
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1838
    keyarray = []
1839
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1840
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1841
                priv_key, pub_key]
1842

    
1843
    for i in keyfiles:
1844
      f = open(i, 'r')
1845
      try:
1846
        keyarray.append(f.read())
1847
      finally:
1848
        f.close()
1849

    
1850
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1851
                               keyarray[3], keyarray[4], keyarray[5])
1852

    
1853
    if not result:
1854
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1855

    
1856
    # Add node to our /etc/hosts, and add key to known_hosts
1857
    _AddHostToEtcHosts(new_node.name)
1858

    
1859
    if new_node.secondary_ip != new_node.primary_ip:
1860
      if not rpc.call_node_tcp_ping(new_node.name,
1861
                                    constants.LOCALHOST_IP_ADDRESS,
1862
                                    new_node.secondary_ip,
1863
                                    constants.DEFAULT_NODED_PORT,
1864
                                    10, False):
1865
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1866
                                 " you gave (%s). Please fix and re-run this"
1867
                                 " command." % new_node.secondary_ip)
1868

    
1869
    success, msg = self.ssh.VerifyNodeHostname(node)
1870
    if not success:
1871
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1872
                               " than the one the resolver gives: %s."
1873
                               " Please fix and re-run this command." %
1874
                               (node, msg))
1875

    
1876
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1877
    # including the node just added
1878
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1879
    dist_nodes = self.cfg.GetNodeList()
1880
    if not self.op.readd:
1881
      dist_nodes.append(node)
1882
    if myself.name in dist_nodes:
1883
      dist_nodes.remove(myself.name)
1884

    
1885
    logger.Debug("Copying hosts and known_hosts to all nodes")
1886
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1887
      result = rpc.call_upload_file(dist_nodes, fname)
1888
      for to_node in dist_nodes:
1889
        if not result[to_node]:
1890
          logger.Error("copy of file %s to node %s failed" %
1891
                       (fname, to_node))
1892

    
1893
    to_copy = ss.GetFileList()
1894
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1895
      to_copy.append(constants.VNC_PASSWORD_FILE)
1896
    for fname in to_copy:
1897
      if not self.ssh.CopyFileToNode(node, fname):
1898
        logger.Error("could not copy file %s to node %s" % (fname, node))
1899

    
1900
    if not self.op.readd:
1901
      logger.Info("adding node %s to cluster.conf" % node)
1902
      self.cfg.AddNode(new_node)
1903

    
1904

    
1905
class LUMasterFailover(LogicalUnit):
1906
  """Failover the master node to the current node.
1907

1908
  This is a special LU in that it must run on a non-master node.
1909

1910
  """
1911
  HPATH = "master-failover"
1912
  HTYPE = constants.HTYPE_CLUSTER
1913
  REQ_MASTER = False
1914
  _OP_REQP = []
1915

    
1916
  def BuildHooksEnv(self):
1917
    """Build hooks env.
1918

1919
    This will run on the new master only in the pre phase, and on all
1920
    the nodes in the post phase.
1921

1922
    """
1923
    env = {
1924
      "OP_TARGET": self.new_master,
1925
      "NEW_MASTER": self.new_master,
1926
      "OLD_MASTER": self.old_master,
1927
      }
1928
    return env, [self.new_master], self.cfg.GetNodeList()
1929

    
1930
  def CheckPrereq(self):
1931
    """Check prerequisites.
1932

1933
    This checks that we are not already the master.
1934

1935
    """
1936
    self.new_master = utils.HostInfo().name
1937
    self.old_master = self.sstore.GetMasterNode()
1938

    
1939
    if self.old_master == self.new_master:
1940
      raise errors.OpPrereqError("This commands must be run on the node"
1941
                                 " where you want the new master to be."
1942
                                 " %s is already the master" %
1943
                                 self.old_master)
1944

    
1945
  def Exec(self, feedback_fn):
1946
    """Failover the master node.
1947

1948
    This command, when run on a non-master node, will cause the current
1949
    master to cease being master, and the non-master to become new
1950
    master.
1951

1952
    """
1953
    #TODO: do not rely on gethostname returning the FQDN
1954
    logger.Info("setting master to %s, old master: %s" %
1955
                (self.new_master, self.old_master))
1956

    
1957
    if not rpc.call_node_stop_master(self.old_master):
1958
      logger.Error("could disable the master role on the old master"
1959
                   " %s, please disable manually" % self.old_master)
1960

    
1961
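    # record the new master in the simple store and push the file to all nodes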
    ss = self.sstore
1962
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1963
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1964
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1965
      logger.Error("could not distribute the new simple store master file"
1966
                   " to the other nodes, please check.")
1967

    
1968
    if not rpc.call_node_start_master(self.new_master):
1969
      logger.Error("could not start the master role on the new master"
1970
                   " %s, please check" % self.new_master)
1971
      feedback_fn("Error in activating the master IP on the new master,"
1972
                  " please fix manually.")
1973

    
1974

    
1975

    
1976
class LUQueryClusterInfo(NoHooksLU):
1977
  """Query cluster configuration.
1978

1979
  """
1980
  _OP_REQP = []
1981
  REQ_MASTER = False
1982

    
1983
  def CheckPrereq(self):
1984
    """No prerequsites needed for this LU.
1985

1986
    """
1987
    pass
1988

    
1989
  def Exec(self, feedback_fn):
1990
    """Return cluster config.
1991

1992
    """
1993
    result = {
1994
      "name": self.sstore.GetClusterName(),
1995
      "software_version": constants.RELEASE_VERSION,
1996
      "protocol_version": constants.PROTOCOL_VERSION,
1997
      "config_version": constants.CONFIG_VERSION,
1998
      "os_api_version": constants.OS_API_VERSION,
1999
      "export_version": constants.EXPORT_VERSION,
2000
      "master": self.sstore.GetMasterNode(),
2001
      "architecture": (platform.architecture()[0], platform.machine()),
2002
      }
2003

    
2004
    return result
2005

    
2006

    
2007
class LUClusterCopyFile(NoHooksLU):
2008
  """Copy file to cluster.
2009

2010
  """
2011
  _OP_REQP = ["nodes", "filename"]
2012

    
2013
  def CheckPrereq(self):
2014
    """Check prerequisites.
2015

2016
    It should check that the named file exists and that the given list
2017
    of nodes is valid.
2018

2019
    """
2020
    if not os.path.exists(self.op.filename):
2021
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2022

    
2023
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2024

    
2025
  def Exec(self, feedback_fn):
2026
    """Copy a file from master to some nodes.
2027

2028
    The file given in self.op.filename is copied to all nodes in
    self.nodes, except the master node itself.
2033

2034
    """
2035
    filename = self.op.filename
2036

    
2037
    myname = utils.HostInfo().name
2038

    
2039
    for node in self.nodes:
2040
      if node == myname:
2041
        continue
2042
      if not self.ssh.CopyFileToNode(node, filename):
2043
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
2044

    
2045

    
2046
class LUDumpClusterConfig(NoHooksLU):
2047
  """Return a text-representation of the cluster-config.
2048

2049
  """
2050
  _OP_REQP = []
2051

    
2052
  def CheckPrereq(self):
2053
    """No prerequisites.
2054

2055
    """
2056
    pass
2057

    
2058
  def Exec(self, feedback_fn):
2059
    """Dump a representation of the cluster config to the standard output.
2060

2061
    """
2062
    return self.cfg.DumpConfig()
2063

    
2064

    
2065
class LURunClusterCommand(NoHooksLU):
2066
  """Run a command on some nodes.
2067

2068
  """
2069
  _OP_REQP = ["command", "nodes"]
2070

    
2071
  def CheckPrereq(self):
2072
    """Check prerequisites.
2073

2074
    It checks that the given list of nodes is valid.
2075

2076
    """
2077
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2078

    
2079
  def Exec(self, feedback_fn):
2080
    """Run a command on some nodes.
2081

2082
    """
2083
    # put the master at the end of the nodes list
2084
    master_node = self.sstore.GetMasterNode()
2085
    if master_node in self.nodes:
2086
      self.nodes.remove(master_node)
2087
      self.nodes.append(master_node)
2088

    
2089
    data = []
2090
    for node in self.nodes:
2091
      result = self.ssh.Run(node, "root", self.op.command)
2092
      data.append((node, result.output, result.exit_code))
2093

    
2094
    return data
2095

    
2096

    
2097
class LUActivateInstanceDisks(NoHooksLU):
2098
  """Bring up an instance's disks.
2099

2100
  """
2101
  _OP_REQP = ["instance_name"]
2102

    
2103
  def CheckPrereq(self):
2104
    """Check prerequisites.
2105

2106
    This checks that the instance is in the cluster.
2107

2108
    """
2109
    instance = self.cfg.GetInstanceInfo(
2110
      self.cfg.ExpandInstanceName(self.op.instance_name))
2111
    if instance is None:
2112
      raise errors.OpPrereqError("Instance '%s' not known" %
2113
                                 self.op.instance_name)
2114
    self.instance = instance
2115

    
2116

    
2117
  def Exec(self, feedback_fn):
2118
    """Activate the disks.
2119

2120
    """
2121
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2122
    if not disks_ok:
2123
      raise errors.OpExecError("Cannot activate block devices")
2124

    
2125
    return disks_info
2126

    
2127

    
2128
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2129
  """Prepare the block devices for an instance.
2130

2131
  This sets up the block devices on all nodes.
2132

2133
  Args:
2134
    instance: a ganeti.objects.Instance object
2135
    ignore_secondaries: if true, errors on secondary nodes won't result
2136
                        in an error return from the function
2137

2138
  Returns:
    a tuple of (disks_ok, device_info); disks_ok is false if the
    operation failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples giving the
    mapping from node devices to instance devices
2142
  """
2143
  device_info = []
2144
  disks_ok = True
2145
  iname = instance.name
2146
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking has occurred, but we do not eliminate it
2149

    
2150
  # The proper fix would be to wait (with some limits) until the
2151
  # connection has been made and drbd transitions from WFConnection
2152
  # into any other network-connected state (Connected, SyncTarget,
2153
  # SyncSource, etc.)
2154

    
2155
  # 1st pass, assemble on all nodes in secondary mode
2156
  for inst_disk in instance.disks:
2157
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2158
      cfg.SetDiskID(node_disk, node)
2159
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2160
      if not result:
2161
        logger.Error("could not prepare block device %s on node %s"
2162
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2163
        if not ignore_secondaries:
2164
          disks_ok = False
2165

    
2166
  # FIXME: race condition on drbd migration to primary
2167

    
2168
  # 2nd pass, do only the primary node
2169
  for inst_disk in instance.disks:
2170
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2171
      if node != instance.primary_node:
2172
        continue
2173
      cfg.SetDiskID(node_disk, node)
2174
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2175
      if not result:
2176
        logger.Error("could not prepare block device %s on node %s"
2177
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2178
        disks_ok = False
2179
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2180

    
2181
  # leave the disks configured for the primary node
2182
  # this is a workaround that would be fixed better by
2183
  # improving the logical/physical id handling
2184
  for disk in instance.disks:
2185
    cfg.SetDiskID(disk, instance.primary_node)
2186

    
2187
  return disks_ok, device_info
2188

    
2189

    
2190
def _StartInstanceDisks(cfg, instance, force):
2191
  """Start the disks of an instance.
2192

2193
  """
2194
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2195
                                           ignore_secondaries=force)
2196
  if not disks_ok:
2197
    _ShutdownInstanceDisks(instance, cfg)
2198
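    # only suggest --force if the caller passed an explicit force=False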
    if force is not None and not force:
2199
      logger.Error("If the message above refers to a secondary node,"
2200
                   " you can retry the operation using '--force'.")
2201
    raise errors.OpExecError("Disk consistency error")
2202

    
2203

    
2204
class LUDeactivateInstanceDisks(NoHooksLU):
2205
  """Shutdown an instance's disks.
2206

2207
  """
2208
  _OP_REQP = ["instance_name"]
2209

    
2210
  def CheckPrereq(self):
2211
    """Check prerequisites.
2212

2213
    This checks that the instance is in the cluster.
2214

2215
    """
2216
    instance = self.cfg.GetInstanceInfo(
2217
      self.cfg.ExpandInstanceName(self.op.instance_name))
2218
    if instance is None:
2219
      raise errors.OpPrereqError("Instance '%s' not known" %
2220
                                 self.op.instance_name)
2221
    self.instance = instance
2222

    
2223
  def Exec(self, feedback_fn):
2224
    """Deactivate the disks
2225

2226
    """
2227
    instance = self.instance
2228
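    # refuse to shut down the disks while the instance is still running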
    ins_l = rpc.call_instance_list([instance.primary_node])
2229
    ins_l = ins_l[instance.primary_node]
2230
    if not isinstance(ins_l, list):
2231
      raise errors.OpExecError("Can't contact node '%s'" %
2232
                               instance.primary_node)
2233

    
2234
    if self.instance.name in ins_l:
2235
      raise errors.OpExecError("Instance is running, can't shutdown"
2236
                               " block devices.")
2237

    
2238
    _ShutdownInstanceDisks(instance, self.cfg)
2239

    
2240

    
2241
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  when computing the result.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
2260

    
2261

    
2262
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
2290

    
2291

    
2292
class LUStartupInstance(LogicalUnit):
2293
  """Starts an instance.
2294

2295
  """
2296
  HPATH = "instance-start"
2297
  HTYPE = constants.HTYPE_INSTANCE
2298
  _OP_REQP = ["instance_name", "force"]
2299

    
2300
  def BuildHooksEnv(self):
2301
    """Build hooks env.
2302

2303
    This runs on master, primary and secondary nodes of the instance.
2304

2305
    """
2306
    env = {
2307
      "FORCE": self.op.force,
2308
      }
2309
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2310
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2311
          list(self.instance.secondary_nodes))
2312
    return env, nl, nl
2313

    
2314
  def CheckPrereq(self):
2315
    """Check prerequisites.
2316

2317
    This checks that the instance is in the cluster.
2318

2319
    """
2320
    instance = self.cfg.GetInstanceInfo(
2321
      self.cfg.ExpandInstanceName(self.op.instance_name))
2322
    if instance is None:
2323
      raise errors.OpPrereqError("Instance '%s' not known" %
2324
                                 self.op.instance_name)
2325

    
2326
    # check bridges existence
2327
    _CheckInstanceBridgesExist(instance)
2328

    
2329
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2330
                         "starting instance %s" % instance.name,
2331
                         instance.memory)
2332

    
2333
    self.instance = instance
2334
    self.op.instance_name = instance.name
2335

    
2336
  def Exec(self, feedback_fn):
2337
    """Start the instance.
2338

2339
    """
2340
    instance = self.instance
2341
    force = self.op.force
2342
    extra_args = getattr(self.op, "extra_args", "")
2343

    
2344
    self.cfg.MarkInstanceUp(instance.name)
2345

    
2346
    node_current = instance.primary_node
2347

    
2348
    _StartInstanceDisks(self.cfg, instance, force)
2349

    
2350
    if not rpc.call_instance_start(node_current, instance, extra_args):
2351
      _ShutdownInstanceDisks(instance, self.cfg)
2352
      raise errors.OpExecError("Could not start instance")
2353

    
2354

    
2355
class LURebootInstance(LogicalUnit):
2356
  """Reboot an instance.
2357

2358
  """
2359
  HPATH = "instance-reboot"
2360
  HTYPE = constants.HTYPE_INSTANCE
2361
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2362

    
2363
  def BuildHooksEnv(self):
2364
    """Build hooks env.
2365

2366
    This runs on master, primary and secondary nodes of the instance.
2367

2368
    """
2369
    env = {
2370
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2371
      }
2372
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2373
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2374
          list(self.instance.secondary_nodes))
2375
    return env, nl, nl
2376

    
2377
  def CheckPrereq(self):
2378
    """Check prerequisites.
2379

2380
    This checks that the instance is in the cluster.
2381

2382
    """
2383
    instance = self.cfg.GetInstanceInfo(
2384
      self.cfg.ExpandInstanceName(self.op.instance_name))
2385
    if instance is None:
2386
      raise errors.OpPrereqError("Instance '%s' not known" %
2387
                                 self.op.instance_name)
2388

    
2389
    # check bridges existence
2390
    _CheckInstanceBridgesExist(instance)
2391

    
2392
    self.instance = instance
2393
    self.op.instance_name = instance.name
2394

    
2395
  def Exec(self, feedback_fn):
2396
    """Reboot the instance.
2397

2398
    """
2399
    instance = self.instance
2400
    ignore_secondaries = self.op.ignore_secondaries
2401
    reboot_type = self.op.reboot_type
2402
    extra_args = getattr(self.op, "extra_args", "")
2403

    
2404
    node_current = instance.primary_node
2405

    
2406
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2407
                           constants.INSTANCE_REBOOT_HARD,
2408
                           constants.INSTANCE_REBOOT_FULL]:
2409
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2410
                                  (constants.INSTANCE_REBOOT_SOFT,
2411
                                   constants.INSTANCE_REBOOT_HARD,
2412
                                   constants.INSTANCE_REBOOT_FULL))
2413

    
2414
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2415
                       constants.INSTANCE_REBOOT_HARD]:
2416
      if not rpc.call_instance_reboot(node_current, instance,
2417
                                      reboot_type, extra_args):
2418
        raise errors.OpExecError("Could not reboot instance")
2419
    else:
2420
      if not rpc.call_instance_shutdown(node_current, instance):
2421
        raise errors.OpExecError("could not shutdown instance for full reboot")
2422
      _ShutdownInstanceDisks(instance, self.cfg)
2423
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2424
      if not rpc.call_instance_start(node_current, instance, extra_args):
2425
        _ShutdownInstanceDisks(instance, self.cfg)
2426
        raise errors.OpExecError("Could not start instance for full reboot")
2427

    
2428
    self.cfg.MarkInstanceUp(instance.name)
2429

    
2430

    
2431
class LUShutdownInstance(LogicalUnit):
2432
  """Shutdown an instance.
2433

2434
  """
2435
  HPATH = "instance-stop"
2436
  HTYPE = constants.HTYPE_INSTANCE
2437
  _OP_REQP = ["instance_name"]
2438

    
2439
  def BuildHooksEnv(self):
2440
    """Build hooks env.
2441

2442
    This runs on master, primary and secondary nodes of the instance.
2443

2444
    """
2445
    env = _BuildInstanceHookEnvByObject(self.instance)
2446
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2447
          list(self.instance.secondary_nodes))
2448
    return env, nl, nl
2449

    
2450
  def CheckPrereq(self):
2451
    """Check prerequisites.
2452

2453
    This checks that the instance is in the cluster.
2454

2455
    """
2456
    instance = self.cfg.GetInstanceInfo(
2457
      self.cfg.ExpandInstanceName(self.op.instance_name))
2458
    if instance is None:
2459
      raise errors.OpPrereqError("Instance '%s' not known" %
2460
                                 self.op.instance_name)
2461
    self.instance = instance
2462

    
2463
  def Exec(self, feedback_fn):
2464
    """Shutdown the instance.
2465

2466
    """
2467
    instance = self.instance
2468
    node_current = instance.primary_node
2469
    self.cfg.MarkInstanceDown(instance.name)
2470
    if not rpc.call_instance_shutdown(node_current, instance):
2471
      logger.Error("could not shutdown instance")
2472

    
2473
    _ShutdownInstanceDisks(instance, self.cfg)
2474

    
2475

    
2476
class LUReinstallInstance(LogicalUnit):
2477
  """Reinstall an instance.
2478

2479
  """
2480
  HPATH = "instance-reinstall"
2481
  HTYPE = constants.HTYPE_INSTANCE
2482
  _OP_REQP = ["instance_name"]
2483

    
2484
  def BuildHooksEnv(self):
2485
    """Build hooks env.
2486

2487
    This runs on master, primary and secondary nodes of the instance.
2488

2489
    """
2490
    env = _BuildInstanceHookEnvByObject(self.instance)
2491
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2492
          list(self.instance.secondary_nodes))
2493
    return env, nl, nl
2494

    
2495
  def CheckPrereq(self):
2496
    """Check prerequisites.
2497

2498
    This checks that the instance is in the cluster and is not running.
2499

2500
    """
2501
    instance = self.cfg.GetInstanceInfo(
2502
      self.cfg.ExpandInstanceName(self.op.instance_name))
2503
    if instance is None:
2504
      raise errors.OpPrereqError("Instance '%s' not known" %
2505
                                 self.op.instance_name)
2506
    if instance.disk_template == constants.DT_DISKLESS:
2507
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2508
                                 self.op.instance_name)
2509
    if instance.status != "down":
2510
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2511
                                 self.op.instance_name)
2512
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2513
    if remote_info:
2514
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2515
                                 (self.op.instance_name,
2516
                                  instance.primary_node))
2517

    
2518
    self.op.os_type = getattr(self.op, "os_type", None)
2519
    if self.op.os_type is not None:
2520
      # OS verification
2521
      pnode = self.cfg.GetNodeInfo(
2522
        self.cfg.ExpandNodeName(instance.primary_node))
2523
      if pnode is None:
2524
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2525
                                   self.op.pnode)
2526
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2527
      if not os_obj:
2528
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2529
                                   " primary node"  % self.op.os_type)
2530

    
2531
    self.instance = instance
2532

    
2533
  def Exec(self, feedback_fn):
2534
    """Reinstall the instance.
2535

2536
    """
2537
    inst = self.instance
2538

    
2539
    if self.op.os_type is not None:
2540
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2541
      inst.os = self.op.os_type
2542
      self.cfg.AddInstance(inst)
2543

    
2544
    _StartInstanceDisks(self.cfg, inst, None)
2545
    try:
2546
      feedback_fn("Running the instance OS create scripts...")
2547
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2548
        raise errors.OpExecError("Could not install OS for instance %s"
2549
                                 " on node %s" %
2550
                                 (inst.name, inst.primary_node))
2551
    finally:
2552
      _ShutdownInstanceDisks(inst, self.cfg)
2553

    
2554

    
2555
class LURenameInstance(LogicalUnit):
2556
  """Rename an instance.
2557

2558
  """
2559
  HPATH = "instance-rename"
2560
  HTYPE = constants.HTYPE_INSTANCE
2561
  _OP_REQP = ["instance_name", "new_name"]
2562

    
2563
  def BuildHooksEnv(self):
2564
    """Build hooks env.
2565

2566
    This runs on master, primary and secondary nodes of the instance.
2567

2568
    """
2569
    env = _BuildInstanceHookEnvByObject(self.instance)
2570
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2571
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2572
          list(self.instance.secondary_nodes))
2573
    return env, nl, nl
2574

    
2575
  def CheckPrereq(self):
2576
    """Check prerequisites.
2577

2578
    This checks that the instance is in the cluster and is not running.
2579

2580
    """
2581
    instance = self.cfg.GetInstanceInfo(
2582
      self.cfg.ExpandInstanceName(self.op.instance_name))
2583
    if instance is None:
2584
      raise errors.OpPrereqError("Instance '%s' not known" %
2585
                                 self.op.instance_name)
2586
    if instance.status != "down":
2587
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2588
                                 self.op.instance_name)
2589
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2590
    if remote_info:
2591
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2592
                                 (self.op.instance_name,
2593
                                  instance.primary_node))
2594
    self.instance = instance
2595

    
2596
    # new name verification
2597
    name_info = utils.HostInfo(self.op.new_name)
2598

    
2599
    self.op.new_name = new_name = name_info.name
2600
    instance_list = self.cfg.GetInstanceList()
2601
    if new_name in instance_list:
2602
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2603
                                 new_name)
2604

    
2605
    if not getattr(self.op, "ignore_ip", False):
2606
      command = ["fping", "-q", name_info.ip]
2607
      result = utils.RunCmd(command)
2608
      if not result.failed:
2609
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2610
                                   (name_info.ip, new_name))
2611

    
2612

    
2613
  def Exec(self, feedback_fn):
2614
    """Reinstall the instance.
2615

2616
    """
2617
    inst = self.instance
2618
    old_name = inst.name
2619

    
2620
    if inst.disk_template == constants.DT_FILE:
2621
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2622

    
2623
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2624

    
2625
    # re-read the instance from the configuration after rename
2626
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2627

    
2628
    if inst.disk_template == constants.DT_FILE:
2629
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2630
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2631
                                                old_file_storage_dir,
2632
                                                new_file_storage_dir)
2633

    
2634
      if not result:
2635
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2636
                                 " directory '%s' to '%s' (but the instance"
2637
                                 " has been renamed in Ganeti)" % (
2638
                                 inst.primary_node, old_file_storage_dir,
2639
                                 new_file_storage_dir))
2640

    
2641
      if not result[0]:
2642
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2643
                                 " (but the instance has been renamed in"
2644
                                 " Ganeti)" % (old_file_storage_dir,
2645
                                               new_file_storage_dir))
2646

    
2647
    _StartInstanceDisks(self.cfg, inst, None)
2648
    try:
2649
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2650
                                          "sda", "sdb"):
2651
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2652
               " instance has been renamed in Ganeti)" %
2653
               (inst.name, inst.primary_node))
2654
        logger.Error(msg)
2655
    finally:
2656
      _ShutdownInstanceDisks(inst, self.cfg)
2657

    
2658

    
2659
class LURemoveInstance(LogicalUnit):
2660
  """Remove an instance.
2661

2662
  """
2663
  HPATH = "instance-remove"
2664
  HTYPE = constants.HTYPE_INSTANCE
2665
  _OP_REQP = ["instance_name", "ignore_failures"]
2666

    
2667
  def BuildHooksEnv(self):
2668
    """Build hooks env.
2669

2670
    This runs on master, primary and secondary nodes of the instance.
2671

2672
    """
2673
    env = _BuildInstanceHookEnvByObject(self.instance)
2674
    nl = [self.sstore.GetMasterNode()]
2675
    return env, nl, nl
2676

    
2677
  def CheckPrereq(self):
2678
    """Check prerequisites.
2679

2680
    This checks that the instance is in the cluster.
2681

2682
    """
2683
    instance = self.cfg.GetInstanceInfo(
2684
      self.cfg.ExpandInstanceName(self.op.instance_name))
2685
    if instance is None:
2686
      raise errors.OpPrereqError("Instance '%s' not known" %
2687
                                 self.op.instance_name)
2688
    self.instance = instance
2689

    
2690
  def Exec(self, feedback_fn):
2691
    """Remove the instance.
2692

2693
    """
2694
    instance = self.instance
2695
    logger.Info("shutting down instance %s on node %s" %
2696
                (instance.name, instance.primary_node))
2697

    
2698
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2699
      if self.op.ignore_failures:
2700
        feedback_fn("Warning: can't shutdown instance")
2701
      else:
2702
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2703
                                 (instance.name, instance.primary_node))
2704

    
2705
    logger.Info("removing block devices for instance %s" % instance.name)
2706

    
2707
    if not _RemoveDisks(instance, self.cfg):
2708
      if self.op.ignore_failures:
2709
        feedback_fn("Warning: can't remove instance's disks")
2710
      else:
2711
        raise errors.OpExecError("Can't remove instance's disks")
2712

    
2713
    logger.Info("removing instance %s out of cluster config" % instance.name)
2714

    
2715
    self.cfg.RemoveInstance(instance.name)
2716

    
2717

    
2718
class LUQueryInstances(NoHooksLU):
2719
  """Logical unit for querying instances.
2720

2721
  """
2722
  _OP_REQP = ["output_fields", "names"]
2723

    
2724
  def CheckPrereq(self):
2725
    """Check prerequisites.
2726

2727
    This checks that the fields required are valid output fields.
2728

2729
    """
2730
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2731
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2732
                               "admin_state", "admin_ram",
2733
                               "disk_template", "ip", "mac", "bridge",
2734
                               "sda_size", "sdb_size", "vcpus"],
2735
                       dynamic=self.dynamic_fields,
2736
                       selected=self.op.output_fields)
2737

    
2738
    self.wanted = _GetWantedInstances(self, self.op.names)
2739

    
2740
  def Exec(self, feedback_fn):
2741
    """Computes the list of nodes and their attributes.
2742

2743
    """
2744
    instance_names = self.wanted
2745
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2746
                     in instance_names]
2747

    
2748
    # begin data gathering
2749

    
2750
    nodes = frozenset([inst.primary_node for inst in instance_list])
2751

    
2752
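    # gather live data, remembering nodes that could not be contacted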
    bad_nodes = []
2753
    if self.dynamic_fields.intersection(self.op.output_fields):
2754
      live_data = {}
2755
      node_data = rpc.call_all_instances_info(nodes)
2756
      for name in nodes:
2757
        result = node_data[name]
2758
        if result:
2759
          live_data.update(result)
2760
        elif result is False:
2761
          bad_nodes.append(name)
2762
        # else no instance is alive
2763
    else:
2764
      live_data = dict([(name, {}) for name in instance_names])
2765

    
2766
    # end data gathering
2767

    
2768
    output = []
2769
    for instance in instance_list:
2770
      iout = []
2771
      for field in self.op.output_fields:
2772
        if field == "name":
2773
          val = instance.name
2774
        elif field == "os":
2775
          val = instance.os
2776
        elif field == "pnode":
2777
          val = instance.primary_node
2778
        elif field == "snodes":
2779
          val = list(instance.secondary_nodes)
2780
        elif field == "admin_state":
2781
          val = (instance.status != "down")
2782
        elif field == "oper_state":
2783
          if instance.primary_node in bad_nodes:
2784
            val = None
2785
          else:
2786
            val = bool(live_data.get(instance.name))
2787
        elif field == "status":
2788
          if instance.primary_node in bad_nodes:
2789
            val = "ERROR_nodedown"
2790
          else:
2791
            running = bool(live_data.get(instance.name))
2792
            if running:
2793
              if instance.status != "down":
2794
                val = "running"
2795
              else:
2796
                val = "ERROR_up"
2797
            else:
2798
              if instance.status != "down":
2799
                val = "ERROR_down"
2800
              else:
2801
                val = "ADMIN_down"
2802
        elif field == "admin_ram":
2803
          val = instance.memory
2804
        elif field == "oper_ram":
2805
          if instance.primary_node in bad_nodes:
2806
            val = None
2807
          elif instance.name in live_data:
2808
            val = live_data[instance.name].get("memory", "?")
2809
          else:
2810
            val = "-"
2811
        elif field == "disk_template":
2812
          val = instance.disk_template
2813
        elif field == "ip":
2814
          val = instance.nics[0].ip
2815
        elif field == "bridge":
2816
          val = instance.nics[0].bridge
2817
        elif field == "mac":
2818
          val = instance.nics[0].mac
2819
        elif field == "sda_size" or field == "sdb_size":
2820
          disk = instance.FindDisk(field[:3])
2821
          if disk is None:
2822
            val = None
2823
          else:
2824
            val = disk.size
2825
        elif field == "vcpus":
2826
          val = instance.vcpus
2827
        else:
2828
          raise errors.ParameterError(field)
2829
        iout.append(val)
2830
      output.append(iout)
2831

    
2832
    return output
2833

    
2834

    
2835
class LUFailoverInstance(LogicalUnit):
2836
  """Failover an instance.
2837

2838
  """
2839
  HPATH = "instance-failover"
2840
  HTYPE = constants.HTYPE_INSTANCE
2841
  _OP_REQP = ["instance_name", "ignore_consistency"]
2842

    
2843
  def BuildHooksEnv(self):
2844
    """Build hooks env.
2845

2846
    This runs on master, primary and secondary nodes of the instance.
2847

2848
    """
2849
    env = {
2850
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2851
      }
2852
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2853
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2854
    return env, nl, nl
2855

    
2856
  def CheckPrereq(self):
2857
    """Check prerequisites.
2858

2859
    This checks that the instance is in the cluster.
2860

2861
    """
2862
    instance = self.cfg.GetInstanceInfo(
2863
      self.cfg.ExpandInstanceName(self.op.instance_name))
2864
    if instance is None:
2865
      raise errors.OpPrereqError("Instance '%s' not known" %
2866
                                 self.op.instance_name)
2867

    
2868
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2869
      raise errors.OpPrereqError("Instance's disk layout is not"
2870
                                 " network mirrored, cannot failover.")
2871

    
2872
    secondary_nodes = instance.secondary_nodes
2873
    if not secondary_nodes:
2874
      raise errors.ProgrammerError("no secondary node but using "
2875
                                   "DT_REMOTE_RAID1 template")
2876

    
2877
    target_node = secondary_nodes[0]
2878
    # check memory requirements on the secondary node
2879
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2880
                         instance.name, instance.memory)
2881

    
2882
    # check bridge existence
2883
    brlist = [nic.bridge for nic in instance.nics]
2884
    if not rpc.call_bridges_exist(target_node, brlist):
2885
      raise errors.OpPrereqError("One or more target bridges %s does not"
2886
                                 " exist on destination node '%s'" %
2887
                                 (brlist, target_node))
2888

    
2889
    self.instance = instance
2890

    
2891
  def Exec(self, feedback_fn):
2892
    """Failover an instance.
2893

2894
    The failover is done by shutting it down on its present node and
2895
    starting it on the secondary.
2896

2897
    """
2898
    instance = self.instance
2899

    
2900
    source_node = instance.primary_node
2901
    target_node = instance.secondary_nodes[0]
2902

    
2903
    feedback_fn("* checking disk consistency between source and target")
2904
    for dev in instance.disks:
2905
      # for remote_raid1, these are md over drbd
2906
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2907
        if instance.status == "up" and not self.op.ignore_consistency:
2908
          raise errors.OpExecError("Disk %s is degraded on target node,"
2909
                                   " aborting failover." % dev.iv_name)
2910

    
2911
    feedback_fn("* shutting down instance on source node")
2912
    logger.Info("Shutting down instance %s on node %s" %
2913
                (instance.name, source_node))
2914

    
2915
    if not rpc.call_instance_shutdown(source_node, instance):
2916
      if self.op.ignore_consistency:
2917
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2918
                     " anyway. Please make sure node %s is down"  %
2919
                     (instance.name, source_node, source_node))
2920
      else:
2921
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2922
                                 (instance.name, source_node))
2923

    
2924
    feedback_fn("* deactivating the instance's disks on source node")
2925
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2926
      raise errors.OpExecError("Can't shut down the instance's disks.")
2927

    
2928
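    # point the instance at its new primary node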
    instance.primary_node = target_node
2929
    # distribute new instance config to the other nodes
2930
    self.cfg.AddInstance(instance)
2931

    
2932
    # Only start the instance if it's marked as up
2933
    if instance.status == "up":
2934
      feedback_fn("* activating the instance's disks on target node")
2935
      logger.Info("Starting instance %s on node %s" %
2936
                  (instance.name, target_node))
2937

    
2938
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2939
                                               ignore_secondaries=True)
2940
      if not disks_ok:
2941
        _ShutdownInstanceDisks(instance, self.cfg)
2942
        raise errors.OpExecError("Can't activate the instance's disks")
2943

    
2944
      feedback_fn("* starting the instance on the target node")
2945
      if not rpc.call_instance_start(target_node, instance, None):
2946
        _ShutdownInstanceDisks(instance, self.cfg)
2947
        raise errors.OpExecError("Could not start instance %s on node %s." %
2948
                                 (instance.name, target_node))
2949

    
2950

    
2951
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2952
  """Create a tree of block devices on the primary node.
2953

2954
  This always creates all devices.
2955

2956
  """
2957
  if device.children:
2958
    for child in device.children:
2959
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2960
        return False
2961

    
2962
  cfg.SetDiskID(device, node)
2963
  new_id = rpc.call_blockdev_create(node, device, device.size,
2964
                                    instance.name, True, info)
2965
  if not new_id:
2966
    return False
2967
  if device.physical_id is None:
2968
    device.physical_id = new_id
2969
  return True
2970

    
2971

    
2972
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


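# Illustrative sketch (an assumption for clarity, not part of the original
# code): for a drbd8 disk as built by _GenerateDRBD8Branch below,
# _CreateBlockDevOnPrimary creates the data LV, the meta LV and finally the
# drbd8 device itself on the primary node (it always creates the whole tree),
# while _CreateBlockDevOnSecondary creates on the secondary only the subtrees
# for which a device reports CreateOnSecondary(); once that happens, 'force'
# is turned on and all children are created there as well.
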
def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This generates one logical volume name for each requested extension,
  each prefixed with a freshly generated unique ID.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


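# Example (illustrative, hypothetical IDs): a call such as
#   _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
# returns something like
#   ["4ac8c545-dead-beef.sda_data", "91b7f2c0-feed-face.sda_meta"]
# i.e. each extension is appended to its own freshly generated unique ID.
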
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


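# Illustrative result (hypothetical node names and port): with primary
# "node1", secondary "node2" and a freshly allocated port, the returned tree
# looks roughly like
#   Disk(LD_DRBD8, size=<size>, iv_name=<iv_name>,
#        logical_id=("node1", "node2", <port>),
#        children=[Disk(LD_LV, size=<size>, logical_id=(<vg>, names[0])),
#                  Disk(LD_LV, size=128,    logical_id=(<vg>, names[1]))])
# i.e. a drbd8 device backed by a data LV and a 128 MB metadata LV.
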
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


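# Example layouts (illustrative): DT_DRBD8 yields two drbd8 devices ("sda"
# for data, "sdb" for swap), each backed by a data LV plus a 128 MB metadata
# LV via _GenerateDRBD8Branch above; DT_PLAIN yields two plain LVs, DT_FILE
# two file-backed disks under file_storage_dir, and DT_DISKLESS an empty
# list.
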
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


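# Example of the resulting call order (illustrative, hypothetical node
# names): for a drbd8 instance with primary "node1" and secondary "node2",
# the loop above issues, per disk, _CreateBlockDevOnSecondary(cfg, "node2",
# ...) first and only then _CreateBlockDevOnPrimary(cfg, "node1", ...); any
# failure simply returns False, and cleanup is left to the caller
# (LUCreateInstance.Exec calls _RemoveDisks in that case).
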
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


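# Worked example (illustrative): for disk_size=10240 MB and swap_size=4096 MB
# this returns 14336 MB for DT_PLAIN and 14592 MB for DT_DRBD8 (the extra
# 256 MB covering the two 128 MB drbd metadata volumes), while DT_DISKLESS
# and DT_FILE need no space in the volume group at all (None).
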
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


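# Illustrative usage (hypothetical values; the exact opcode definition lives
# in opcodes.py): an instance creation is normally submitted as an opcode
# carrying the fields listed in _OP_REQP above, e.g. something like
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mem_size=512, disk_size=10240,
#                                 swap_size=4096, disk_template="drbd8",
#                                 mode=constants.INSTANCE_CREATE, start=True,
#                                 vcpus=1, wait_for_sync=True, ip_check=True,
#                                 mac="auto", os_type="debian-etch",
#                                 pnode="node1", snode="node2")
# which the processor then drives through CheckPrereq and Exec above.
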
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

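  # Illustrative walk-through of step 4 above (hypothetical LV names): for a
  # disk "sda" whose old data LV is "1111.sda_data" and whose newly created
  # LV is "2222.sda_data", the sequence on the target node is roughly:
  #   1. blockdev_removechildren: detach 1111.sda_data from the drbd device
  #   2. blockdev_rename: 1111.sda_data -> 1111.sda_data_replaced-<time_t>
  #   3. blockdev_rename: 2222.sda_data -> 1111.sda_data
  #   4. blockdev_addchildren: attach the renamed LV back to the drbd device
  # (the meta LV goes through the same steps); step 6 then removes the
  # *_replaced-<time_t> volumes.
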
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
4241
  """Query runtime instance data.
4242

4243
  """
4244
  _OP_REQP = ["instances"]
4245

    
4246
  def CheckPrereq(self):
4247
    """Check prerequisites.
4248

4249
    This only checks the optional instance list against the existing names.
4250

4251
    """
4252
    if not isinstance(self.op.instances, list):
4253
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4254
    if self.op.instances:
4255
      self.wanted_instances = []
4256
      names = self.op.instances
4257
      for name in names:
4258
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4259
        if instance is None:
4260
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4261
        self.wanted_instances.append(instance)
4262
    else:
4263
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4264
                               in self.cfg.GetInstanceList()]
4265
    return
4266

    
4267

    
4268
  def _ComputeDiskStatus(self, instance, snode, dev):
4269
    """Compute block device status.
4270

4271
    """
4272
    self.cfg.SetDiskID(dev, instance.primary_node)
4273
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4274
    if dev.dev_type in constants.LDS_DRBD:
4275
      # we change the snode then (otherwise we use the one passed in)
4276
      if dev.logical_id[0] == instance.primary_node:
4277
        snode = dev.logical_id[1]
4278
      else:
4279
        snode = dev.logical_id[0]
4280

    
4281
    if snode:
4282
      self.cfg.SetDiskID(dev, snode)
4283
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4284
    else:
4285
      dev_sstatus = None
4286

    
4287
    if dev.children:
4288
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4289
                      for child in dev.children]
4290
    else:
4291
      dev_children = []
4292

    
4293
    data = {
4294
      "iv_name": dev.iv_name,
4295
      "dev_type": dev.dev_type,
4296
      "logical_id": dev.logical_id,
4297
      "physical_id": dev.physical_id,
4298
      "pstatus": dev_pstatus,
4299
      "sstatus": dev_sstatus,
4300
      "children": dev_children,
4301
      }
4302

    
4303
    return data
4304

    
4305
  def Exec(self, feedback_fn):
4306
    """Gather and return data"""
4307
    result = {}
4308
    for instance in self.wanted_instances:
4309
      remote_info = rpc.call_instance_info(instance.primary_node,
4310
                                                instance.name)
4311
      if remote_info and "state" in remote_info:
4312
        remote_state = "up"
4313
      else:
4314
        remote_state = "down"
4315
      if instance.status == "down":
4316
        config_state = "down"
4317
      else:
4318
        config_state = "up"
4319

    
4320
      disks = [self._ComputeDiskStatus(instance, None, device)
4321
               for device in instance.disks]
4322

    
4323
      idict = {
4324
        "name": instance.name,
4325
        "config_state": config_state,
4326
        "run_state": remote_state,
4327
        "pnode": instance.primary_node,
4328
        "snodes": instance.secondary_nodes,
4329
        "os": instance.os,
4330
        "memory": instance.memory,
4331
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4332
        "disks": disks,
4333
        "network_port": instance.network_port,
4334
        "vcpus": instance.vcpus,
4335
        "kernel_path": instance.kernel_path,
4336
        "initrd_path": instance.initrd_path,
4337
        "hvm_boot_order": instance.hvm_boot_order,
4338
        }
4339

    
4340
      result[instance.name] = idict
4341

    
4342
    return result
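
  # Illustrative only (hypothetical names and values): the mapping returned
  # above has roughly this shape, one entry per queried instance:
  #
  #   {"inst1.example.com": {
  #       "name": "inst1.example.com",
  #       "config_state": "up",             # from instance.status
  #       "run_state": "up",                # from rpc.call_instance_info
  #       "pnode": "node1.example.com",
  #       "snodes": ["node2.example.com"],
  #       "os": "debian-etch",
  #       "memory": 256,
  #       "vcpus": 1,
  #       "nics": [("aa:00:00:11:22:33", "198.51.100.10", "xen-br0")],
  #       "disks": [...],                   # dicts from _ComputeDiskStatus
  #       ...,
  #       }}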
4343

    
4344

    
4345
class LUSetInstanceParams(LogicalUnit):
4346
  """Modifies an instances's parameters.
4347

4348
  """
4349
  HPATH = "instance-modify"
4350
  HTYPE = constants.HTYPE_INSTANCE
4351
  _OP_REQP = ["instance_name"]
4352

    
4353
  def BuildHooksEnv(self):
4354
    """Build hooks env.
4355

4356
    This runs on the master, primary and secondaries.
4357

4358
    """
4359
    args = dict()
4360
    if self.mem:
4361
      args['memory'] = self.mem
4362
    if self.vcpus:
4363
      args['vcpus'] = self.vcpus
4364
    if self.do_ip or self.do_bridge or self.mac:
4365
      if self.do_ip:
4366
        ip = self.ip
4367
      else:
4368
        ip = self.instance.nics[0].ip
4369
      if self.bridge:
4370
        bridge = self.bridge
4371
      else:
4372
        bridge = self.instance.nics[0].bridge
4373
      if self.mac:
4374
        mac = self.mac
4375
      else:
4376
        mac = self.instance.nics[0].mac
4377
      args['nics'] = [(ip, bridge, mac)]
4378
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4379
    nl = [self.sstore.GetMasterNode(),
4380
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4381
    return env, nl, nl
4382

    
4383
  def CheckPrereq(self):
4384
    """Check prerequisites.
4385

4386
    This only checks the instance list against the existing names.
4387

4388
    """
4389
    self.mem = getattr(self.op, "mem", None)
4390
    self.vcpus = getattr(self.op, "vcpus", None)
4391
    self.ip = getattr(self.op, "ip", None)
4392
    self.mac = getattr(self.op, "mac", None)
4393
    self.bridge = getattr(self.op, "bridge", None)
4394
    self.kernel_path = getattr(self.op, "kernel_path", None)
4395
    self.initrd_path = getattr(self.op, "initrd_path", None)
4396
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4397
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4398
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4399
    if all_params.count(None) == len(all_params):
4400
      raise errors.OpPrereqError("No changes submitted")
4401
    if self.mem is not None:
4402
      try:
4403
        self.mem = int(self.mem)
4404
      except ValueError, err:
4405
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4406
    if self.vcpus is not None:
4407
      try:
4408
        self.vcpus = int(self.vcpus)
4409
      except ValueError, err:
4410
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4411
    if self.ip is not None:
4412
      self.do_ip = True
4413
      if self.ip.lower() == "none":
4414
        self.ip = None
4415
      else:
4416
        if not utils.IsValidIP(self.ip):
4417
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4418
    else:
4419
      self.do_ip = False
4420
    self.do_bridge = (self.bridge is not None)
4421
    if self.mac is not None:
4422
      if self.cfg.IsMacInUse(self.mac):
4423
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4424
                                   self.mac)
4425
      if not utils.IsValidMac(self.mac):
4426
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4427

    
4428
    if self.kernel_path is not None:
4429
      self.do_kernel_path = True
4430
      if self.kernel_path == constants.VALUE_NONE:
4431
        raise errors.OpPrereqError("Can't set instance to no kernel")
4432

    
4433
      if self.kernel_path != constants.VALUE_DEFAULT:
4434
        if not os.path.isabs(self.kernel_path):
4435
          raise errors.OpPrereqError("The kernel path must be an absolute"
4436
                                    " filename")
4437
    else:
4438
      self.do_kernel_path = False
4439

    
4440
    if self.initrd_path is not None:
4441
      self.do_initrd_path = True
4442
      if self.initrd_path not in (constants.VALUE_NONE,
4443
                                  constants.VALUE_DEFAULT):
4444
        if not os.path.isabs(self.initrd_path):
4445
          raise errors.OpPrereqError("The initrd path must be an absolute"
4446
                                    " filename")
4447
    else:
4448
      self.do_initrd_path = False
4449

    
4450
    # boot order verification
4451
    if self.hvm_boot_order is not None:
4452
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4453
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4454
          raise errors.OpPrereqError("invalid boot order specified,"
4455
                                     " must be one or more of [acdn]"
4456
                                     " or 'default'")
4457

    
4458
    instance = self.cfg.GetInstanceInfo(
4459
      self.cfg.ExpandInstanceName(self.op.instance_name))
4460
    if instance is None:
4461
      raise errors.OpPrereqError("No such instance name '%s'" %
4462
                                 self.op.instance_name)
4463
    self.op.instance_name = instance.name
4464
    self.instance = instance
4465
    return
4466

    
4467
  def Exec(self, feedback_fn):
4468
    """Modifies an instance.
4469

4470
    All parameters take effect only at the next restart of the instance.
4471
    """
4472
    result = []
4473
    instance = self.instance
4474
    if self.mem:
4475
      instance.memory = self.mem
4476
      result.append(("mem", self.mem))
4477
    if self.vcpus:
4478
      instance.vcpus = self.vcpus
4479
      result.append(("vcpus",  self.vcpus))
4480
    if self.do_ip:
4481
      instance.nics[0].ip = self.ip
4482
      result.append(("ip", self.ip))
4483
    if self.bridge:
4484
      instance.nics[0].bridge = self.bridge
4485
      result.append(("bridge", self.bridge))
4486
    if self.mac:
4487
      instance.nics[0].mac = self.mac
4488
      result.append(("mac", self.mac))
4489
    if self.do_kernel_path:
4490
      instance.kernel_path = self.kernel_path
4491
      result.append(("kernel_path", self.kernel_path))
4492
    if self.do_initrd_path:
4493
      instance.initrd_path = self.initrd_path
4494
      result.append(("initrd_path", self.initrd_path))
4495
    if self.hvm_boot_order:
4496
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4497
        instance.hvm_boot_order = None
4498
      else:
4499
        instance.hvm_boot_order = self.hvm_boot_order
4500
      result.append(("hvm_boot_order", self.hvm_boot_order))
4501

    
4502
    self.cfg.AddInstance(instance)
4503

    
4504
    return result
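
  # Illustrative only: the list returned above pairs each changed parameter
  # with its new value, e.g. (hypothetical values)
  #
  #   [("mem", 512), ("bridge", "xen-br1"), ("hvm_boot_order", "default")]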
4505

    
4506

    
4507
class LUQueryExports(NoHooksLU):
4508
  """Query the exports list
4509

4510
  """
4511
  _OP_REQP = []
4512

    
4513
  def CheckPrereq(self):
4514
    """Check that the nodelist contains only existing nodes.
4515

4516
    """
4517
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4518

    
4519
  def Exec(self, feedback_fn):
4520
    """Compute the list of all the exported system images.
4521

4522
    Returns:
4523
      a dictionary with the structure node->(export-list)
4524
      where export-list is a list of the instances exported on
4525
      that node.
4526

4527
    """
4528
    return rpc.call_export_list(self.nodes)
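
  # Illustrative only: rpc.call_export_list returns a mapping from node name
  # to the exports found on that node, e.g. (hypothetical values)
  #
  #   {"node1.example.com": ["inst1.example.com"],
  #    "node2.example.com": []}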
4529

    
4530

    
4531
class LUExportInstance(LogicalUnit):
4532
  """Export an instance to an image in the cluster.
4533

4534
  """
4535
  HPATH = "instance-export"
4536
  HTYPE = constants.HTYPE_INSTANCE
4537
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4538

    
4539
  def BuildHooksEnv(self):
4540
    """Build hooks env.
4541

4542
    This will run on the master, primary node and target node.
4543

4544
    """
4545
    env = {
4546
      "EXPORT_NODE": self.op.target_node,
4547
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4548
      }
4549
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4550
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4551
          self.op.target_node]
4552
    return env, nl, nl
4553

    
4554
  def CheckPrereq(self):
4555
    """Check prerequisites.
4556

4557
    This checks that the instance and node names are valid.
4558

4559
    """
4560
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4561
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4562
    if self.instance is None:
4563
      raise errors.OpPrereqError("Instance '%s' not found" %
4564
                                 self.op.instance_name)
4565

    
4566
    # node verification
4567
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4568
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4569

    
4570
    if self.dst_node is None:
4571
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4572
                                 self.op.target_node)
4573
    self.op.target_node = self.dst_node.name
4574

    
4575
    # instance disk type verification
4576
    for disk in self.instance.disks:
4577
      if disk.dev_type == constants.LD_FILE:
4578
        raise errors.OpPrereqError("Export not supported for instances with"
4579
                                   " file-based disks")
4580

    
4581
  def Exec(self, feedback_fn):
4582
    """Export an instance to an image in the cluster.
4583

4584
    """
4585
    instance = self.instance
4586
    dst_node = self.dst_node
4587
    src_node = instance.primary_node
4588
    if self.op.shutdown:
4589
      # shutdown the instance, but not the disks
4590
      if not rpc.call_instance_shutdown(src_node, instance):
4591
         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4592
                                  (instance.name, src_node))
4593

    
4594
    vgname = self.cfg.GetVGName()
4595

    
4596
    snap_disks = []
4597

    
4598
    try:
4599
      for disk in instance.disks:
4600
        if disk.iv_name == "sda":
4601
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4602
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4603

    
4604
          if not new_dev_name:
4605
            logger.Error("could not snapshot block device %s on node %s" %
4606
                         (disk.logical_id[1], src_node))
4607
          else:
4608
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4609
                                   logical_id=(vgname, new_dev_name),
4610
                                   physical_id=(vgname, new_dev_name),
4611
                                   iv_name=disk.iv_name)
4612
            snap_disks.append(new_dev)
4613

    
4614
    finally:
4615
      if self.op.shutdown and instance.status == "up":
4616
        if not rpc.call_instance_start(src_node, instance, None):
4617
          _ShutdownInstanceDisks(instance, self.cfg)
4618
          raise errors.OpExecError("Could not start instance")
4619

    
4620
    # TODO: check for size
4621

    
4622
    for dev in snap_disks:
4623
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4624
        logger.Error("could not export block device %s from node %s to node %s"
4625
                     % (dev.logical_id[1], src_node, dst_node.name))
4626
      if not rpc.call_blockdev_remove(src_node, dev):
4627
        logger.Error("could not remove snapshot block device %s from node %s" %
4628
                     (dev.logical_id[1], src_node))
4629

    
4630
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4631
      logger.Error("could not finalize export for instance %s on node %s" %
4632
                   (instance.name, dst_node.name))
4633

    
4634
    nodelist = self.cfg.GetNodeList()
4635
    nodelist.remove(dst_node.name)
4636

    
4637
    # on one-node clusters nodelist will be empty after the removal
4638
    # if we proceed, the backup would be removed because OpQueryExports
4639
    # substitutes an empty list with the full cluster node list.
4640
    if nodelist:
4641
      op = opcodes.OpQueryExports(nodes=nodelist)
4642
      exportlist = self.proc.ChainOpCode(op)
4643
      for node in exportlist:
4644
        if instance.name in exportlist[node]:
4645
          if not rpc.call_export_remove(node, instance.name):
4646
            logger.Error("could not remove older export for instance %s"
4647
                         " on node %s" % (instance.name, node))
4648

    
4649

    
4650
class LURemoveExport(NoHooksLU):
4651
  """Remove exports related to the named instance.
4652

4653
  """
4654
  _OP_REQP = ["instance_name"]
4655

    
4656
  def CheckPrereq(self):
4657
    """Check prerequisites.
4658
    """
4659
    pass
4660

    
4661
  def Exec(self, feedback_fn):
4662
    """Remove any export.
4663

4664
    """
4665
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4666
    # If the instance was not found we'll try with the name that was passed in.
4667
    # This will only work if it was an FQDN, though.
4668
    fqdn_warn = False
4669
    if not instance_name:
4670
      fqdn_warn = True
4671
      instance_name = self.op.instance_name
4672

    
4673
    op = opcodes.OpQueryExports(nodes=[])
4674
    exportlist = self.proc.ChainOpCode(op)
4675
    found = False
4676
    for node in exportlist:
4677
      if instance_name in exportlist[node]:
4678
        found = True
4679
        if not rpc.call_export_remove(node, instance_name):
4680
          logger.Error("could not remove export for instance %s"
4681
                       " on node %s" % (instance_name, node))
4682

    
4683
    if fqdn_warn and not found:
4684
      feedback_fn("Export not found. If trying to remove an export belonging"
4685
                  " to a deleted instance please use its Fully Qualified"
4686
                  " Domain Name.")
4687

    
4688

    
4689
class TagsLU(NoHooksLU):
4690
  """Generic tags LU.
4691

4692
  This is an abstract class which is the parent of all the other tags LUs.
4693

4694
  """
4695
  def CheckPrereq(self):
4696
    """Check prerequisites.
4697

4698
    """
4699
    if self.op.kind == constants.TAG_CLUSTER:
4700
      self.target = self.cfg.GetClusterInfo()
4701
    elif self.op.kind == constants.TAG_NODE:
4702
      name = self.cfg.ExpandNodeName(self.op.name)
4703
      if name is None:
4704
        raise errors.OpPrereqError("Invalid node name (%s)" %
4705
                                   (self.op.name,))
4706
      self.op.name = name
4707
      self.target = self.cfg.GetNodeInfo(name)
4708
    elif self.op.kind == constants.TAG_INSTANCE:
4709
      name = self.cfg.ExpandInstanceName(self.op.name)
4710
      if name is None:
4711
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4712
                                   (self.op.name,))
4713
      self.op.name = name
4714
      self.target = self.cfg.GetInstanceInfo(name)
4715
    else:
4716
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4717
                                 str(self.op.kind))
4718

    
4719

    
4720
class LUGetTags(TagsLU):
4721
  """Returns the tags of a given object.
4722

4723
  """
4724
  _OP_REQP = ["kind", "name"]
4725

    
4726
  def Exec(self, feedback_fn):
4727
    """Returns the tag list.
4728

4729
    """
4730
    return self.target.GetTags()
4731

    
4732

    
4733
class LUSearchTags(NoHooksLU):
4734
  """Searches the tags for a given pattern.
4735

4736
  """
4737
  _OP_REQP = ["pattern"]
4738

    
4739
  def CheckPrereq(self):
4740
    """Check prerequisites.
4741

4742
    This checks the pattern passed for validity by compiling it.
4743

4744
    """
4745
    try:
4746
      self.re = re.compile(self.op.pattern)
4747
    except re.error, err:
4748
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4749
                                 (self.op.pattern, err))
4750

    
4751
  def Exec(self, feedback_fn):
4752
    """Returns the tag list.
4753

4754
    """
4755
    cfg = self.cfg
4756
    tgts = [("/cluster", cfg.GetClusterInfo())]
4757
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4758
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4759
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4760
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4761
    results = []
4762
    for path, target in tgts:
4763
      for tag in target.GetTags():
4764
        if self.re.search(tag):
4765
          results.append((path, tag))
4766
    return results
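
  # Illustrative only: the result is a list of (path, tag) pairs for every
  # tag matching the pattern, e.g. (hypothetical values)
  #
  #   [("/cluster", "production"),
  #    ("/instances/inst1.example.com", "production-web")]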
4767

    
4768

    
4769
class LUAddTags(TagsLU):
4770
  """Sets a tag on a given object.
4771

4772
  """
4773
  _OP_REQP = ["kind", "name", "tags"]
4774

    
4775
  def CheckPrereq(self):
4776
    """Check prerequisites.
4777

4778
    This checks the type and length of the tag name and value.
4779

4780
    """
4781
    TagsLU.CheckPrereq(self)
4782
    for tag in self.op.tags:
4783
      objects.TaggableObject.ValidateTag(tag)
4784

    
4785
  def Exec(self, feedback_fn):
4786
    """Sets the tag.
4787

4788
    """
4789
    try:
4790
      for tag in self.op.tags:
4791
        self.target.AddTag(tag)
4792
    except errors.TagError, err:
4793
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4794
    try:
4795
      self.cfg.Update(self.target)
4796
    except errors.ConfigurationError:
4797
      raise errors.OpRetryError("There has been a modification to the"
4798
                                " config file and the operation has been"
4799
                                " aborted. Please retry.")
4800

    
4801

    
4802
class LUDelTags(TagsLU):
4803
  """Delete a list of tags from a given object.
4804

4805
  """
4806
  _OP_REQP = ["kind", "name", "tags"]
4807

    
4808
  def CheckPrereq(self):
4809
    """Check prerequisites.
4810

4811
    This checks that we have the given tag.
4812

4813
    """
4814
    TagsLU.CheckPrereq(self)
4815
    for tag in self.op.tags:
4816
      objects.TaggableObject.ValidateTag(tag)
4817
    del_tags = frozenset(self.op.tags)
4818
    cur_tags = self.target.GetTags()
4819
    if not del_tags <= cur_tags:
4820
      diff_tags = del_tags - cur_tags
4821
      diff_names = ["'%s'" % tag for tag in diff_tags]
4822
      diff_names.sort()
4823
      raise errors.OpPrereqError("Tag(s) %s not found" %
4824
                                 (",".join(diff_names)))
4825

    
4826
  def Exec(self, feedback_fn):
4827
    """Remove the tag from the object.
4828

4829
    """
4830
    for tag in self.op.tags:
4831
      self.target.RemoveTag(tag)
4832
    try:
4833
      self.cfg.Update(self.target)
4834
    except errors.ConfigurationError:
4835
      raise errors.OpRetryError("There has been a modification to the"
4836
                                " config file and the operation has been"
4837
                                " aborted. Please retry.")
4838

    
4839
class LUTestDelay(NoHooksLU):
4840
  """Sleep for a specified amount of time.
4841

4842
  This LU sleeps on the master and/or nodes for a specified amount of
4843
  time.
4844

4845
  """
4846
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4847

    
4848
  def CheckPrereq(self):
4849
    """Check prerequisites.
4850

4851
    This checks that we have a good list of nodes and/or the duration
4852
    is valid.
4853

4854
    """
4855

    
4856
    if self.op.on_nodes:
4857
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4858

    
4859
  def Exec(self, feedback_fn):
4860
    """Do the actual sleep.
4861

4862
    """
4863
    if self.op.on_master:
4864
      if not utils.TestDelay(self.op.duration):
4865
        raise errors.OpExecError("Error during master delay test")
4866
    if self.op.on_nodes:
4867
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4868
      if not result:
4869
        raise errors.OpExecError("Complete failure from rpc call")
4870
      for node, node_result in result.items():
4871
        if not node_result:
4872
          raise errors.OpExecError("Failure during rpc call to node %s,"
4873
                                   " result: %s" % (node, node_result))
4874

    
4875

    
4876
class IAllocator(object):
4877
  """IAllocator framework.
4878

4879
  An IAllocator instance has three sets of attributes:
4880
    - cfg/sstore that are needed to query the cluster
4881
    - input data (all members of the _KEYS class attribute are required)
4882
    - four buffer attributes (in|out_data|text), that represent the
4883
      input (to the external script) in text and data structure format,
4884
      and the output from it, again in two formats
4885
    - the result variables from the script (success, info, nodes) for
4886
      easy usage
4887

4888
  """
4889
  _ALLO_KEYS = [
4890
    "mem_size", "disks", "disk_template",
4891
    "os", "tags", "nics", "vcpus",
4892
    ]
4893
  _RELO_KEYS = [
4894
    "relocate_from",
4895
    ]
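
  # Illustrative construction only (hypothetical values); LUTestAllocator
  # below builds its IAllocator instances the same way:
  #
  #   ial = IAllocator(cfg, sstore,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    mem_size=256,
  #                    disks=[{"size": 1024, "mode": "w"},
  #                           {"size": 4096, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debian-etch", tags=[], vcpus=1,
  #                    nics=[{"mac": "aa:00:00:11:22:33", "ip": None,
  #                           "bridge": "xen-br0"}])
  #   ial.Run("my-allocator")   # hypothetical allocator script name
  #   # ial.success, ial.info and ial.nodes then hold the parsed reply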
4896

    
4897
  def __init__(self, cfg, sstore, mode, name, **kwargs):
4898
    self.cfg = cfg
4899
    self.sstore = sstore
4900
    # init buffer variables
4901
    self.in_text = self.out_text = self.in_data = self.out_data = None
4902
    # init all input fields so that pylint is happy
4903
    self.mode = mode
4904
    self.name = name
4905
    self.mem_size = self.disks = self.disk_template = None
4906
    self.os = self.tags = self.nics = self.vcpus = None
4907
    self.relocate_from = None
4908
    # computed fields
4909
    self.required_nodes = None
4910
    # init result fields
4911
    self.success = self.info = self.nodes = None
4912
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4913
      keyset = self._ALLO_KEYS
4914
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4915
      keyset = self._RELO_KEYS
4916
    else:
4917
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4918
                                   " IAllocator" % self.mode)
4919
    for key in kwargs:
4920
      if key not in keyset:
4921
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
4922
                                     " IAllocator" % key)
4923
      setattr(self, key, kwargs[key])
4924
    for key in keyset:
4925
      if key not in kwargs:
4926
        raise errors.ProgrammerError("Missing input parameter '%s' to"
4927
                                     " IAllocator" % key)
4928
    self._BuildInputData()
4929

    
4930
  def _ComputeClusterData(self):
4931
    """Compute the generic allocator input data.
4932

4933
    This is the data that is independent of the actual operation.
4934

4935
    """
4936
    cfg = self.cfg
4937
    # cluster data
4938
    data = {
4939
      "version": 1,
4940
      "cluster_name": self.sstore.GetClusterName(),
4941
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4942
      "hypervisor_type": self.sstore.GetHypervisorType(),
4943
      # we don't have job IDs
4944
      }
4945

    
4946
    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4947

    
4948
    # node data
4949
    node_results = {}
4950
    node_list = cfg.GetNodeList()
4951
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4952
    for nname in node_list:
4953
      ninfo = cfg.GetNodeInfo(nname)
4954
      if nname not in node_data or not isinstance(node_data[nname], dict):
4955
        raise errors.OpExecError("Can't get data for node %s" % nname)
4956
      remote_info = node_data[nname]
4957
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
4958
                   'vg_size', 'vg_free', 'cpu_total']:
4959
        if attr not in remote_info:
4960
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4961
                                   (nname, attr))
4962
        try:
4963
          remote_info[attr] = int(remote_info[attr])
4964
        except ValueError, err:
4965
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4966
                                   " %s" % (nname, attr, str(err)))
4967
      # compute memory used by primary instances
4968
      i_p_mem = i_p_up_mem = 0
4969
      for iinfo in i_list:
4970
        if iinfo.primary_node == nname:
4971
          i_p_mem += iinfo.memory
4972
          if iinfo.status == "up":
4973
            i_p_up_mem += iinfo.memory
4974

    
4975
      # assemble the per-node result
4976
      pnr = {
4977
        "tags": list(ninfo.GetTags()),
4978
        "total_memory": remote_info['memory_total'],
4979
        "reserved_memory": remote_info['memory_dom0'],
4980
        "free_memory": remote_info['memory_free'],
4981
        "i_pri_memory": i_p_mem,
4982
        "i_pri_up_memory": i_p_up_mem,
4983
        "total_disk": remote_info['vg_size'],
4984
        "free_disk": remote_info['vg_free'],
4985
        "primary_ip": ninfo.primary_ip,
4986
        "secondary_ip": ninfo.secondary_ip,
4987
        "total_cpus": remote_info['cpu_total'],
4988
        }
4989
      node_results[nname] = pnr
4990
    data["nodes"] = node_results
4991

    
4992
    # instance data
4993
    instance_data = {}
4994
    for iinfo in i_list:
4995
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4996
                  for n in iinfo.nics]
4997
      pir = {
4998
        "tags": list(iinfo.GetTags()),
4999
        "should_run": iinfo.status == "up",
5000
        "vcpus": iinfo.vcpus,
5001
        "memory": iinfo.memory,
5002
        "os": iinfo.os,
5003
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5004
        "nics": nic_data,
5005
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5006
        "disk_template": iinfo.disk_template,
5007
        }
5008
      instance_data[iinfo.name] = pir
5009

    
5010
    data["instances"] = instance_data
5011

    
5012
    self.in_data = data
5013

    
5014
  def _AddNewInstance(self):
5015
    """Add new instance data to allocator structure.
5016

5017
    This in combination with _ComputeClusterData will create the
5018
    correct structure needed as input for the allocator.
5019

5020
    The checks for the completeness of the opcode must have already been
5021
    done.
5022

5023
    """
5024
    data = self.in_data
5025
    if len(self.disks) != 2:
5026
      raise errors.OpExecError("Only two-disk configurations supported")
5027

    
5028
    disk_space = _ComputeDiskSize(self.disk_template,
5029
                                  self.disks[0]["size"], self.disks[1]["size"])
5030

    
5031
    if self.disk_template in constants.DTS_NET_MIRROR:
5032
      self.required_nodes = 2
5033
    else:
5034
      self.required_nodes = 1
5035
    request = {
5036
      "type": "allocate",
5037
      "name": self.name,
5038
      "disk_template": self.disk_template,
5039
      "tags": self.tags,
5040
      "os": self.os,
5041
      "vcpus": self.vcpus,
5042
      "memory": self.mem_size,
5043
      "disks": self.disks,
5044
      "disk_space_total": disk_space,
5045
      "nics": self.nics,
5046
      "required_nodes": self.required_nodes,
5047
      }
5048
    data["request"] = request
5049

    
5050
  def _AddRelocateInstance(self):
5051
    """Add relocate instance data to allocator structure.
5052

5053
    This in combination with _ComputeClusterData will create the
5054
    correct structure needed as input for the allocator.
5055

5056
    The checks for the completeness of the opcode must have already been
5057
    done.
5058

5059
    """
5060
    instance = self.cfg.GetInstanceInfo(self.name)
5061
    if instance is None:
5062
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5063
                                   " IAllocator" % self.name)
5064

    
5065
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5066
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5067

    
5068
    if len(instance.secondary_nodes) != 1:
5069
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5070

    
5071
    self.required_nodes = 1
5072

    
5073
    disk_space = _ComputeDiskSize(instance.disk_template,
5074
                                  instance.disks[0].size,
5075
                                  instance.disks[1].size)
5076

    
5077
    request = {
5078
      "type": "relocate",
5079
      "name": self.name,
5080
      "disk_space_total": disk_space,
5081
      "required_nodes": self.required_nodes,
5082
      "relocate_from": self.relocate_from,
5083
      }
5084
    self.in_data["request"] = request
5085

    
5086
  def _BuildInputData(self):
5087
    """Build input data structures.
5088

5089
    """
5090
    self._ComputeClusterData()
5091

    
5092
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5093
      self._AddNewInstance()
5094
    else:
5095
      self._AddRelocateInstance()
5096

    
5097
    self.in_text = serializer.Dump(self.in_data)
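
  # Illustrative only: after _BuildInputData, self.in_data is a dict with the
  # top-level keys assembled above, roughly
  #
  #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
  #    "hypervisor_type": ..., "nodes": {...}, "instances": {...},
  #    "request": {...}}
  #
  # and self.in_text is its serializer.Dump() form, which Run() passes to the
  # allocator script via the master node.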
5098

    
5099
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5100
    """Run an instance allocator and return the results.
5101

5102
    """
5103
    data = self.in_text
5104

    
5105
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5106

    
5107
    if not isinstance(result, tuple) or len(result) != 4:
5108
      raise errors.OpExecError("Invalid result from master iallocator runner")
5109

    
5110
    rcode, stdout, stderr, fail = result
5111

    
5112
    if rcode == constants.IARUN_NOTFOUND:
5113
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5114
    elif rcode == constants.IARUN_FAILURE:
5115
        raise errors.OpExecError("Instance allocator call failed: %s,"
5116
                                 " output: %s" %
5117
                                 (fail, stdout+stderr))
5118
    self.out_text = stdout
5119
    if validate:
5120
      self._ValidateResult()
5121

    
5122
  def _ValidateResult(self):
5123
    """Process the allocator results.
5124

5125
    This will process and if successful save the result in
5126
    self.out_data and the other parameters.
5127

5128
    """
5129
    try:
5130
      rdict = serializer.Load(self.out_text)
5131
    except Exception, err:
5132
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5133

    
5134
    if not isinstance(rdict, dict):
5135
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5136

    
5137
    for key in "success", "info", "nodes":
5138
      if key not in rdict:
5139
        raise errors.OpExecError("Can't parse iallocator results:"
5140
                                 " missing key '%s'" % key)
5141
      setattr(self, key, rdict[key])
5142

    
5143
    if not isinstance(rdict["nodes"], list):
5144
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5145
                               " is not a list")
5146
    self.out_data = rdict
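
  # Illustrative only: a reply that passes the checks above would deserialize
  # to something like (hypothetical values)
  #
  #   {"success": True,
  #    "info": "allocation succeeded",
  #    "nodes": ["node1.example.com", "node2.example.com"]}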
5147

    
5148

    
5149
class LUTestAllocator(NoHooksLU):
5150
  """Run allocator tests.
5151

5152
  This LU runs the allocator tests
5153

5154
  """
5155
  _OP_REQP = ["direction", "mode", "name"]
5156

    
5157
  def CheckPrereq(self):
5158
    """Check prerequisites.
5159

5160
    This checks the opcode parameters depending on the direction and mode.
5161

5162
    """
5163
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5164
      for attr in ["name", "mem_size", "disks", "disk_template",
5165
                   "os", "tags", "nics", "vcpus"]:
5166
        if not hasattr(self.op, attr):
5167
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5168
                                     attr)
5169
      iname = self.cfg.ExpandInstanceName(self.op.name)
5170
      if iname is not None:
5171
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5172
                                   iname)
5173
      if not isinstance(self.op.nics, list):
5174
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5175
      for row in self.op.nics:
5176
        if (not isinstance(row, dict) or
5177
            "mac" not in row or
5178
            "ip" not in row or
5179
            "bridge" not in row):
5180
          raise errors.OpPrereqError("Invalid contents of the"
5181
                                     " 'nics' parameter")
5182
      if not isinstance(self.op.disks, list):
5183
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5184
      if len(self.op.disks) != 2:
5185
        raise errors.OpPrereqError("Only two-disk configurations supported")
5186
      for row in self.op.disks:
5187
        if (not isinstance(row, dict) or
5188
            "size" not in row or
5189
            not isinstance(row["size"], int) or
5190
            "mode" not in row or
5191
            row["mode"] not in ['r', 'w']):
5192
          raise errors.OpPrereqError("Invalid contents of the"
5193
                                     " 'disks' parameter")
5194
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5195
      if not hasattr(self.op, "name"):
5196
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5197
      fname = self.cfg.ExpandInstanceName(self.op.name)
5198
      if fname is None:
5199
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5200
                                   self.op.name)
5201
      self.op.name = fname
5202
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5203
    else:
5204
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5205
                                 self.op.mode)
5206

    
5207
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5208
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5209
        raise errors.OpPrereqError("Missing allocator name")
5210
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5211
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5212
                                 self.op.direction)
5213

    
5214
  def Exec(self, feedback_fn):
5215
    """Run the allocator test.
5216

5217
    """
5218
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5219
      ial = IAllocator(self.cfg, self.sstore,
5220
                       mode=self.op.mode,
5221
                       name=self.op.name,
5222
                       mem_size=self.op.mem_size,
5223
                       disks=self.op.disks,
5224
                       disk_template=self.op.disk_template,
5225
                       os=self.op.os,
5226
                       tags=self.op.tags,
5227
                       nics=self.op.nics,
5228
                       vcpus=self.op.vcpus,
5229
                       )
5230
    else:
5231
      ial = IAllocator(self.cfg, self.sstore,
5232
                       mode=self.op.mode,
5233
                       name=self.op.name,
5234
                       relocate_from=list(self.relocate_from),
5235
                       )
5236

    
5237
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5238
      result = ial.in_text
5239
    else:
5240
      ial.Run(self.op.allocator, validate=False)
5241
      result = ial.out_text
5242
    return result