
lib/cmdlib.py @ 102b115b


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements (REQ_CLUSTER,
58
      REQ_MASTER); note that all commands require root permissions
59

60
  """
61
  HPATH = None
62
  HTYPE = None
63
  _OP_REQP = []
64
  REQ_CLUSTER = True
65
  REQ_MASTER = True
66

    
67
  def __init__(self, processor, op, cfg, sstore):
68
    """Constructor for LogicalUnit.
69

70
    This needs to be overridden in derived classes in order to check op
71
    validity.
72

73
    """
74
    self.proc = processor
75
    self.op = op
76
    self.cfg = cfg
77
    self.sstore = sstore
78
    self.__ssh = None
79

    
80
    for attr_name in self._OP_REQP:
81
      attr_val = getattr(op, attr_name, None)
82
      if attr_val is None:
83
        raise errors.OpPrereqError("Required parameter '%s' missing" %
84
                                   attr_name)
85
    if self.REQ_CLUSTER:
86
      if not cfg.IsCluster():
87
        raise errors.OpPrereqError("Cluster not initialized yet,"
88
                                   " use 'gnt-cluster init' first.")
89
      if self.REQ_MASTER:
90
        master = sstore.GetMasterNode()
91
        if master != utils.HostInfo().name:
92
          raise errors.OpPrereqError("Commands must be run on the master"
93
                                     " node %s" % master)
94

    
95
  def __GetSSH(self):
96
    """Returns the SshRunner object
97

98
    """
99
    if not self.__ssh:
100
      self.__ssh = ssh.SshRunner(self.sstore)
101
    return self.__ssh
102

    
103
  ssh = property(fget=__GetSSH)
104

    
105
  def CheckPrereq(self):
106
    """Check prerequisites for this LU.
107

108
    This method should check that the prerequisites for the execution
109
    of this LU are fulfilled. It can do internode communication, but
110
    it should be idempotent - no cluster or system changes are
111
    allowed.
112

113
    The method should raise errors.OpPrereqError in case something is
114
    not fulfilled. Its return value is ignored.
115

116
    This method should also update all the parameters of the opcode to
117
    their canonical form; e.g. a short node name must be fully
118
    expanded after this method has successfully completed (so that
119
    hooks, logging, etc. work correctly).
120

121
    """
122
    raise NotImplementedError
123

    
124
  def Exec(self, feedback_fn):
125
    """Execute the LU.
126

127
    This method should implement the actual work. It should raise
128
    errors.OpExecError for failures that are somewhat dealt with in
129
    code, or expected.
130

131
    """
132
    raise NotImplementedError
133

    
134
  def BuildHooksEnv(self):
135
    """Build hooks environment for this LU.
136

137
    This method should return a three-element tuple consisting of: a dict
138
    containing the environment that will be used for running the
139
    specific hook for this LU, a list of node names on which the hook
140
    should run before the execution, and a list of node names on which
141
    the hook should run after the execution.
142

143
    The keys of the dict must not have 'GANETI_' prefixed as this will
144
    be handled in the hooks runner. Also note additional keys will be
145
    added by the hooks runner. If the LU doesn't define any
146
    environment, an empty dict (and not None) should be returned.
147

148
    If there are no nodes for a phase, an empty list (and not None) should be returned.
149

150
    Note that if the HPATH for a LU class is None, this function will
151
    not be called.
152

153
    """
154
    raise NotImplementedError
155

    
156
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
157
    """Notify the LU about the results of its hooks.
158

159
    This method is called every time a hooks phase is executed, and notifies
160
    the Logical Unit about the hooks' result. The LU can then use it to alter
161
    its result based on the hooks.  By default the method does nothing and the
162
    previous result is passed back unchanged, but any LU can override it if it
163
    wants to use the local cluster hook-scripts somehow.
164

165
    Args:
166
      phase: the hooks phase that has just been run
167
      hook_results: the results of the multi-node hooks rpc call
168
      feedback_fn: function to send feedback back to the caller
169
      lu_result: the previous result this LU had, or None in the PRE phase.
170

171
    """
172
    return lu_result
173

    
174

    
175
class NoHooksLU(LogicalUnit):
176
  """Simple LU which runs no hooks.
177

178
  This LU is intended as a parent for other LogicalUnits which will
179
  run no hooks, in order to reduce duplicate code.
180

181
  """
182
  HPATH = None
183
  HTYPE = None
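# Illustrative sketch of the contract described in the LogicalUnit docstring:
# CheckPrereq canonicalizes and validates the opcode fields, Exec does the
# actual work, and BuildHooksEnv returns the (env, pre-nodes, post-nodes)
# tuple.  The opcode field "node_name" and the hook path "node-example" are
# hypothetical and only serve as an example.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "node-example"
#     HTYPE = constants.HTYPE_NODE
#     _OP_REQP = ["node_name"]
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.op.node_name}
#       return env, [], [self.sstore.GetMasterNode()]
#
#     def CheckPrereq(self):
#       node = self.cfg.ExpandNodeName(self.op.node_name)
#       if node is None:
#         raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
#       self.op.node_name = node
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Nothing to do for node %s" % self.op.node_name)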
184

    
185

    
186
def _AddHostToEtcHosts(hostname):
187
  """Wrapper around utils.SetEtcHostsEntry.
188

189
  """
190
  hi = utils.HostInfo(name=hostname)
191
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
192

    
193

    
194
def _RemoveHostFromEtcHosts(hostname):
195
  """Wrapper around utils.RemoveEtcHostsEntry.
196

197
  """
198
  hi = utils.HostInfo(name=hostname)
199
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
200
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
201

    
202

    
203
def _GetWantedNodes(lu, nodes):
204
  """Returns list of checked and expanded node names.
205

206
  Args:
207
    nodes: List of node names (strings); an empty list means all nodes
208

209
  """
210
  if not isinstance(nodes, list):
211
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
212

    
213
  if nodes:
214
    wanted = []
215

    
216
    for name in nodes:
217
      node = lu.cfg.ExpandNodeName(name)
218
      if node is None:
219
        raise errors.OpPrereqError("No such node name '%s'" % name)
220
      wanted.append(node)
221

    
222
  else:
223
    wanted = lu.cfg.GetNodeList()
224
  return utils.NiceSort(wanted)
225

    
226

    
227
def _GetWantedInstances(lu, instances):
228
  """Returns list of checked and expanded instance names.
229

230
  Args:
231
    instances: List of instance names (strings); an empty list means all instances
232

233
  """
234
  if not isinstance(instances, list):
235
    raise errors.OpPrereqError("Invalid argument type 'instances'")
236

    
237
  if instances:
238
    wanted = []
239

    
240
    for name in instances:
241
      instance = lu.cfg.ExpandInstanceName(name)
242
      if instance is None:
243
        raise errors.OpPrereqError("No such instance name '%s'" % name)
244
      wanted.append(instance)
245

    
246
  else:
247
    wanted = lu.cfg.GetInstanceList()
248
  return utils.NiceSort(wanted)
249

    
250

    
251
def _CheckOutputFields(static, dynamic, selected):
252
  """Checks whether all selected fields are valid.
253

254
  Args:
255
    static: Static fields
256
    dynamic: Dynamic fields
257

258
  """
259
  static_fields = frozenset(static)
260
  dynamic_fields = frozenset(dynamic)
261

    
262
  all_fields = static_fields | dynamic_fields
263

    
264
  if not all_fields.issuperset(selected):
265
    raise errors.OpPrereqError("Unknown output fields selected: %s"
266
                               % ",".join(frozenset(selected).
267
                                          difference(all_fields)))
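# Illustrative sketch of the check above, with made-up field names: mixing
# valid static and dynamic fields passes silently, while any unknown field
# raises OpPrereqError naming the offending fields.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])    # ok
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])    # raises OpPrereqError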
268

    
269

    
270
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
271
                          memory, vcpus, nics):
272
  """Builds instance related env variables for hooks from single variables.
273

274
  Args:
275
    secondary_nodes: List of secondary nodes as strings
276
  """
277
  env = {
278
    "OP_TARGET": name,
279
    "INSTANCE_NAME": name,
280
    "INSTANCE_PRIMARY": primary_node,
281
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
282
    "INSTANCE_OS_TYPE": os_type,
283
    "INSTANCE_STATUS": status,
284
    "INSTANCE_MEMORY": memory,
285
    "INSTANCE_VCPUS": vcpus,
286
  }
287

    
288
  if nics:
289
    nic_count = len(nics)
290
    for idx, (ip, bridge, mac) in enumerate(nics):
291
      if ip is None:
292
        ip = ""
293
      env["INSTANCE_NIC%d_IP" % idx] = ip
294
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
295
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
296
  else:
297
    nic_count = 0
298

    
299
  env["INSTANCE_NIC_COUNT"] = nic_count
300

    
301
  return env
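# Illustrative sketch of the environment built above, for a hypothetical
# instance with one NIC (all values made up):
#
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch", "up",
#                         128, 1, [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")])
#   # => {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#   #     "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#   #     "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#   #     "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#   #     "INSTANCE_NIC0_IP": "192.0.2.10", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#   #     "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33", "INSTANCE_NIC_COUNT": 1}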
302

    
303

    
304
def _BuildInstanceHookEnvByObject(instance, override=None):
305
  """Builds instance related env variables for hooks from an object.
306

307
  Args:
308
    instance: objects.Instance object of instance
309
    override: dict of values to override
310
  """
311
  args = {
312
    'name': instance.name,
313
    'primary_node': instance.primary_node,
314
    'secondary_nodes': instance.secondary_nodes,
315
    'os_type': instance.os,
316
    'status': instance.status,
317
    'memory': instance.memory,
318
    'vcpus': instance.vcpus,
319
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
320
  }
321
  if override:
322
    args.update(override)
323
  return _BuildInstanceHookEnv(**args)
324

    
325

    
326
def _HasValidVG(vglist, vgname):
327
  """Checks if the volume group list is valid.
328

329
  A non-None return value means there's an error, and the return value
330
  is the error message.
331

332
  """
333
  vgsize = vglist.get(vgname, None)
334
  if vgsize is None:
335
    return "volume group '%s' missing" % vgname
336
  elif vgsize < 20480:
337
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
338
            (vgname, vgsize))
339
  return None
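# Illustrative sketch: vglist is the mapping of volume group name to size in
# MiB, as returned by utils.ListVolumeGroups() or rpc.call_vg_list(); the
# group names and sizes below are made up.
#
#   _HasValidVG({"xenvg": 409600}, "xenvg")   # => None (valid)
#   _HasValidVG({"xenvg": 10240}, "xenvg")    # => "volume group 'xenvg' too small ..."
#   _HasValidVG({}, "xenvg")                  # => "volume group 'xenvg' missing"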
340

    
341

    
342
def _InitSSHSetup(node):
343
  """Setup the SSH configuration for the cluster.
344

345

346
  This generates a dsa keypair for root, adds the pub key to the
347
  authorized keys and adds the host key to its own known hosts.
348

349
  Args:
350
    node: the name of this host as a fqdn
351

352
  """
353
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
354

    
355
  for name in priv_key, pub_key:
356
    if os.path.exists(name):
357
      utils.CreateBackup(name)
358
    utils.RemoveFile(name)
359

    
360
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
361
                         "-f", priv_key,
362
                         "-q", "-N", ""])
363
  if result.failed:
364
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
365
                             result.output)
366

    
367
  f = open(pub_key, 'r')
368
  try:
369
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
370
  finally:
371
    f.close()
372

    
373

    
374
def _InitGanetiServerSetup(ss):
375
  """Setup the necessary configuration for the initial node daemon.
376

377
  This creates the nodepass file containing the shared password for
378
  the cluster and also generates the SSL certificate.
379

380
  """
381
  # Create pseudo random password
382
  randpass = sha.new(os.urandom(64)).hexdigest()
383
  # and write it into sstore
384
  ss.SetKey(ss.SS_NODED_PASS, randpass)
385

    
386
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
387
                         "-days", str(365*5), "-nodes", "-x509",
388
                         "-keyout", constants.SSL_CERT_FILE,
389
                         "-out", constants.SSL_CERT_FILE, "-batch"])
390
  if result.failed:
391
    raise errors.OpExecError("could not generate server ssl cert, command"
392
                             " %s had exitcode %s and error message %s" %
393
                             (result.cmd, result.exit_code, result.output))
394

    
395
  os.chmod(constants.SSL_CERT_FILE, 0400)
396

    
397
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
398

    
399
  if result.failed:
400
    raise errors.OpExecError("Could not start the node daemon, command %s"
401
                             " had exitcode %s and error %s" %
402
                             (result.cmd, result.exit_code, result.output))
403

    
404

    
405
def _CheckInstanceBridgesExist(instance):
406
  """Check that the brigdes needed by an instance exist.
407

408
  """
409
  # check bridges existence
410
  brlist = [nic.bridge for nic in instance.nics]
411
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
412
    raise errors.OpPrereqError("one or more target bridges %s does not"
413
                               " exist on destination node '%s'" %
414
                               (brlist, instance.primary_node))
415

    
416

    
417
class LUInitCluster(LogicalUnit):
418
  """Initialise the cluster.
419

420
  """
421
  HPATH = "cluster-init"
422
  HTYPE = constants.HTYPE_CLUSTER
423
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
424
              "def_bridge", "master_netdev", "file_storage_dir"]
425
  REQ_CLUSTER = False
426

    
427
  def BuildHooksEnv(self):
428
    """Build hooks env.
429

430
    Notes: Since we don't require a cluster, we must manually add
431
    ourselves in the post-run node list.
432

433
    """
434
    env = {"OP_TARGET": self.op.cluster_name}
435
    return env, [], [self.hostname.name]
436

    
437
  def CheckPrereq(self):
438
    """Verify that the passed name is a valid one.
439

440
    """
441
    if config.ConfigWriter.IsCluster():
442
      raise errors.OpPrereqError("Cluster is already initialised")
443

    
444
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
445
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
446
        raise errors.OpPrereqError("Please prepare the cluster VNC"
447
                                   "password file %s" %
448
                                   constants.VNC_PASSWORD_FILE)
449

    
450
    self.hostname = hostname = utils.HostInfo()
451

    
452
    if hostname.ip.startswith("127."):
453
      raise errors.OpPrereqError("This host's IP resolves to the private"
454
                                 " range (%s). Please fix DNS or %s." %
455
                                 (hostname.ip, constants.ETC_HOSTS))
456

    
457
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
458
                         source=constants.LOCALHOST_IP_ADDRESS):
459
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
460
                                 " to %s,\nbut this ip address does not"
461
                                 " belong to this host."
462
                                 " Aborting." % hostname.ip)
463

    
464
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
465

    
466
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
467
                     timeout=5):
468
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
469

    
470
    secondary_ip = getattr(self.op, "secondary_ip", None)
471
    if secondary_ip and not utils.IsValidIP(secondary_ip):
472
      raise errors.OpPrereqError("Invalid secondary ip given")
473
    if (secondary_ip and
474
        secondary_ip != hostname.ip and
475
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
476
                           source=constants.LOCALHOST_IP_ADDRESS))):
477
      raise errors.OpPrereqError("You gave %s as secondary IP,"
478
                                 " but it does not belong to this host." %
479
                                 secondary_ip)
480
    self.secondary_ip = secondary_ip
481

    
482
    if not hasattr(self.op, "vg_name"):
483
      self.op.vg_name = None
484
    # if vg_name not None, checks if volume group is valid
485
    if self.op.vg_name:
486
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
487
      if vgstatus:
488
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
489
                                   " you are not using lvm" % vgstatus)
490

    
491
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
492

    
493
    if not os.path.isabs(self.op.file_storage_dir):
494
      raise errors.OpPrereqError("The file storage directory you have is"
495
                                 " not an absolute path.")
496

    
497
    if not os.path.exists(self.op.file_storage_dir):
498
      try:
499
        os.makedirs(self.op.file_storage_dir, 0750)
500
      except OSError, err:
501
        raise errors.OpPrereqError("Cannot create file storage directory"
502
                                   " '%s': %s" %
503
                                   (self.op.file_storage_dir, err))
504

    
505
    if not os.path.isdir(self.op.file_storage_dir):
506
      raise errors.OpPrereqError("The file storage directory '%s' is not"
507
                                 " a directory." % self.op.file_storage_dir)
508

    
509
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
510
                    self.op.mac_prefix):
511
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
512
                                 self.op.mac_prefix)
513

    
514
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
515
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
516
                                 self.op.hypervisor_type)
517

    
518
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
519
    if result.failed:
520
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
521
                                 (self.op.master_netdev,
522
                                  result.output.strip()))
523

    
524
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
525
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
526
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
527
                                 " executable." % constants.NODE_INITD_SCRIPT)
528

    
529
  def Exec(self, feedback_fn):
530
    """Initialize the cluster.
531

532
    """
533
    clustername = self.clustername
534
    hostname = self.hostname
535

    
536
    # set up the simple store
537
    self.sstore = ss = ssconf.SimpleStore()
538
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
539
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
540
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
541
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
542
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
543
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
544
    ss.SetKey(ss.SS_CONFIG_VERSION, constants.CONFIG_VERSION)
545

    
546
    # set up the inter-node password and certificate
547
    _InitGanetiServerSetup(ss)
548

    
549
    # start the master ip
550
    rpc.call_node_start_master(hostname.name)
551

    
552
    # set up ssh config and /etc/hosts
553
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
554
    try:
555
      sshline = f.read()
556
    finally:
557
      f.close()
558
    sshkey = sshline.split(" ")[1]
559

    
560
    _AddHostToEtcHosts(hostname.name)
561
    _InitSSHSetup(hostname.name)
562

    
563
    # init of cluster config file
564
    self.cfg = cfgw = config.ConfigWriter()
565
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
566
                    sshkey, self.op.mac_prefix,
567
                    self.op.vg_name, self.op.def_bridge)
568

    
569
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
570

    
571

    
572
class LUDestroyCluster(NoHooksLU):
573
  """Logical unit for destroying the cluster.
574

575
  """
576
  _OP_REQP = []
577

    
578
  def CheckPrereq(self):
579
    """Check prerequisites.
580

581
    This checks whether the cluster is empty.
582

583
    Any errors are signalled by raising errors.OpPrereqError.
584

585
    """
586
    master = self.sstore.GetMasterNode()
587

    
588
    nodelist = self.cfg.GetNodeList()
589
    if len(nodelist) != 1 or nodelist[0] != master:
590
      raise errors.OpPrereqError("There are still %d node(s) in"
591
                                 " this cluster." % (len(nodelist) - 1))
592
    instancelist = self.cfg.GetInstanceList()
593
    if instancelist:
594
      raise errors.OpPrereqError("There are still %d instance(s) in"
595
                                 " this cluster." % len(instancelist))
596

    
597
  def Exec(self, feedback_fn):
598
    """Destroys the cluster.
599

600
    """
601
    master = self.sstore.GetMasterNode()
602
    if not rpc.call_node_stop_master(master):
603
      raise errors.OpExecError("Could not disable the master role")
604
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
605
    utils.CreateBackup(priv_key)
606
    utils.CreateBackup(pub_key)
607
    rpc.call_node_leave_cluster(master)
608

    
609

    
610
class LUVerifyCluster(LogicalUnit):
611
  """Verifies the cluster status.
612

613
  """
614
  HPATH = "cluster-verify"
615
  HTYPE = constants.HTYPE_CLUSTER
616
  _OP_REQP = ["skip_checks"]
617

    
618
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
619
                  remote_version, feedback_fn):
620
    """Run multiple tests against a node.
621

622
    Test list:
623
      - compares ganeti version
624
      - checks vg existence and size > 20G
625
      - checks config file checksum
626
      - checks ssh to other nodes
627

628
    Args:
629
      node: name of the node to check
630
      file_list: required list of files
631
      local_cksum: dictionary of local files and their checksums
632

633
    """
634
    # compares ganeti version
635
    local_version = constants.PROTOCOL_VERSION
636
    if not remote_version:
637
      feedback_fn("  - ERROR: connection to %s failed" % (node))
638
      return True
639

    
640
    if local_version != remote_version:
641
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
642
                      (local_version, node, remote_version))
643
      return True
644

    
645
    # checks vg existence and size > 20G
646

    
647
    bad = False
648
    if not vglist:
649
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
650
                      (node,))
651
      bad = True
652
    else:
653
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
654
      if vgstatus:
655
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
656
        bad = True
657

    
658
    # checks config file checksum
659
    # checks ssh to any
660

    
661
    if 'filelist' not in node_result:
662
      bad = True
663
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
664
    else:
665
      remote_cksum = node_result['filelist']
666
      for file_name in file_list:
667
        if file_name not in remote_cksum:
668
          bad = True
669
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
670
        elif remote_cksum[file_name] != local_cksum[file_name]:
671
          bad = True
672
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
673

    
674
    if 'nodelist' not in node_result:
675
      bad = True
676
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
677
    else:
678
      if node_result['nodelist']:
679
        bad = True
680
        for node in node_result['nodelist']:
681
          feedback_fn("  - ERROR: communication with node '%s': %s" %
682
                          (node, node_result['nodelist'][node]))
683
    hyp_result = node_result.get('hypervisor', None)
684
    if hyp_result is not None:
685
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
686
    return bad
687

    
688
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
689
                      node_instance, feedback_fn):
690
    """Verify an instance.
691

692
    This function checks to see if the required block devices are
693
    available on the instance's node.
694

695
    """
696
    bad = False
697

    
698
    node_current = instanceconfig.primary_node
699

    
700
    node_vol_should = {}
701
    instanceconfig.MapLVsByNode(node_vol_should)
702

    
703
    for node in node_vol_should:
704
      for volume in node_vol_should[node]:
705
        if node not in node_vol_is or volume not in node_vol_is[node]:
706
          feedback_fn("  - ERROR: volume %s missing on node %s" %
707
                          (volume, node))
708
          bad = True
709

    
710
    if instanceconfig.status != 'down':
711
      if (node_current not in node_instance or
712
          not instance in node_instance[node_current]):
713
        feedback_fn("  - ERROR: instance %s not running on node %s" %
714
                        (instance, node_current))
715
        bad = True
716

    
717
    for node in node_instance:
718
      if node != node_current:
719
        if instance in node_instance[node]:
720
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
721
                          (instance, node))
722
          bad = True
723

    
724
    return bad
725

    
726
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
727
    """Verify if there are any unknown volumes in the cluster.
728

729
    The .os, .swap and backup volumes are ignored. All other volumes are
730
    reported as unknown.
731

732
    """
733
    bad = False
734

    
735
    for node in node_vol_is:
736
      for volume in node_vol_is[node]:
737
        if node not in node_vol_should or volume not in node_vol_should[node]:
738
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
739
                      (volume, node))
740
          bad = True
741
    return bad
742

    
743
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
744
    """Verify the list of running instances.
745

746
    This checks what instances are running but unknown to the cluster.
747

748
    """
749
    bad = False
750
    for node in node_instance:
751
      for runninginstance in node_instance[node]:
752
        if runninginstance not in instancelist:
753
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
754
                          (runninginstance, node))
755
          bad = True
756
    return bad
757

    
758
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
759
    """Verify N+1 Memory Resilience.
760

761
    Check that if one single node dies we can still start all the instances it
762
    was primary for.
763

764
    """
765
    bad = False
766

    
767
    for node, nodeinfo in node_info.iteritems():
768
      # This code checks that every node which is now listed as secondary has
769
      # enough memory to host all instances it is supposed to, should a single
770
      # other node in the cluster fail.
771
      # FIXME: not ready for failover to an arbitrary node
772
      # FIXME: does not support file-backed instances
773
      # WARNING: we currently take into account down instances as well as up
774
      # ones, considering that even if they're down someone might want to start
775
      # them even in the event of a node failure.
776
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
777
        needed_mem = 0
778
        for instance in instances:
779
          needed_mem += instance_cfg[instance].memory
780
        if nodeinfo['mfree'] < needed_mem:
781
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
782
                      " failovers should node %s fail" % (node, prinode))
783
          bad = True
784
    return bad
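  # Illustrative note on the check above, with made-up numbers: if node3 is
  # secondary for instances A (1024 MB) and B (512 MB) whose primary is node1,
  # then node3's 'sinst-by-pnode' entry is {'node1': ['A', 'B']} and node3
  # needs mfree >= 1536 to absorb a failure of node1; anything less is
  # reported as an N+1 error.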
785

    
786
  def CheckPrereq(self):
787
    """Check prerequisites.
788

789
    Transform the list of checks we're going to skip into a set and check that
790
    all its members are valid.
791

792
    """
793
    self.skip_set = frozenset(self.op.skip_checks)
794
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
795
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
796

    
797
  def BuildHooksEnv(self):
798
    """Build hooks env.
799

800
    Cluster-Verify hooks are run only in the post phase; their failure causes
801
    the hook output to be logged in the verify output and the verification to fail.
802

803
    """
804
    all_nodes = self.cfg.GetNodeList()
805
    # TODO: populate the environment with useful information for verify hooks
806
    env = {}
807
    return env, [], all_nodes
808

    
809
  def Exec(self, feedback_fn):
810
    """Verify integrity of cluster, performing various test on nodes.
811

812
    """
813
    bad = False
814
    feedback_fn("* Verifying global settings")
815
    for msg in self.cfg.VerifyConfig():
816
      feedback_fn("  - ERROR: %s" % msg)
817

    
818
    vg_name = self.cfg.GetVGName()
819
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
820
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
821
    i_non_redundant = [] # Non redundant instances
822
    node_volume = {}
823
    node_instance = {}
824
    node_info = {}
825
    instance_cfg = {}
826

    
827
    # FIXME: verify OS list
828
    # do local checksums
829
    file_names = list(self.sstore.GetFileList())
830
    file_names.append(constants.SSL_CERT_FILE)
831
    file_names.append(constants.CLUSTER_CONF_FILE)
832
    local_checksums = utils.FingerprintFiles(file_names)
833

    
834
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
835
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
836
    all_instanceinfo = rpc.call_instance_list(nodelist)
837
    all_vglist = rpc.call_vg_list(nodelist)
838
    node_verify_param = {
839
      'filelist': file_names,
840
      'nodelist': nodelist,
841
      'hypervisor': None,
842
      }
843
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
844
    all_rversion = rpc.call_version(nodelist)
845
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
846

    
847
    for node in nodelist:
848
      feedback_fn("* Verifying node %s" % node)
849
      result = self._VerifyNode(node, file_names, local_checksums,
850
                                all_vglist[node], all_nvinfo[node],
851
                                all_rversion[node], feedback_fn)
852
      bad = bad or result
853

    
854
      # node_volume
855
      volumeinfo = all_volumeinfo[node]
856

    
857
      if isinstance(volumeinfo, basestring):
858
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
859
                    (node, volumeinfo[-400:].encode('string_escape')))
860
        bad = True
861
        node_volume[node] = {}
862
      elif not isinstance(volumeinfo, dict):
863
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
864
        bad = True
865
        continue
866
      else:
867
        node_volume[node] = volumeinfo
868

    
869
      # node_instance
870
      nodeinstance = all_instanceinfo[node]
871
      if type(nodeinstance) != list:
872
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
873
        bad = True
874
        continue
875

    
876
      node_instance[node] = nodeinstance
877

    
878
      # node_info
879
      nodeinfo = all_ninfo[node]
880
      if not isinstance(nodeinfo, dict):
881
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
882
        bad = True
883
        continue
884

    
885
      try:
886
        node_info[node] = {
887
          "mfree": int(nodeinfo['memory_free']),
888
          "dfree": int(nodeinfo['vg_free']),
889
          "pinst": [],
890
          "sinst": [],
891
          # dictionary holding all instances this node is secondary for,
892
          # grouped by their primary node. Each key is a cluster node, and each
893
          # value is a list of instances which have the key as primary and the
894
          # current node as secondary.  this is handy to calculate N+1 memory
895
          # availability if you can only failover from a primary to its
896
          # secondary.
897
          "sinst-by-pnode": {},
898
        }
899
      except ValueError:
900
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
901
        bad = True
902
        continue
903

    
904
    node_vol_should = {}
905

    
906
    for instance in instancelist:
907
      feedback_fn("* Verifying instance %s" % instance)
908
      inst_config = self.cfg.GetInstanceInfo(instance)
909
      result =  self._VerifyInstance(instance, inst_config, node_volume,
910
                                     node_instance, feedback_fn)
911
      bad = bad or result
912

    
913
      inst_config.MapLVsByNode(node_vol_should)
914

    
915
      instance_cfg[instance] = inst_config
916

    
917
      pnode = inst_config.primary_node
918
      if pnode in node_info:
919
        node_info[pnode]['pinst'].append(instance)
920
      else:
921
        feedback_fn("  - ERROR: instance %s, connection to primary node"
922
                    " %s failed" % (instance, pnode))
923
        bad = True
924

    
925
      # If the instance is non-redundant we cannot survive losing its primary
926
      # node, so we are not N+1 compliant. On the other hand we have no disk
927
      # templates with more than one secondary so that situation is not well
928
      # supported either.
929
      # FIXME: does not support file-backed instances
930
      if len(inst_config.secondary_nodes) == 0:
931
        i_non_redundant.append(instance)
932
      elif len(inst_config.secondary_nodes) > 1:
933
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
934
                    % instance)
935

    
936
      for snode in inst_config.secondary_nodes:
937
        if snode in node_info:
938
          node_info[snode]['sinst'].append(instance)
939
          if pnode not in node_info[snode]['sinst-by-pnode']:
940
            node_info[snode]['sinst-by-pnode'][pnode] = []
941
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
942
        else:
943
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
944
                      " %s failed" % (instance, snode))
945

    
946
    feedback_fn("* Verifying orphan volumes")
947
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
948
                                       feedback_fn)
949
    bad = bad or result
950

    
951
    feedback_fn("* Verifying remaining instances")
952
    result = self._VerifyOrphanInstances(instancelist, node_instance,
953
                                         feedback_fn)
954
    bad = bad or result
955

    
956
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
957
      feedback_fn("* Verifying N+1 Memory redundancy")
958
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
959
      bad = bad or result
960

    
961
    feedback_fn("* Other Notes")
962
    if i_non_redundant:
963
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
964
                  % len(i_non_redundant))
965

    
966
    return int(bad)
967

    
968
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
969
    """Analize the post-hooks' result, handle it, and send some
970
    nicely-formatted feedback back to the user.
971

972
    Args:
973
      phase: the hooks phase that has just been run
974
      hooks_results: the results of the multi-node hooks rpc call
975
      feedback_fn: function to send feedback back to the caller
976
      lu_result: previous Exec result
977

978
    """
979
    # We only really run POST phase hooks, and are only interested in their results
980
    if phase == constants.HOOKS_PHASE_POST:
981
      # Used to change hooks' output to proper indentation
982
      indent_re = re.compile('^', re.M)
983
      feedback_fn("* Hooks Results")
984
      if not hooks_results:
985
        feedback_fn("  - ERROR: general communication failure")
986
        lu_result = 1
987
      else:
988
        for node_name in hooks_results:
989
          show_node_header = True
990
          res = hooks_results[node_name]
991
          if res is False or not isinstance(res, list):
992
            feedback_fn("    Communication failure")
993
            lu_result = 1
994
            continue
995
          for script, hkr, output in res:
996
            if hkr == constants.HKR_FAIL:
997
              # The node header is only shown once, if there are
998
              # failing hooks on that node
999
              if show_node_header:
1000
                feedback_fn("  Node %s:" % node_name)
1001
                show_node_header = False
1002
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1003
              output = indent_re.sub('      ', output)
1004
              feedback_fn("%s" % output)
1005
              lu_result = 1
1006

    
1007
      return lu_result
1008

    
1009

    
1010
class LUVerifyDisks(NoHooksLU):
1011
  """Verifies the cluster disks status.
1012

1013
  """
1014
  _OP_REQP = []
1015

    
1016
  def CheckPrereq(self):
1017
    """Check prerequisites.
1018

1019
    This has no prerequisites.
1020

1021
    """
1022
    pass
1023

    
1024
  def Exec(self, feedback_fn):
1025
    """Verify integrity of cluster disks.
1026

1027
    """
1028
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1029

    
1030
    vg_name = self.cfg.GetVGName()
1031
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1032
    instances = [self.cfg.GetInstanceInfo(name)
1033
                 for name in self.cfg.GetInstanceList()]
1034

    
1035
    nv_dict = {}
1036
    for inst in instances:
1037
      inst_lvs = {}
1038
      if (inst.status != "up" or
1039
          inst.disk_template not in constants.DTS_NET_MIRROR):
1040
        continue
1041
      inst.MapLVsByNode(inst_lvs)
1042
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1043
      for node, vol_list in inst_lvs.iteritems():
1044
        for vol in vol_list:
1045
          nv_dict[(node, vol)] = inst
1046

    
1047
    if not nv_dict:
1048
      return result
1049

    
1050
    node_lvs = rpc.call_volume_list(nodes, vg_name)
1051

    
1052
    to_act = set()
1053
    for node in nodes:
1054
      # node_volume
1055
      lvs = node_lvs[node]
1056

    
1057
      if isinstance(lvs, basestring):
1058
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1059
        res_nlvm[node] = lvs
1060
      elif not isinstance(lvs, dict):
1061
        logger.Info("connection to node %s failed or invalid data returned" %
1062
                    (node,))
1063
        res_nodes.append(node)
1064
        continue
1065

    
1066
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1067
        inst = nv_dict.pop((node, lv_name), None)
1068
        if (not lv_online and inst is not None
1069
            and inst.name not in res_instances):
1070
          res_instances.append(inst.name)
1071

    
1072
    # any leftover items in nv_dict are missing LVs, let's arrange the
1073
    # data better
1074
    for key, inst in nv_dict.iteritems():
1075
      if inst.name not in res_missing:
1076
        res_missing[inst.name] = []
1077
      res_missing[inst.name].append(key)
1078

    
1079
    return result
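    # Illustrative note on the value returned above: it is the 4-tuple
    # (res_nodes, res_nlvm, res_instances, res_missing) built at the top of
    # this method, i.e. unreachable nodes, per-node LVM error strings,
    # instances with inactive mirror volumes, and a map of instance name to
    # the (node, volume) pairs that are missing entirely.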
1080

    
1081

    
1082
class LURenameCluster(LogicalUnit):
1083
  """Rename the cluster.
1084

1085
  """
1086
  HPATH = "cluster-rename"
1087
  HTYPE = constants.HTYPE_CLUSTER
1088
  _OP_REQP = ["name"]
1089

    
1090
  def BuildHooksEnv(self):
1091
    """Build hooks env.
1092

1093
    """
1094
    env = {
1095
      "OP_TARGET": self.sstore.GetClusterName(),
1096
      "NEW_NAME": self.op.name,
1097
      }
1098
    mn = self.sstore.GetMasterNode()
1099
    return env, [mn], [mn]
1100

    
1101
  def CheckPrereq(self):
1102
    """Verify that the passed name is a valid one.
1103

1104
    """
1105
    hostname = utils.HostInfo(self.op.name)
1106

    
1107
    new_name = hostname.name
1108
    self.ip = new_ip = hostname.ip
1109
    old_name = self.sstore.GetClusterName()
1110
    old_ip = self.sstore.GetMasterIP()
1111
    if new_name == old_name and new_ip == old_ip:
1112
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1113
                                 " cluster has changed")
1114
    if new_ip != old_ip:
1115
      result = utils.RunCmd(["fping", "-q", new_ip])
1116
      if not result.failed:
1117
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1118
                                   " reachable on the network. Aborting." %
1119
                                   new_ip)
1120

    
1121
    self.op.name = new_name
1122

    
1123
  def Exec(self, feedback_fn):
1124
    """Rename the cluster.
1125

1126
    """
1127
    clustername = self.op.name
1128
    ip = self.ip
1129
    ss = self.sstore
1130

    
1131
    # shutdown the master IP
1132
    master = ss.GetMasterNode()
1133
    if not rpc.call_node_stop_master(master):
1134
      raise errors.OpExecError("Could not disable the master role")
1135

    
1136
    try:
1137
      # modify the sstore
1138
      ss.SetKey(ss.SS_MASTER_IP, ip)
1139
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1140

    
1141
      # Distribute updated ss config to all nodes
1142
      myself = self.cfg.GetNodeInfo(master)
1143
      dist_nodes = self.cfg.GetNodeList()
1144
      if myself.name in dist_nodes:
1145
        dist_nodes.remove(myself.name)
1146

    
1147
      logger.Debug("Copying updated ssconf data to all nodes")
1148
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1149
        fname = ss.KeyToFilename(keyname)
1150
        result = rpc.call_upload_file(dist_nodes, fname)
1151
        for to_node in dist_nodes:
1152
          if not result[to_node]:
1153
            logger.Error("copy of file %s to node %s failed" %
1154
                         (fname, to_node))
1155
    finally:
1156
      if not rpc.call_node_start_master(master):
1157
        logger.Error("Could not re-enable the master role on the master,"
1158
                     " please restart manually.")
1159

    
1160

    
1161
def _RecursiveCheckIfLVMBased(disk):
1162
  """Check if the given disk or its children are lvm-based.
1163

1164
  Args:
1165
    disk: ganeti.objects.Disk object
1166

1167
  Returns:
1168
    boolean indicating whether a LD_LV dev_type was found or not
1169

1170
  """
1171
  if disk.children:
1172
    for chdisk in disk.children:
1173
      if _RecursiveCheckIfLVMBased(chdisk):
1174
        return True
1175
  return disk.dev_type == constants.LD_LV
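# Illustrative sketch with hypothetical objects.Disk values (constructor
# arguments abbreviated): a mirrored disk whose children are logical volumes
# is lvm-based, while a disk tree without any LD_LV device is not.
#
#   lv = objects.Disk(dev_type=constants.LD_LV, size=1024, ...)
#   mirror = objects.Disk(dev_type=..., children=[lv, lv], size=1024, ...)
#   _RecursiveCheckIfLVMBased(lv)       # => True
#   _RecursiveCheckIfLVMBased(mirror)   # => True (an LD_LV child was found)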
1176

    
1177

    
1178
class LUSetClusterParams(LogicalUnit):
1179
  """Change the parameters of the cluster.
1180

1181
  """
1182
  HPATH = "cluster-modify"
1183
  HTYPE = constants.HTYPE_CLUSTER
1184
  _OP_REQP = []
1185

    
1186
  def BuildHooksEnv(self):
1187
    """Build hooks env.
1188

1189
    """
1190
    env = {
1191
      "OP_TARGET": self.sstore.GetClusterName(),
1192
      "NEW_VG_NAME": self.op.vg_name,
1193
      }
1194
    mn = self.sstore.GetMasterNode()
1195
    return env, [mn], [mn]
1196

    
1197
  def CheckPrereq(self):
1198
    """Check prerequisites.
1199

1200
    This checks whether the given params don't conflict and
1201
    if the given volume group is valid.
1202

1203
    """
1204
    if not self.op.vg_name:
1205
      instances = [self.cfg.GetInstanceInfo(name)
1206
                   for name in self.cfg.GetInstanceList()]
1207
      for inst in instances:
1208
        for disk in inst.disks:
1209
          if _RecursiveCheckIfLVMBased(disk):
1210
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1211
                                       " lvm-based instances exist")
1212

    
1213
    # if vg_name not None, checks given volume group on all nodes
1214
    if self.op.vg_name:
1215
      node_list = self.cfg.GetNodeList()
1216
      vglist = rpc.call_vg_list(node_list)
1217
      for node in node_list:
1218
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1219
        if vgstatus:
1220
          raise errors.OpPrereqError("Error on node '%s': %s" %
1221
                                     (node, vgstatus))
1222

    
1223
  def Exec(self, feedback_fn):
1224
    """Change the parameters of the cluster.
1225

1226
    """
1227
    if self.op.vg_name != self.cfg.GetVGName():
1228
      self.cfg.SetVGName(self.op.vg_name)
1229
    else:
1230
      feedback_fn("Cluster LVM configuration already in desired"
1231
                  " state, not changing")
1232

    
1233

    
1234
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1235
  """Sleep and poll for an instance's disk to sync.
1236

1237
  """
1238
  if not instance.disks:
1239
    return True
1240

    
1241
  if not oneshot:
1242
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1243

    
1244
  node = instance.primary_node
1245

    
1246
  for dev in instance.disks:
1247
    cfgw.SetDiskID(dev, node)
1248

    
1249
  retries = 0
1250
  while True:
1251
    max_time = 0
1252
    done = True
1253
    cumul_degraded = False
1254
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1255
    if not rstats:
1256
      proc.LogWarning("Can't get any data from node %s" % node)
1257
      retries += 1
1258
      if retries >= 10:
1259
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1260
                                 " aborting." % node)
1261
      time.sleep(6)
1262
      continue
1263
    retries = 0
1264
    for i in range(len(rstats)):
1265
      mstat = rstats[i]
1266
      if mstat is None:
1267
        proc.LogWarning("Can't compute data for node %s/%s" %
1268
                        (node, instance.disks[i].iv_name))
1269
        continue
1270
      # we ignore the ldisk parameter
1271
      perc_done, est_time, is_degraded, _ = mstat
1272
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1273
      if perc_done is not None:
1274
        done = False
1275
        if est_time is not None:
1276
          rem_time = "%d estimated seconds remaining" % est_time
1277
          max_time = est_time
1278
        else:
1279
          rem_time = "no time estimate"
1280
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1281
                     (instance.disks[i].iv_name, perc_done, rem_time))
1282
    if done or oneshot:
1283
      break
1284

    
1285
    if unlock:
1286
      #utils.Unlock('cmd')
1287
      pass
1288
    try:
1289
      time.sleep(min(60, max_time))
1290
    finally:
1291
      if unlock:
1292
        #utils.Lock('cmd')
1293
        pass
1294

    
1295
  if done:
1296
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1297
  return not cumul_degraded
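# Illustrative note on the data handled above: each entry returned by
# rpc.call_blockdev_getmirrorstatus() is unpacked as
# (perc_done, est_time, is_degraded, ldisk); perc_done is None once a device
# is in sync, so a made-up status of (87.5, 130, True, False) would be
# reported as "87.50% done, 130 estimated seconds remaining".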
1298

    
1299

    
1300
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1301
  """Check that mirrors are not degraded.
1302

1303
  The ldisk parameter, if True, will change the test from the
1304
  is_degraded attribute (which represents overall non-ok status for
1305
  the device(s)) to the ldisk (representing the local storage status).
1306

1307
  """
1308
  cfgw.SetDiskID(dev, node)
1309
  if ldisk:
1310
    idx = 6
1311
  else:
1312
    idx = 5
1313

    
1314
  result = True
1315
  if on_primary or dev.AssembleOnSecondary():
1316
    rstats = rpc.call_blockdev_find(node, dev)
1317
    if not rstats:
1318
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1319
      result = False
1320
    else:
1321
      result = result and (not rstats[idx])
1322
  if dev.children:
1323
    for child in dev.children:
1324
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1325

    
1326
  return result
1327

    
1328

    
1329
class LUDiagnoseOS(NoHooksLU):
1330
  """Logical unit for OS diagnose/query.
1331

1332
  """
1333
  _OP_REQP = ["output_fields", "names"]
1334

    
1335
  def CheckPrereq(self):
1336
    """Check prerequisites.
1337

1338
    This always succeeds, since this is a pure query LU.
1339

1340
    """
1341
    if self.op.names:
1342
      raise errors.OpPrereqError("Selective OS query not supported")
1343

    
1344
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1345
    _CheckOutputFields(static=[],
1346
                       dynamic=self.dynamic_fields,
1347
                       selected=self.op.output_fields)
1348

    
1349
  @staticmethod
1350
  def _DiagnoseByOS(node_list, rlist):
1351
    """Remaps a per-node return list into an a per-os per-node dictionary
1352

1353
      Args:
1354
        node_list: a list with the names of all nodes
1355
        rlist: a map with node names as keys and OS objects as values
1356

1357
      Returns:
1358
        map: a map with osnames as keys and as value another map, with
1359
             nodes as
1360
             keys and list of OS objects as values
1361
             e.g. {"debian-etch": {"node1": [<object>,...],
1362
                                   "node2": [<object>,]}
1363
                  }
1364

1365
    """
1366
    all_os = {}
1367
    for node_name, nr in rlist.iteritems():
1368
      if not nr:
1369
        continue
1370
      for os_obj in nr:
1371
        if os_obj.name not in all_os:
1372
          # build a list of nodes for this os containing empty lists
1373
          # for each node in node_list
1374
          all_os[os_obj.name] = {}
1375
          for nname in node_list:
1376
            all_os[os_obj.name][nname] = []
1377
        all_os[os_obj.name][node_name].append(os_obj)
1378
    return all_os
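  # Illustrative sketch with made-up node and OS names: given
  #   node_list = ["node1", "node2"]
  #   rlist = {"node1": [os_a, os_b], "node2": [os_a]}
  # where os_a.name == "debian-etch" and os_b.name == "gentoo", the result is
  #   {"debian-etch": {"node1": [os_a], "node2": [os_a]},
  #    "gentoo": {"node1": [os_b], "node2": []}}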
1379

    
1380
  def Exec(self, feedback_fn):
1381
    """Compute the list of OSes.
1382

1383
    """
1384
    node_list = self.cfg.GetNodeList()
1385
    node_data = rpc.call_os_diagnose(node_list)
1386
    if node_data == False:
1387
      raise errors.OpExecError("Can't gather the list of OSes")
1388
    pol = self._DiagnoseByOS(node_list, node_data)
1389
    output = []
1390
    for os_name, os_data in pol.iteritems():
1391
      row = []
1392
      for field in self.op.output_fields:
1393
        if field == "name":
1394
          val = os_name
1395
        elif field == "valid":
1396
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1397
        elif field == "node_status":
1398
          val = {}
1399
          for node_name, nos_list in os_data.iteritems():
1400
            val[node_name] = [(v.status, v.path) for v in nos_list]
1401
        else:
1402
          raise errors.ParameterError(field)
1403
        row.append(val)
1404
      output.append(row)
1405

    
1406
    return output
1407

    
1408

    
1409
class LURemoveNode(LogicalUnit):
1410
  """Logical unit for removing a node.
1411

1412
  """
1413
  HPATH = "node-remove"
1414
  HTYPE = constants.HTYPE_NODE
1415
  _OP_REQP = ["node_name"]
1416

    
1417
  def BuildHooksEnv(self):
1418
    """Build hooks env.
1419

1420
    This doesn't run on the target node in the pre phase as a failed
1421
    node would not allow itself to run.
1422

1423
    """
1424
    env = {
1425
      "OP_TARGET": self.op.node_name,
1426
      "NODE_NAME": self.op.node_name,
1427
      }
1428
    all_nodes = self.cfg.GetNodeList()
1429
    all_nodes.remove(self.op.node_name)
1430
    return env, all_nodes, all_nodes
1431

    
1432
  def CheckPrereq(self):
1433
    """Check prerequisites.
1434

1435
    This checks:
1436
     - the node exists in the configuration
1437
     - it does not have primary or secondary instances
1438
     - it's not the master
1439

1440
    Any errors are signalled by raising errors.OpPrereqError.
1441

1442
    """
1443
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1444
    if node is None:
1445
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1446

    
1447
    instance_list = self.cfg.GetInstanceList()
1448

    
1449
    masternode = self.sstore.GetMasterNode()
1450
    if node.name == masternode:
1451
      raise errors.OpPrereqError("Node is the master node,"
1452
                                 " you need to failover first.")
1453

    
1454
    for instance_name in instance_list:
1455
      instance = self.cfg.GetInstanceInfo(instance_name)
1456
      if node.name == instance.primary_node:
1457
        raise errors.OpPrereqError("Instance %s still running on the node,"
1458
                                   " please remove first." % instance_name)
1459
      if node.name in instance.secondary_nodes:
1460
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1461
                                   " please remove first." % instance_name)
1462
    self.op.node_name = node.name
1463
    self.node = node
1464

    
1465
  def Exec(self, feedback_fn):
1466
    """Removes the node from the cluster.
1467

1468
    """
1469
    node = self.node
1470
    logger.Info("stopping the node daemon and removing configs from node %s" %
1471
                node.name)
1472

    
1473
    rpc.call_node_leave_cluster(node.name)
1474

    
1475
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1476

    
1477
    logger.Info("Removing node %s from config" % node.name)
1478

    
1479
    self.cfg.RemoveNode(node.name)
1480

    
1481
    _RemoveHostFromEtcHosts(node.name)
1482

    
1483

    
1484
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})
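    # At this point live_data maps each node name to a dict of the dynamic
    # values; nodes that did not answer the RPC keep an empty dict, so their
    # dynamic fields are reported as None further down.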

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))
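    # For illustration, the assembled command is roughly:
    #   umask 077 && echo '<password>' > '<noded password file>' &&
    #   cat > '<ssl cert file>' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   <node init script> restart
    # i.e. the certificate is shipped via a shell here-document terminated by
    # the '!EOF.' marker that was checked for above.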

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")



class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)
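    # Running the command on the master last presumably avoids cutting the
    # run short if the command disrupts ganeti services on the master itself.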

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a pair (disks_ok, device_info), where disks_ok is false if the operation
    failed, and device_info is a list of (host, instance_visible_name,
    node_visible_name) tuples with the mapping from node devices to
    instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
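    # 'result' is whatever the primary-node assemble call returned; it is
    # used here as the node-visible device name in the returned tuples
    # described in the docstring.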

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they make this function return failure.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
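    # Soft and hard reboots are delegated to the hypervisor on the primary
    # node; a full reboot is done here as shutdown + disk restart + start.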

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
2988
  """Generate a suitable LV name.
2989

2990
  This will generate a logical volume name for the given instance.
2991

2992
  """
2993
  results = []
2994
  for val in exts:
2995
    new_id = cfg.GenerateUniqueID()
2996
    results.append("%s%s" % (new_id, val))
2997
  return results
2998

    
2999

    
3000
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3001
  """Generate a drbd device complete with its children.
3002

3003
  """
3004
  port = cfg.AllocatePort()
3005
  vgname = cfg.GetVGName()
3006
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3007
                          logical_id=(vgname, names[0]))
3008
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3009
                          logical_id=(vgname, names[1]))
3010
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3011
                          logical_id = (primary, secondary, port),
3012
                          children = [dev_data, dev_meta])
3013
  return drbd_dev
3014

    
3015

    
3016
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3017
  """Generate a drbd8 device complete with its children.
3018

3019
  """
3020
  port = cfg.AllocatePort()
3021
  vgname = cfg.GetVGName()
3022
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3023
                          logical_id=(vgname, names[0]))
3024
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3025
                          logical_id=(vgname, names[1]))
3026
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3027
                          logical_id = (primary, secondary, port),
3028
                          children = [dev_data, dev_meta],
3029
                          iv_name=iv_name)
3030
  return drbd_dev
3031

    
3032

    
3033
def _GenerateDiskTemplate(cfg, template_name,
3034
                          instance_name, primary_node,
3035
                          secondary_nodes, disk_sz, swap_sz,
3036
                          file_storage_dir, file_driver):
3037
  """Generate the entire disk layout for a given template type.
3038

3039
  """
3040
  #TODO: compute space requirements
3041

    
3042
  vgname = cfg.GetVGName()
3043
  if template_name == constants.DT_DISKLESS:
3044
    disks = []
3045
  elif template_name == constants.DT_PLAIN:
3046
    if len(secondary_nodes) != 0:
3047
      raise errors.ProgrammerError("Wrong template configuration")
3048

    
3049
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3050
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3051
                           logical_id=(vgname, names[0]),
3052
                           iv_name = "sda")
3053
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3054
                           logical_id=(vgname, names[1]),
3055
                           iv_name = "sdb")
3056
    disks = [sda_dev, sdb_dev]
3057
  elif template_name == constants.DT_DRBD8:
3058
    if len(secondary_nodes) != 1:
3059
      raise errors.ProgrammerError("Wrong template configuration")
3060
    remote_node = secondary_nodes[0]
3061
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3062
                                       ".sdb_data", ".sdb_meta"])
3063
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3064
                                         disk_sz, names[0:2], "sda")
3065
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3066
                                         swap_sz, names[2:4], "sdb")
3067
    disks = [drbd_sda_dev, drbd_sdb_dev]
3068
  elif template_name == constants.DT_FILE:
3069
    if len(secondary_nodes) != 0:
3070
      raise errors.ProgrammerError("Wrong template configuration")
3071

    
3072
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3073
                                iv_name="sda", logical_id=(file_driver,
3074
                                "%s/sda" % file_storage_dir))
3075
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3076
                                iv_name="sdb", logical_id=(file_driver,
3077
                                "%s/sdb" % file_storage_dir))
3078
    disks = [file_sda_dev, file_sdb_dev]
3079
  else:
3080
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3081
  return disks
3082

    
3083

    
3084
def _GetInstanceInfoText(instance):
3085
  """Compute that text that should be added to the disk's metadata.
3086

3087
  """
3088
  return "originstname+%s" % instance.name
3089

    
3090

    
3091
def _CreateDisks(cfg, instance):
3092
  """Create all disks for an instance.
3093

3094
  This abstracts away some work from AddInstance.
3095

3096
  Args:
3097
    instance: the instance object
3098

3099
  Returns:
3100
    True or False showing the success of the creation process
3101

3102
  """
3103
  info = _GetInstanceInfoText(instance)
3104

    
3105
  if instance.disk_template == constants.DT_FILE:
3106
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3107
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3108
                                              file_storage_dir)
3109

    
3110
    if not result:
3111
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3112
      return False
3113

    
3114
    if not result[0]:
3115
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3116
      return False
3117

    
3118
  for device in instance.disks:
3119
    logger.Info("creating volume %s for instance %s" %
3120
                (device.iv_name, instance.name))
3121
    #HARDCODE
3122
    for secondary_node in instance.secondary_nodes:
3123
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3124
                                        device, False, info):
3125
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3126
                     (device.iv_name, device, secondary_node))
3127
        return False
3128
    #HARDCODE
3129
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3130
                                    instance, device, info):
3131
      logger.Error("failed to create volume %s on primary!" %
3132
                   device.iv_name)
3133
      return False
3134

    
3135
  return True
3136

    
3137

    
3138
def _RemoveDisks(instance, cfg):
3139
  """Remove all disks for an instance.
3140

3141
  This abstracts away some work from `AddInstance()` and
3142
  `RemoveInstance()`. Note that in case some of the devices couldn't
3143
  be removed, the removal will continue with the other ones (compare
3144
  with `_CreateDisks()`).
3145

3146
  Args:
3147
    instance: the instance object
3148

3149
  Returns:
3150
    True or False showing the success of the removal proces
3151

3152
  """
3153
  logger.Info("removing block devices for instance %s" % instance.name)
3154

    
3155
  result = True
3156
  for device in instance.disks:
3157
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3158
      cfg.SetDiskID(disk, node)
3159
      if not rpc.call_blockdev_remove(node, disk):
3160
        logger.Error("could not remove block device %s on node %s,"
3161
                     " continuing anyway" %
3162
                     (device.iv_name, node))
3163
        result = False
3164

    
3165
  if instance.disk_template == constants.DT_FILE:
3166
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3167
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3168
                                            file_storage_dir):
3169
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3170
      result = False
3171

    
3172
  return result
3173

    
3174

    
3175
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3176
  """Compute disk size requirements in the volume group
3177

3178
  This is currently hard-coded for the two-drive layout.
3179

3180
  """
3181
  # Required free disk space as a function of disk and swap space
3182
  req_size_dict = {
3183
    constants.DT_DISKLESS: None,
3184
    constants.DT_PLAIN: disk_size + swap_size,
3185
    # 256 MB are added for drbd metadata, 128MB for each drbd device
3186
    constants.DT_DRBD8: disk_size + swap_size + 256,
3187
    constants.DT_FILE: None,
3188
  }
3189

    
3190
  if disk_template not in req_size_dict:
3191
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3192
                                 " is unknown" %  disk_template)
3193

    
3194
  return req_size_dict[disk_template]
3195

    
3196

    
3197
class LUCreateInstance(LogicalUnit):
3198
  """Create an instance.
3199

3200
  """
3201
  HPATH = "instance-add"
3202
  HTYPE = constants.HTYPE_INSTANCE
3203
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3204
              "disk_template", "swap_size", "mode", "start", "vcpus",
3205
              "wait_for_sync", "ip_check", "mac"]
3206

    
3207
  def _RunAllocator(self):
3208
    """Run the allocator based on input opcode.
3209

3210
    """
3211
    disks = [{"size": self.op.disk_size, "mode": "w"},
3212
             {"size": self.op.swap_size, "mode": "w"}]
3213
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3214
             "bridge": self.op.bridge}]
3215
    ial = IAllocator(self.cfg, self.sstore,
3216
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3217
                     name=self.op.instance_name,
3218
                     disk_template=self.op.disk_template,
3219
                     tags=[],
3220
                     os=self.op.os_type,
3221
                     vcpus=self.op.vcpus,
3222
                     mem_size=self.op.mem_size,
3223
                     disks=disks,
3224
                     nics=nics,
3225
                     )
3226

    
3227
    ial.Run(self.op.iallocator)
3228

    
3229
    if not ial.success:
3230
      raise errors.OpPrereqError("Can't compute nodes using"
3231
                                 " iallocator '%s': %s" % (self.op.iallocator,
3232
                                                           ial.info))
3233
    if len(ial.nodes) != ial.required_nodes:
3234
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3235
                                 " of nodes (%s), required %s" %
3236
                                 (len(ial.nodes), ial.required_nodes))
3237
    self.op.pnode = ial.nodes[0]
3238
    logger.ToStdout("Selected nodes for the instance: %s" %
3239
                    (", ".join(ial.nodes),))
3240
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3241
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3242
    if ial.required_nodes == 2:
3243
      self.op.snode = ial.nodes[1]
3244

    
3245
  def BuildHooksEnv(self):
3246
    """Build hooks env.
3247

3248
    This runs on master, primary and secondary nodes of the instance.
3249

3250
    """
3251
    env = {
3252
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3253
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3254
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3255
      "INSTANCE_ADD_MODE": self.op.mode,
3256
      }
3257
    if self.op.mode == constants.INSTANCE_IMPORT:
3258
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3259
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3260
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3261

    
3262
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3263
      primary_node=self.op.pnode,
3264
      secondary_nodes=self.secondaries,
3265
      status=self.instance_status,
3266
      os_type=self.op.os_type,
3267
      memory=self.op.mem_size,
3268
      vcpus=self.op.vcpus,
3269
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3270
    ))
3271

    
3272
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3273
          self.secondaries)
3274
    return env, nl, nl
3275

    
3276

    
3277
  def CheckPrereq(self):
3278
    """Check prerequisites.
3279

3280
    """
3281
    # set optional parameters to none if they don't exist
3282
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3283
                 "iallocator"]:
3284
      if not hasattr(self.op, attr):
3285
        setattr(self.op, attr, None)
3286

    
3287
    if self.op.mode not in (constants.INSTANCE_CREATE,
3288
                            constants.INSTANCE_IMPORT):
3289
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3290
                                 self.op.mode)
3291

    
3292
    if (not self.cfg.GetVGName() and
3293
        self.op.disk_template not in constants.DTS_NOT_LVM):
3294
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3295
                                 " instances")
3296

    
3297
    if self.op.mode == constants.INSTANCE_IMPORT:
3298
      src_node = getattr(self.op, "src_node", None)
3299
      src_path = getattr(self.op, "src_path", None)
3300
      if src_node is None or src_path is None:
3301
        raise errors.OpPrereqError("Importing an instance requires source"
3302
                                   " node and path options")
3303
      src_node_full = self.cfg.ExpandNodeName(src_node)
3304
      if src_node_full is None:
3305
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3306
      self.op.src_node = src_node = src_node_full
3307

    
3308
      if not os.path.isabs(src_path):
3309
        raise errors.OpPrereqError("The source path must be absolute")
3310

    
3311
      export_info = rpc.call_export_info(src_node, src_path)
3312

    
3313
      if not export_info:
3314
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3315

    
3316
      if not export_info.has_section(constants.INISECT_EXP):
3317
        raise errors.ProgrammerError("Corrupted export config")
3318

    
3319
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3320
      if (int(ei_version) != constants.EXPORT_VERSION):
3321
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3322
                                   (ei_version, constants.EXPORT_VERSION))
3323

    
3324
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3325
        raise errors.OpPrereqError("Can't import instance with more than"
3326
                                   " one data disk")
3327

    
3328
      # FIXME: are the old os-es, disk sizes, etc. useful?
3329
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3330
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3331
                                                         'disk0_dump'))
3332
      self.src_image = diskimage
3333
    else: # INSTANCE_CREATE
3334
      if getattr(self.op, "os_type", None) is None:
3335
        raise errors.OpPrereqError("No guest OS specified")
3336

    
3337
    #### instance parameters check
3338

    
3339
    # disk template and mirror node verification
3340
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3341
      raise errors.OpPrereqError("Invalid disk template name")
3342

    
3343
    # instance name verification
3344
    hostname1 = utils.HostInfo(self.op.instance_name)
3345

    
3346
    self.op.instance_name = instance_name = hostname1.name
3347
    instance_list = self.cfg.GetInstanceList()
3348
    if instance_name in instance_list:
3349
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3350
                                 instance_name)
3351

    
3352
    # ip validity checks
3353
    ip = getattr(self.op, "ip", None)
3354
    if ip is None or ip.lower() == "none":
3355
      inst_ip = None
3356
    elif ip.lower() == "auto":
3357
      inst_ip = hostname1.ip
3358
    else:
3359
      if not utils.IsValidIP(ip):
3360
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3361
                                   " like a valid IP" % ip)
3362
      inst_ip = ip
3363
    self.inst_ip = self.op.ip = inst_ip
3364

    
3365
    if self.op.start and not self.op.ip_check:
3366
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3367
                                 " adding an instance in start mode")
3368

    
3369
    if self.op.ip_check:
3370
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3371
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3372
                                   (hostname1.ip, instance_name))
3373

    
3374
    # MAC address verification
3375
    if self.op.mac != "auto":
3376
      if not utils.IsValidMac(self.op.mac.lower()):
3377
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3378
                                   self.op.mac)
3379

    
3380
    # bridge verification
3381
    bridge = getattr(self.op, "bridge", None)
3382
    if bridge is None:
3383
      self.op.bridge = self.cfg.GetDefBridge()
3384
    else:
3385
      self.op.bridge = bridge
3386

    
3387
    # boot order verification
3388
    if self.op.hvm_boot_order is not None:
3389
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3390
        raise errors.OpPrereqError("invalid boot order specified,"
3391
                                   " must be one or more of [acdn]")
3392
    # file storage checks
3393
    if (self.op.file_driver and
3394
        not self.op.file_driver in constants.FILE_DRIVER):
3395
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3396
                                 self.op.file_driver)
3397

    
3398
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3399
      raise errors.OpPrereqError("File storage directory not a relative"
3400
                                 " path")
3401
    #### allocator run
3402

    
3403
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3404
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3405
                                 " node must be given")
3406

    
3407
    if self.op.iallocator is not None:
3408
      self._RunAllocator()
3409

    
3410
    #### node related checks
3411

    
3412
    # check primary node
3413
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3414
    if pnode is None:
3415
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3416
                                 self.op.pnode)
3417
    self.op.pnode = pnode.name
3418
    self.pnode = pnode
3419
    self.secondaries = []
3420

    
3421
    # mirror node verification
3422
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3423
      if getattr(self.op, "snode", None) is None:
3424
        raise errors.OpPrereqError("The networked disk templates need"
3425
                                   " a mirror node")
3426

    
3427
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3428
      if snode_name is None:
3429
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3430
                                   self.op.snode)
3431
      elif snode_name == pnode.name:
3432
        raise errors.OpPrereqError("The secondary node cannot be"
3433
                                   " the primary node.")
3434
      self.secondaries.append(snode_name)
3435

    
3436
    req_size = _ComputeDiskSize(self.op.disk_template,
3437
                                self.op.disk_size, self.op.swap_size)
3438

    
3439
    # Check lv size requirements
3440
    if req_size is not None:
3441
      nodenames = [pnode.name] + self.secondaries
3442
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3443
      for node in nodenames:
3444
        info = nodeinfo.get(node, None)
3445
        if not info:
3446
          raise errors.OpPrereqError("Cannot get current information"
3447
                                     " from node '%s'" % nodeinfo)
3448
        vg_free = info.get('vg_free', None)
3449
        if not isinstance(vg_free, int):
3450
          raise errors.OpPrereqError("Can't compute free disk space on"
3451
                                     " node %s" % node)
3452
        if req_size > info['vg_free']:
3453
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3454
                                     " %d MB available, %d MB required" %
3455
                                     (node, info['vg_free'], req_size))
3456

    
3457
    # os verification
3458
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3459
    if not os_obj:
3460
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3461
                                 " primary node"  % self.op.os_type)
3462

    
3463
    if self.op.kernel_path == constants.VALUE_NONE:
3464
      raise errors.OpPrereqError("Can't set instance kernel to none")
3465

    
3466

    
3467
    # bridge check on primary node
3468
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3469
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3470
                                 " destination node '%s'" %
3471
                                 (self.op.bridge, pnode.name))
3472

    
3473
    if self.op.start:
3474
      self.instance_status = 'up'
3475
    else:
3476
      self.instance_status = 'down'
3477

    
3478
  def Exec(self, feedback_fn):
3479
    """Create and add the instance to the cluster.
3480

3481
    """
3482
    instance = self.op.instance_name
3483
    pnode_name = self.pnode.name
3484

    
3485
    if self.op.mac == "auto":
3486
      mac_address = self.cfg.GenerateMAC()
3487
    else:
3488
      mac_address = self.op.mac
3489

    
3490
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3491
    if self.inst_ip is not None:
3492
      nic.ip = self.inst_ip
3493

    
3494
    ht_kind = self.sstore.GetHypervisorType()
3495
    if ht_kind in constants.HTS_REQ_PORT:
3496
      network_port = self.cfg.AllocatePort()
3497
    else:
3498
      network_port = None
3499

    
3500
    # this is needed because os.path.join does not accept None arguments
3501
    if self.op.file_storage_dir is None:
3502
      string_file_storage_dir = ""
3503
    else:
3504
      string_file_storage_dir = self.op.file_storage_dir
3505

    
3506
    # build the full file storage dir path
3507
    file_storage_dir = os.path.normpath(os.path.join(
3508
                                        self.sstore.GetFileStorageDir(),
3509
                                        string_file_storage_dir, instance))
3510

    
3511

    
3512
    disks = _GenerateDiskTemplate(self.cfg,
3513
                                  self.op.disk_template,
3514
                                  instance, pnode_name,
3515
                                  self.secondaries, self.op.disk_size,
3516
                                  self.op.swap_size,
3517
                                  file_storage_dir,
3518
                                  self.op.file_driver)
3519

    
3520
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3521
                            primary_node=pnode_name,
3522
                            memory=self.op.mem_size,
3523
                            vcpus=self.op.vcpus,
3524
                            nics=[nic], disks=disks,
3525
                            disk_template=self.op.disk_template,
3526
                            status=self.instance_status,
3527
                            network_port=network_port,
3528
                            kernel_path=self.op.kernel_path,
3529
                            initrd_path=self.op.initrd_path,
3530
                            hvm_boot_order=self.op.hvm_boot_order,
3531
                            )
3532

    
3533
    feedback_fn("* creating instance disks...")
3534
    if not _CreateDisks(self.cfg, iobj):
3535
      _RemoveDisks(iobj, self.cfg)
3536
      raise errors.OpExecError("Device creation failed, reverting...")
3537

    
3538
    feedback_fn("adding instance %s to cluster config" % instance)
3539

    
3540
    self.cfg.AddInstance(iobj)
3541

    
3542
    if self.op.wait_for_sync:
3543
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3544
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3545
      # make sure the disks are not degraded (still sync-ing is ok)
3546
      time.sleep(15)
3547
      feedback_fn("* checking mirrors status")
3548
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3549
    else:
3550
      disk_abort = False
3551

    
3552
    if disk_abort:
3553
      _RemoveDisks(iobj, self.cfg)
3554
      self.cfg.RemoveInstance(iobj.name)
3555
      raise errors.OpExecError("There are some degraded disks for"
3556
                               " this instance")
3557

    
3558
    feedback_fn("creating os for instance %s on node %s" %
3559
                (instance, pnode_name))
3560

    
3561
    if iobj.disk_template != constants.DT_DISKLESS:
3562
      if self.op.mode == constants.INSTANCE_CREATE:
3563
        feedback_fn("* running the instance OS create scripts...")
3564
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3565
          raise errors.OpExecError("could not add os for instance %s"
3566
                                   " on node %s" %
3567
                                   (instance, pnode_name))
3568

    
3569
      elif self.op.mode == constants.INSTANCE_IMPORT:
3570
        feedback_fn("* running the instance OS import scripts...")
3571
        src_node = self.op.src_node
3572
        src_image = self.src_image
3573
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3574
                                                src_node, src_image):
3575
          raise errors.OpExecError("Could not import os for instance"
3576
                                   " %s on node %s" %
3577
                                   (instance, pnode_name))
3578
      else:
3579
        # also checked in the prereq part
3580
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3581
                                     % self.op.mode)
3582

    
3583
    if self.op.start:
3584
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3585
      feedback_fn("* starting instance...")
3586
      if not rpc.call_instance_start(pnode_name, iobj, None):
3587
        raise errors.OpExecError("Could not start instance")
3588

    
3589

    
3590
class LUConnectConsole(NoHooksLU):
3591
  """Connect to an instance's console.
3592

3593
  This is somewhat special in that it returns the command line that
3594
  you need to run on the master node in order to connect to the
3595
  console.
3596

3597
  """
3598
  _OP_REQP = ["instance_name"]
3599

    
3600
  def CheckPrereq(self):
3601
    """Check prerequisites.
3602

3603
    This checks that the instance is in the cluster.
3604

3605
    """
3606
    instance = self.cfg.GetInstanceInfo(
3607
      self.cfg.ExpandInstanceName(self.op.instance_name))
3608
    if instance is None:
3609
      raise errors.OpPrereqError("Instance '%s' not known" %
3610
                                 self.op.instance_name)
3611
    self.instance = instance
3612

    
3613
  def Exec(self, feedback_fn):
3614
    """Connect to the console of an instance
3615

3616
    """
3617
    instance = self.instance
3618
    node = instance.primary_node
3619

    
3620
    node_insts = rpc.call_instance_list([node])[node]
3621
    if node_insts is False:
3622
      raise errors.OpExecError("Can't connect to node %s." % node)
3623

    
3624
    if instance.name not in node_insts:
3625
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3626

    
3627
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3628

    
3629
    hyper = hypervisor.GetHypervisor()
3630
    console_cmd = hyper.GetShellCommandForConsole(instance)
3631

    
3632
    # build ssh cmdline
3633
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3634

    
3635

    
3636
class LUReplaceDisks(LogicalUnit):
3637
  """Replace the disks of an instance.
3638

3639
  """
3640
  HPATH = "mirrors-replace"
3641
  HTYPE = constants.HTYPE_INSTANCE
3642
  _OP_REQP = ["instance_name", "mode", "disks"]
3643

    
3644
  def _RunAllocator(self):
3645
    """Compute a new secondary node using an IAllocator.
3646

3647
    """
3648
    ial = IAllocator(self.cfg, self.sstore,
3649
                     mode=constants.IALLOCATOR_MODE_RELOC,
3650
                     name=self.op.instance_name,
3651
                     relocate_from=[self.sec_node])
3652

    
3653
    ial.Run(self.op.iallocator)
3654

    
3655
    if not ial.success:
3656
      raise errors.OpPrereqError("Can't compute nodes using"
3657
                                 " iallocator '%s': %s" % (self.op.iallocator,
3658
                                                           ial.info))
3659
    if len(ial.nodes) != ial.required_nodes:
3660
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3661
                                 " of nodes (%s), required %s" %
3662
                                 (len(ial.nodes), ial.required_nodes))
3663
    self.op.remote_node = ial.nodes[0]
3664
    logger.ToStdout("Selected new secondary for the instance: %s" %
3665
                    self.op.remote_node)
3666

    
3667
  def BuildHooksEnv(self):
3668
    """Build hooks env.
3669

3670
    This runs on the master, the primary and all the secondaries.
3671

3672
    """
3673
    env = {
3674
      "MODE": self.op.mode,
3675
      "NEW_SECONDARY": self.op.remote_node,
3676
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3677
      }
3678
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3679
    nl = [
3680
      self.sstore.GetMasterNode(),
3681
      self.instance.primary_node,
3682
      ]
3683
    if self.op.remote_node is not None:
3684
      nl.append(self.op.remote_node)
3685
    return env, nl, nl
3686

    
3687
  def CheckPrereq(self):
3688
    """Check prerequisites.
3689

3690
    This checks that the instance is in the cluster.
3691

3692
    """
3693
    if not hasattr(self.op, "remote_node"):
3694
      self.op.remote_node = None
3695

    
3696
    instance = self.cfg.GetInstanceInfo(
3697
      self.cfg.ExpandInstanceName(self.op.instance_name))
3698
    if instance is None:
3699
      raise errors.OpPrereqError("Instance '%s' not known" %
3700
                                 self.op.instance_name)
3701
    self.instance = instance
3702
    self.op.instance_name = instance.name
3703

    
3704
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3705
      raise errors.OpPrereqError("Instance's disk layout is not"
3706
                                 " network mirrored.")
3707

    
3708
    if len(instance.secondary_nodes) != 1:
3709
      raise errors.OpPrereqError("The instance has a strange layout,"
3710
                                 " expected one secondary but found %d" %
3711
                                 len(instance.secondary_nodes))
3712

    
3713
    self.sec_node = instance.secondary_nodes[0]
3714

    
3715
    ia_name = getattr(self.op, "iallocator", None)
3716
    if ia_name is not None:
3717
      if self.op.remote_node is not None:
3718
        raise errors.OpPrereqError("Give either the iallocator or the new"
3719
                                   " secondary, not both")
3720
      self.op.remote_node = self._RunAllocator()
3721

    
3722
    remote_node = self.op.remote_node
3723
    if remote_node is not None:
3724
      remote_node = self.cfg.ExpandNodeName(remote_node)
3725
      if remote_node is None:
3726
        raise errors.OpPrereqError("Node '%s' not known" %
3727
                                   self.op.remote_node)
3728
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3729
    else:
3730
      self.remote_node_info = None
3731
    if remote_node == instance.primary_node:
3732
      raise errors.OpPrereqError("The specified node is the primary node of"
3733
                                 " the instance.")
3734
    elif remote_node == self.sec_node:
3735
      if self.op.mode == constants.REPLACE_DISK_SEC:
3736
        # this is for DRBD8, where we can't execute the same mode of
3737
        # replacement as for drbd7 (no different port allocated)
3738
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3739
                                   " replacement")
3740
      # the user gave the current secondary, switch to
3741
      # 'no-replace-secondary' mode for drbd7
3742
      remote_node = None
3743
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3744
        self.op.mode != constants.REPLACE_DISK_ALL):
3745
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3746
                                 " disks replacement, not individual ones")
3747
    if instance.disk_template == constants.DT_DRBD8:
3748
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3749
          remote_node is not None):
3750
        # switch to replace secondary mode
3751
        self.op.mode = constants.REPLACE_DISK_SEC
3752

    
3753
      if self.op.mode == constants.REPLACE_DISK_ALL:
3754
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3755
                                   " secondary disk replacement, not"
3756
                                   " both at once")
3757
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3758
        if remote_node is not None:
3759
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3760
                                     " the secondary while doing a primary"
3761
                                     " node disk replacement")
3762
        self.tgt_node = instance.primary_node
3763
        self.oth_node = instance.secondary_nodes[0]
3764
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3765
        self.new_node = remote_node # this can be None, in which case
3766
                                    # we don't change the secondary
3767
        self.tgt_node = instance.secondary_nodes[0]
3768
        self.oth_node = instance.primary_node
3769
      else:
3770
        raise errors.ProgrammerError("Unhandled disk replace mode")
3771

    
3772
    for name in self.op.disks:
3773
      if instance.FindDisk(name) is None:
3774
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3775
                                   (name, instance.name))
3776
    self.op.remote_node = remote_node
3777

    
3778
  def _ExecRR1(self, feedback_fn):
3779
    """Replace the disks of an instance.
3780

3781
    """
3782
    instance = self.instance
3783
    iv_names = {}
3784
    # start of work
3785
    if self.op.remote_node is None:
3786
      remote_node = self.sec_node
3787
    else:
3788
      remote_node = self.op.remote_node
3789
    cfg = self.cfg
3790
    for dev in instance.disks:
3791
      size = dev.size
3792
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3793
      names = _GenerateUniqueNames(cfg, lv_names)
3794
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3795
                                       remote_node, size, names)
3796
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3797
      logger.Info("adding new mirror component on secondary for %s" %
3798
                  dev.iv_name)
3799
      #HARDCODE
3800
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3801
                                        new_drbd, False,
3802
                                        _GetInstanceInfoText(instance)):
3803
        raise errors.OpExecError("Failed to create new component on secondary"
3804
                                 " node %s. Full abort, cleanup manually!" %
3805
                                 remote_node)
3806

    
3807
      logger.Info("adding new mirror component on primary")
3808
      #HARDCODE
3809
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3810
                                      instance, new_drbd,
3811
                                      _GetInstanceInfoText(instance)):
3812
        # remove secondary dev
3813
        cfg.SetDiskID(new_drbd, remote_node)
3814
        rpc.call_blockdev_remove(remote_node, new_drbd)
3815
        raise errors.OpExecError("Failed to create volume on primary!"
3816
                                 " Full abort, cleanup manually!!")
3817

    
3818
      # the device exists now
3819
      # call the primary node to add the mirror to md
3820
      logger.Info("adding new mirror component to md")
3821
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3822
                                           [new_drbd]):
3823
        logger.Error("Can't add mirror compoment to md!")
3824
        cfg.SetDiskID(new_drbd, remote_node)
3825
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3826
          logger.Error("Can't rollback on secondary")
3827
        cfg.SetDiskID(new_drbd, instance.primary_node)
3828
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3829
          logger.Error("Can't rollback on primary")
3830
        raise errors.OpExecError("Full abort, cleanup manually!!")
3831

    
3832
      dev.children.append(new_drbd)
3833
      cfg.AddInstance(instance)
3834

    
3835
    # this can fail as the old devices are degraded and _WaitForSync
3836
    # does a combined result over all disks, so we don't check its
3837
    # return value
3838
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3839

    
3840
    # so check manually all the devices
3841
    for name in iv_names:
3842
      dev, child, new_drbd = iv_names[name]
3843
      cfg.SetDiskID(dev, instance.primary_node)
3844
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3845
      if is_degr:
3846
        raise errors.OpExecError("MD device %s is degraded!" % name)
3847
      cfg.SetDiskID(new_drbd, instance.primary_node)
3848
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3849
      if is_degr:
3850
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3851

    
3852
    for name in iv_names:
3853
      dev, child, new_drbd = iv_names[name]
3854
      logger.Info("remove mirror %s component" % name)
3855
      cfg.SetDiskID(dev, instance.primary_node)
3856
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3857
                                              dev, [child]):
3858
        logger.Error("Can't remove child from mirror, aborting"
3859
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3860
        continue
3861

    
3862
      for node in child.logical_id[:2]:
3863
        logger.Info("remove child device on %s" % node)
3864
        cfg.SetDiskID(child, node)
3865
        if not rpc.call_blockdev_remove(node, child):
3866
          logger.Error("Warning: failed to remove device from node %s,"
3867
                       " continuing operation." % node)
3868

    
3869
      dev.children.remove(child)
3870

    
3871
      cfg.AddInstance(instance)
3872

    
3873
  def _ExecD8DiskOnly(self, feedback_fn):
3874
    """Replace a disk on the primary or secondary for dbrd8.
3875

3876
    The algorithm for replace is quite complicated:
3877
      - for each disk to be replaced:
3878
        - create new LVs on the target node with unique names
3879
        - detach old LVs from the drbd device
3880
        - rename old LVs to name_replaced.<time_t>
3881
        - rename new LVs to old LVs
3882
        - attach the new LVs (with the old names now) to the drbd device
3883
      - wait for sync across all devices
3884
      - for each modified disk:
3885
        - remove old LVs (which have the name name_replaces.<time_t>)
3886

3887
    Failures are not very well handled.
3888

3889
    """
3890
    steps_total = 6
3891
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3892
    instance = self.instance
3893
    iv_names = {}
3894
    vgname = self.cfg.GetVGName()
3895
    # start of work
3896
    cfg = self.cfg
3897
    tgt_node = self.tgt_node
3898
    oth_node = self.oth_node
3899

    
3900
    # Step: check device activation
3901
    self.proc.LogStep(1, steps_total, "check device existence")
3902
    info("checking volume groups")
3903
    my_vg = cfg.GetVGName()
3904
    results = rpc.call_vg_list([oth_node, tgt_node])
3905
    if not results:
3906
      raise errors.OpExecError("Can't list volume groups on the nodes")
3907
    for node in oth_node, tgt_node:
3908
      res = results.get(node, False)
3909
      if not res or my_vg not in res:
3910
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3911
                                 (my_vg, node))
3912
    for dev in instance.disks:
3913
      if not dev.iv_name in self.op.disks:
3914
        continue
3915
      for node in tgt_node, oth_node:
3916
        info("checking %s on %s" % (dev.iv_name, node))
3917
        cfg.SetDiskID(dev, node)
3918
        if not rpc.call_blockdev_find(node, dev):
3919
          raise errors.OpExecError("Can't find device %s on node %s" %
3920
                                   (dev.iv_name, node))
3921

    
3922
    # Step: check other node consistency
3923
    self.proc.LogStep(2, steps_total, "check peer consistency")
3924
    for dev in instance.disks:
3925
      if not dev.iv_name in self.op.disks:
3926
        continue
3927
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3928
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3929
                                   oth_node==instance.primary_node):
3930
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3931
                                 " to replace disks on this node (%s)" %
3932
                                 (oth_node, tgt_node))
3933

    
3934
    # Step: create new storage
3935
    self.proc.LogStep(3, steps_total, "allocate new storage")
3936
    for dev in instance.disks:
3937
      if not dev.iv_name in self.op.disks:
3938
        continue
3939
      size = dev.size
3940
      cfg.SetDiskID(dev, tgt_node)
3941
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3942
      names = _GenerateUniqueNames(cfg, lv_names)
3943
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3944
                             logical_id=(vgname, names[0]))
3945
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3946
                             logical_id=(vgname, names[1]))
3947
      new_lvs = [lv_data, lv_meta]
3948
      old_lvs = dev.children
3949
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3950
      info("creating new local storage on %s for %s" %
3951
           (tgt_node, dev.iv_name))
3952
      # since we *always* want to create this LV, we use the
3953
      # _Create...OnPrimary (which forces the creation), even if we
3954
      # are talking about the secondary node
3955
      for new_lv in new_lvs:
3956
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3957
                                        _GetInstanceInfoText(instance)):
3958
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3959
                                   " node '%s'" %
3960
                                   (new_lv.logical_id[1], tgt_node))
3961

    
3962
    # Step: for each lv, detach+rename*2+attach
3963
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3964
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3965
      info("detaching %s drbd from local storage" % dev.iv_name)
3966
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3967
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3968
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3969
      #dev.children = []
3970
      #cfg.Update(instance)
3971

    
3972
      # ok, we created the new LVs, so now we know we have the needed
3973
      # storage; as such, we proceed on the target node to rename
3974
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3975
      # using the assumption that logical_id == physical_id (which in
3976
      # turn is the unique_id on that node)
3977

    
3978
      # FIXME(iustin): use a better name for the replaced LVs
3979
      temp_suffix = int(time.time())
3980
      ren_fn = lambda d, suff: (d.physical_id[0],
3981
                                d.physical_id[1] + "_replaced-%s" % suff)
3982
      # build the rename list based on what LVs exist on the node
3983
      rlist = []
3984
      for to_ren in old_lvs:
3985
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3986
        if find_res is not None: # device exists
3987
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3988

    
3989
      info("renaming the old LVs on the target node")
3990
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3991
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3992
      # now we rename the new LVs to the old LVs
3993
      info("renaming the new LVs on the target node")
3994
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3995
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3996
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3997

    
3998
      for old, new in zip(old_lvs, new_lvs):
3999
        new.logical_id = old.logical_id
4000
        cfg.SetDiskID(new, tgt_node)
4001

    
4002
      for disk in old_lvs:
4003
        disk.logical_id = ren_fn(disk, temp_suffix)
4004
        cfg.SetDiskID(disk, tgt_node)
4005

    
4006
      # now that the new lvs have the old name, we can add them to the device
4007
      info("adding new mirror component on %s" % tgt_node)
4008
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4009
        for new_lv in new_lvs:
4010
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
4011
            warning("Can't rollback device %s", hint="manually cleanup unused"
4012
                    " logical volumes")
4013
        raise errors.OpExecError("Can't add local storage to drbd")
4014

    
4015
      dev.children = new_lvs
4016
      cfg.Update(instance)
4017

    
4018
    # Step: wait for sync
4019

    
4020
    # this can fail as the old devices are degraded and _WaitForSync
4021
    # does a combined result over all disks, so we don't check its
4022
    # return value
4023
    self.proc.LogStep(5, steps_total, "sync devices")
4024
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4025

    
4026
    # so check manually all the devices
4027
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4028
      cfg.SetDiskID(dev, instance.primary_node)
4029
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4030
      if is_degr:
4031
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4032

    
4033
    # Step: remove old storage
4034
    self.proc.LogStep(6, steps_total, "removing old storage")
4035
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4036
      info("remove logical volumes for %s" % name)
4037
      for lv in old_lvs:
4038
        cfg.SetDiskID(lv, tgt_node)
4039
        if not rpc.call_blockdev_remove(tgt_node, lv):
4040
          warning("Can't remove old LV", hint="manually remove unused LVs")
4041
          continue
4042

    
4043
  def _ExecD8Secondary(self, feedback_fn):
4044
    """Replace the secondary node for drbd8.
4045

4046
    The algorithm for replace is quite complicated:
4047
      - for all disks of the instance:
4048
        - create new LVs on the new node with same names
4049
        - shutdown the drbd device on the old secondary
4050
        - disconnect the drbd network on the primary
4051
        - create the drbd device on the new secondary
4052
        - network attach the drbd on the primary, using an artifice:
4053
          the drbd code for Attach() will connect to the network if it
4054
          finds a device which is connected to the good local disks but
4055
          not network enabled
4056
      - wait for sync across all devices
4057
      - remove all disks from the old secondary
4058

4059
    Failures are not very well handled.
4060

4061
    """
4062
    steps_total = 6
4063
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4064
    instance = self.instance
4065
    iv_names = {}
4066
    vgname = self.cfg.GetVGName()
4067
    # start of work
4068
    cfg = self.cfg
4069
    old_node = self.tgt_node
4070
    new_node = self.new_node
4071
    pri_node = instance.primary_node
4072

    
4073
    # Step: check device activation
4074
    self.proc.LogStep(1, steps_total, "check device existence")
4075
    info("checking volume groups")
4076
    my_vg = cfg.GetVGName()
4077
    results = rpc.call_vg_list([pri_node, new_node])
4078
    if not results:
4079
      raise errors.OpExecError("Can't list volume groups on the nodes")
4080
    for node in pri_node, new_node:
4081
      res = results.get(node, False)
4082
      if not res or my_vg not in res:
4083
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4084
                                 (my_vg, node))
4085
    for dev in instance.disks:
4086
      if not dev.iv_name in self.op.disks:
4087
        continue
4088
      info("checking %s on %s" % (dev.iv_name, pri_node))
4089
      cfg.SetDiskID(dev, pri_node)
4090
      if not rpc.call_blockdev_find(pri_node, dev):
4091
        raise errors.OpExecError("Can't find device %s on node %s" %
4092
                                 (dev.iv_name, pri_node))
4093

    
4094
    # Step: check other node consistency
4095
    self.proc.LogStep(2, steps_total, "check peer consistency")
4096
    for dev in instance.disks:
4097
      if not dev.iv_name in self.op.disks:
4098
        continue
4099
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4100
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4101
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4102
                                 " unsafe to replace the secondary" %
4103
                                 pri_node)
4104

    
4105
    # Step: create new storage
4106
    self.proc.LogStep(3, steps_total, "allocate new storage")
4107
    for dev in instance.disks:
4108
      size = dev.size
4109
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4110
      # since we *always* want to create this LV, we use the
4111
      # _Create...OnPrimary (which forces the creation), even if we
4112
      # are talking about the secondary node
4113
      for new_lv in dev.children:
4114
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4115
                                        _GetInstanceInfoText(instance)):
4116
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4117
                                   " node '%s'" %
4118
                                   (new_lv.logical_id[1], new_node))
4119

    
4120
      iv_names[dev.iv_name] = (dev, dev.children)
4121

    
4122
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4123
    for dev in instance.disks:
4124
      size = dev.size
4125
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4126
      # create new devices on new_node
4127
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4128
                              logical_id=(pri_node, new_node,
4129
                                          dev.logical_id[2]),
4130
                              children=dev.children)
4131
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4132
                                        new_drbd, False,
4133
                                      _GetInstanceInfoText(instance)):
4134
        raise errors.OpExecError("Failed to create new DRBD on"
4135
                                 " node '%s'" % new_node)
4136

    
4137
    for dev in instance.disks:
4138
      # we have new devices, shutdown the drbd on the old secondary
4139
      info("shutting down drbd for %s on old node" % dev.iv_name)
4140
      cfg.SetDiskID(dev, old_node)
4141
      if not rpc.call_blockdev_shutdown(old_node, dev):
4142
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4143
                hint="Please cleanup this device manually as soon as possible")
4144

    
4145
    info("detaching primary drbds from the network (=> standalone)")
4146
    done = 0
4147
    for dev in instance.disks:
4148
      cfg.SetDiskID(dev, pri_node)
4149
      # set the physical (unique in bdev terms) id to None, meaning
4150
      # detach from network
4151
      dev.physical_id = (None,) * len(dev.physical_id)
4152
      # and 'find' the device, which will 'fix' it to match the
4153
      # standalone state
4154
      if rpc.call_blockdev_find(pri_node, dev):
4155
        done += 1
4156
      else:
4157
        warning("Failed to detach drbd %s from network, unusual case" %
4158
                dev.iv_name)
4159

    
4160
    if not done:
4161
      # no detaches succeeded (very unlikely)
4162
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4163

    
4164
    # if we managed to detach at least one, we update all the disks of
4165
    # the instance to point to the new secondary
4166
    info("updating instance configuration")
4167
    for dev in instance.disks:
4168
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4169
      cfg.SetDiskID(dev, pri_node)
4170
    cfg.Update(instance)
4171

    
4172
    # and now perform the drbd attach
4173
    info("attaching primary drbds to new secondary (standalone => connected)")
4174
    failures = []
4175
    for dev in instance.disks:
4176
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4177
      # since the attach is smart, it's enough to 'find' the device,
4178
      # it will automatically activate the network, if the physical_id
4179
      # is correct
4180
      cfg.SetDiskID(dev, pri_node)
4181
      if not rpc.call_blockdev_find(pri_node, dev):
4182
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4183
                "please do a gnt-instance info to see the status of disks")
4184

    
4185
    # this can fail as the old devices are degraded and _WaitForSync
4186
    # does a combined result over all disks, so we don't check its
4187
    # return value
4188
    self.proc.LogStep(5, steps_total, "sync devices")
4189
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4190

    
4191
    # so check manually all the devices
4192
    for name, (dev, old_lvs) in iv_names.iteritems():
4193
      cfg.SetDiskID(dev, pri_node)
4194
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4195
      if is_degr:
4196
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4197

    
4198
    self.proc.LogStep(6, steps_total, "removing old storage")
4199
    for name, (dev, old_lvs) in iv_names.iteritems():
4200
      info("remove logical volumes for %s" % name)
4201
      for lv in old_lvs:
4202
        cfg.SetDiskID(lv, old_node)
4203
        if not rpc.call_blockdev_remove(old_node, lv):
4204
          warning("Can't remove LV on old secondary",
4205
                  hint="Cleanup stale volumes by hand")
4206

    
4207
  def Exec(self, feedback_fn):
4208
    """Execute disk replacement.
4209

4210
    This dispatches the disk replacement to the appropriate handler.
4211

4212
    """
4213
    instance = self.instance
4214
    if instance.disk_template == constants.DT_REMOTE_RAID1:
4215
      fn = self._ExecRR1
4216
    elif instance.disk_template == constants.DT_DRBD8:
4217
      if self.op.remote_node is None:
4218
        fn = self._ExecD8DiskOnly
4219
      else:
4220
        fn = self._ExecD8Secondary
4221
    else:
4222
      raise errors.ProgrammerError("Unhandled disk replacement case")
4223
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
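  # Illustrative shape (assumed example values) of the dict returned by
  # _ComputeDiskStatus above; "pstatus"/"sstatus" carry whatever
  # rpc.call_blockdev_find returned for the primary/secondary node:
  #   {"iv_name": "sda",
  #    "dev_type": dev.dev_type,
  #    "logical_id": dev.logical_id,
  #    "physical_id": dev.physical_id,
  #    "pstatus": <find result on the primary node>,
  #    "sstatus": <find result on the secondary node, or None>,
  #    "children": [<one dict of this same shape per child device>]}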

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_params.count(None) == len(all_params):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                    " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                    " filename")
    else:
      self.do_initrd_path = False
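    # Note on the boot order check below: any string made up only of the
    # letters a, c, d, n (or the literal value "default") is accepted; for
    # example (assumed sample values) "acd" and "n" pass, while "x" or
    # "acdx" are rejected because stripping "acdn" leaves characters behind.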

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result
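  # Illustrative example (assumed values) of the change list returned by
  # Exec above, one (parameter, new_value) pair per modified field:
  #   [("mem", 512), ("vcpus", 2), ("bridge", "xen-br0")]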


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
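  # Illustrative example (assumed names) of the structure described in the
  # docstring above:
  #   {"node1.example.com": ["instance1.example.com", "instance2.example.com"],
  #    "node2.example.com": []}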


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl
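  # Illustrative example (assumed values) of the extra environment entries
  # set above, on top of whatever _BuildInstanceHookEnvByObject provides:
  #   {"EXPORT_NODE": "node2.example.com", "EXPORT_DO_SHUTDOWN": True}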

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
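    # Summary of the kind -> target resolution above (descriptive only):
    #   TAG_CLUSTER  -> the cluster configuration object
    #   TAG_NODE     -> the node object named by op.name (after expansion)
    #   TAG_INSTANCE -> the instance object named by op.name (after expansion)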


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
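  # Illustrative example (assumed tags and names) of the result format, a
  # list of (path, matching_tag) tuples:
  #   [("/cluster", "production"),
  #    ("/nodes/node1.example.com", "production"),
  #    ("/instances/instance1.example.com", "production-db")]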


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
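    # Example of the subset check below (assumed tag values): with
    # del_tags = frozenset(["web", "db"]) and cur_tags = set(["web"]), the
    # difference is frozenset(["db"]), so CheckPrereq fails with
    # "Tag(s) 'db' not found".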
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")

class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or
      _RELO_KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
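    # Illustrative sketch (assumed names and numbers) of the structure
    # assembled above:
    #   {"version": 1,
    #    "cluster_name": "cluster.example.com",
    #    "cluster_tags": [],
    #    "hypervisor_type": "<hypervisor name>",
    #    "nodes": {"node1.example.com": {"total_memory": 4096,
    #                                    "free_memory": 2048, ...}},
    #    "instances": {"instance1.example.com": {"memory": 512, "vcpus": 1,
    #                                            "disk_template": "drbd8",
    #                                            ...}}}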

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
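    # Illustrative example (assumed values) of a reply accepted by the
    # validation above; all three keys must be present and "nodes" must be
    # a list:
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node2.example.com", "node3.example.com"]}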


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result