#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


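# Module layout: the LogicalUnit / NoHooksLU base classes below, a group of
# module-level helper functions (names starting with an underscore), and then
# one LU class per opcode (LUInitCluster, LUVerifyCluster, LUAddNode, ...).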
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). An empty node list should be returned as an empty list
    (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


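# Illustrative sketch of the contract described in the docstrings above: a
# hypothetical minimal subclass (the opcode field "message" is invented for
# this example and does not exist in the real opcode definitions).
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["message"]
#
#     def CheckPrereq(self):
#       # idempotent checks only; canonicalize opcode fields here
#       if not self.op.message:
#         raise errors.OpPrereqError("Empty message given")
#
#     def BuildHooksEnv(self):
#       # (env dict, pre-hook node list, post-hook node list)
#       return {"OP_TARGET": self.op.message}, [], []
#
#     def Exec(self, feedback_fn):
#       feedback_fn("noop: %s" % self.op.message)

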
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of node names (strings) or an empty list for all nodes

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


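# Example of the helper above (hypothetical names): _GetWantedNodes(lu,
# ["node1"]) expands the short name via cfg.ExpandNodeName and returns the
# sorted full names, while _GetWantedNodes(lu, []) returns every configured
# node. _GetWantedInstances below follows the same pattern for instances.

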
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instance names (strings) or an empty list for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the caller for output

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


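# Rough example of the result of _BuildInstanceHookEnv (hypothetical values):
# for an instance "inst1" on "node1" with one NIC it would return roughly
#   {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01", "INSTANCE_NIC_COUNT": 1}

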
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

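  # Both -keyout and -out below point at constants.SSL_CERT_FILE, so the
  # private key and the self-signed certificate end up in the same file.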
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
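    # 'filelist' asks each node for checksums of the given files, 'nodelist'
    # for connectivity checks against the listed nodes, and 'hypervisor' for
    # a hypervisor sanity check; the results are consumed in _VerifyNode.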
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
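    # res_nodes: nodes that could not be contacted; res_nlvm: per-node LVM
    # error messages; res_instances: instances with offline volumes;
    # res_missing: instance name -> list of (node, volume) pairs not found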

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


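# _RecursiveCheckIfLVMBased above is used by LUSetClusterParams.CheckPrereq
# below to refuse disabling LVM storage while LVM-backed instances exist:
# e.g. a disk whose children include an LD_LV device yields True, while a
# disk tree containing no LD_LV device anywhere yields False.

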
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5
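  # idx is an index into the status tuple returned by rpc.call_blockdev_find
  # below: index 6 is used when ldisk=True (local storage status), index 5
  # otherwise (the overall is_degraded status described in the docstring).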

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os in nr:
        if os.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os.name] = {}
          for nname in node_list:
            all_os[os.name][nname] = []
        all_os[os.name][node_name].append(os)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


1518
class LUQueryNodeVolumes(NoHooksLU):
1519
  """Logical unit for getting volumes on node(s).
1520

1521
  """
1522
  _OP_REQP = ["nodes", "output_fields"]
1523

    
1524
  def CheckPrereq(self):
1525
    """Check prerequisites.
1526

1527
    This checks that the fields required are valid output fields.
1528

1529
    """
1530
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1531

    
1532
    _CheckOutputFields(static=["node"],
1533
                       dynamic=["phys", "vg", "name", "size", "instance"],
1534
                       selected=self.op.output_fields)
1535

    
1536

    
1537
  def Exec(self, feedback_fn):
1538
    """Computes the list of nodes and their attributes.
1539

1540
    """
1541
    nodenames = self.nodes
1542
    volumes = rpc.call_node_volumes(nodenames)
1543

    
1544
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1545
             in self.cfg.GetInstanceList()]
1546

    
1547
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1548

    
1549
    output = []
1550
    for node in nodenames:
1551
      if node not in volumes or not volumes[node]:
1552
        continue
1553

    
1554
      node_vols = volumes[node][:]
1555
      node_vols.sort(key=lambda vol: vol['dev'])
1556

    
1557
      for vol in node_vols:
1558
        node_output = []
1559
        for field in self.op.output_fields:
1560
          if field == "node":
1561
            val = node
1562
          elif field == "phys":
1563
            val = vol['dev']
1564
          elif field == "vg":
1565
            val = vol['vg']
1566
          elif field == "name":
1567
            val = vol['name']
1568
          elif field == "size":
1569
            val = int(float(vol['size']))
1570
          elif field == "instance":
1571
            for inst in ilist:
1572
              if node not in lv_by_node[inst]:
1573
                continue
1574
              if vol['name'] in lv_by_node[inst][node]:
1575
                val = inst.name
1576
                break
1577
            else:
1578
              val = '-'
1579
          else:
1580
            raise errors.ParameterError(field)
1581
          node_output.append(str(val))
1582

    
1583
        output.append(node_output)
1584

    
1585
    return output
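    # Illustrative sketch (hypothetical values, not from the original source):
    # with output_fields ["node", "phys", "vg", "name", "size", "instance"] a
    # single row could look like
    #   ["node1.example.com", "/dev/sda3", "xenvg", "4ac699be...sda", "10240",
    #    "instance1.example.com"]
    # Every value is stringified, and "-" marks a volume not owned by any
    # instance.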


class LUAddNode(LogicalUnit):
  """Logical unit for adding a node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1
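  # Illustrative sketch (not from the original source): assuming CheckPrereq
  # has already filled in the IP fields, adding "node4.example.com" with
  # primary IP 192.0.2.14 and no separate secondary IP would yield an env of
  #   {"OP_TARGET": "node4.example.com",
  #    "NODE_NAME": "node4.example.com",
  #    "NODE_PIP": "192.0.2.14",
  #    "NODE_SIP": "192.0.2.14"}
  # plus the pre-hook node list (all current nodes) and the post-hook node
  # list (the current nodes plus the one being added).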

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))
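    # Sketch of what mycommand expands to (placeholder values, not the real
    # constants):
    #   umask 077 &&
    #   echo '<node password>' > '<noded password file>' &&
    #   cat > '<SSL cert file>' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   <node init script> restart
    # i.e. a single remote shell invocation that installs the node daemon
    # password and the cluster SSL certificate, then restarts the node daemon.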

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file given by self.op.filename is copied via ssh to every node
    in self.nodes, skipping the master itself.

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data
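    # Illustrative example (hypothetical output): running "uptime" on two
    # nodes could return
    #   [("node2.example.com", " 10:01:02 up 5 days, ...", 0),
    #    ("node1.example.com", " 10:01:05 up 7 days, ...", 0)]
    # with the master node, if it was selected, always processed last.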


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info); disks_ok is false if the operation
    failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples giving the
    mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
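# Illustrative sketch (not from the original source): for an instance with
# "sda" and "sdb" disks,
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
# would typically give disks_ok == True and device_info similar to
#   [("node1.example.com", "sda", "/dev/drbd0"),
#    ("node1.example.com", "sdb", "/dev/drbd1")]
# (hypothetical device names; the last element is whatever the primary
# node's blockdev_assemble call returned).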


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true; errors on any other node always make the function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
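# Illustrative usage, mirroring the calls made by the instance LUs below:
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
# raises OpPrereqError unless the node reports at least instance.memory MiB
# of free memory.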


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
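    # Overview of the reboot types handled above: SOFT and HARD reboots are
    # delegated to the node daemon via rpc.call_instance_reboot and leave the
    # disks assembled; a FULL reboot shuts the instance down, cycles its disks
    # through _ShutdownInstanceDisks/_StartInstanceDisks and starts it again.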


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
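    # Summary of the "status" values computed above:
    #   ERROR_nodedown - the primary node could not be queried
    #   running        - reported running and marked up in the config
    #   ERROR_up       - reported running but marked down in the config
    #   ERROR_down     - marked up in the config but not running
    #   ADMIN_down     - marked down and not running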


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given suffixes.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
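# Illustrative example (hypothetical IDs): _GenerateUniqueNames(cfg,
# [".sda", ".sdb"]) returns something like
#   ["4ac699be-93e9-4a77-8a9c-1f3c4e2b7d10.sda",
#    "7f312a55-2d0e-4c1b-9d5e-0b6a8c3f4e21.sdb"]
# where each prefix is whatever cfg.GenerateUniqueID() produced.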


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
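# Sketch of the tree built above (for illustration only): a DRBD8 device of
# the requested size whose logical id is (primary, secondary, port) and whose
# two LV children live in the cluster volume group, names[0] holding the data
# and names[1] a fixed 128 MiB metadata volume.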
2961

    
2962

    
2963
def _GenerateDiskTemplate(cfg, template_name,
2964
                          instance_name, primary_node,
2965
                          secondary_nodes, disk_sz, swap_sz,
2966
                          file_storage_dir, file_driver):
2967
  """Generate the entire disk layout for a given template type.
2968

2969
  """
2970
  #TODO: compute space requirements
2971

    
2972
  vgname = cfg.GetVGName()
2973
  if template_name == constants.DT_DISKLESS:
2974
    disks = []
2975
  elif template_name == constants.DT_PLAIN:
2976
    if len(secondary_nodes) != 0:
2977
      raise errors.ProgrammerError("Wrong template configuration")
2978

    
2979
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2980
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2981
                           logical_id=(vgname, names[0]),
2982
                           iv_name = "sda")
2983
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2984
                           logical_id=(vgname, names[1]),
2985
                           iv_name = "sdb")
2986
    disks = [sda_dev, sdb_dev]
2987
  elif template_name == constants.DT_DRBD8:
2988
    if len(secondary_nodes) != 1:
2989
      raise errors.ProgrammerError("Wrong template configuration")
2990
    remote_node = secondary_nodes[0]
2991
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2992
                                       ".sdb_data", ".sdb_meta"])
2993
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2994
                                         disk_sz, names[0:2], "sda")
2995
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2996
                                         swap_sz, names[2:4], "sdb")
2997
    disks = [drbd_sda_dev, drbd_sdb_dev]
2998
  elif template_name == constants.DT_FILE:
2999
    if len(secondary_nodes) != 0:
3000
      raise errors.ProgrammerError("Wrong template configuration")
3001

    
3002
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3003
                                iv_name="sda", logical_id=(file_driver,
3004
                                "%s/sda" % file_storage_dir))
3005
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3006
                                iv_name="sdb", logical_id=(file_driver,
3007
                                "%s/sdb" % file_storage_dir))
3008
    disks = [file_sda_dev, file_sdb_dev]
3009
  else:
3010
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3011
  return disks
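  # Summary comment (added, not in the original source): the result is [] for
  # diskless, two plain LVs (sda/sdb) for plain, two drbd8 devices each backed
  # by a data and a meta LV for drbd8, and two file-backed disks below
  # file_storage_dir for the file template.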
3012

    
3013

    
3014
def _GetInstanceInfoText(instance):
3015
  """Compute that text that should be added to the disk's metadata.
3016

3017
  """
3018
  return "originstname+%s" % instance.name
3019

    
3020

    
3021
def _CreateDisks(cfg, instance):
3022
  """Create all disks for an instance.
3023

3024
  This abstracts away some work from AddInstance.
3025

3026
  Args:
3027
    instance: the instance object
3028

3029
  Returns:
3030
    True or False showing the success of the creation process
3031

3032
  """
3033
  info = _GetInstanceInfoText(instance)
3034

    
3035
  if instance.disk_template == constants.DT_FILE:
3036
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3037
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3038
                                              file_storage_dir)
3039

    
3040
    if not result:
3041
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3042
      return False
3043

    
3044
    if not result[0]:
3045
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3046
      return False
3047

    
3048
  for device in instance.disks:
3049
    logger.Info("creating volume %s for instance %s" %
3050
                (device.iv_name, instance.name))
3051
    #HARDCODE
3052
    for secondary_node in instance.secondary_nodes:
3053
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3054
                                        device, False, info):
3055
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3056
                     (device.iv_name, device, secondary_node))
3057
        return False
3058
    #HARDCODE
3059
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3060
                                    instance, device, info):
3061
      logger.Error("failed to create volume %s on primary!" %
3062
                   device.iv_name)
3063
      return False
3064

    
3065
  return True
3066

    
3067

    
3068
def _RemoveDisks(instance, cfg):
3069
  """Remove all disks for an instance.
3070

3071
  This abstracts away some work from `AddInstance()` and
3072
  `RemoveInstance()`. Note that in case some of the devices couldn't
3073
  be removed, the removal will continue with the other ones (compare
3074
  with `_CreateDisks()`).
3075

3076
  Args:
3077
    instance: the instance object
3078

3079
  Returns:
3080
    True or False showing the success of the removal process
3081

3082
  """
3083
  logger.Info("removing block devices for instance %s" % instance.name)
3084

    
3085
  result = True
3086
  for device in instance.disks:
3087
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3088
      cfg.SetDiskID(disk, node)
3089
      if not rpc.call_blockdev_remove(node, disk):
3090
        logger.Error("could not remove block device %s on node %s,"
3091
                     " continuing anyway" %
3092
                     (device.iv_name, node))
3093
        result = False
3094

    
3095
  if instance.disk_template == constants.DT_FILE:
3096
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3097
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3098
                                            file_storage_dir):
3099
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3100
      result = False
3101

    
3102
  return result
3103

    
3104

    
3105
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3106
  """Compute disk size requirements in the volume group
3107

3108
  This is currently hard-coded for the two-drive layout.
3109

3110
  """
3111
  # Required free disk space as a function of disk and swap space
3112
  req_size_dict = {
3113
    constants.DT_DISKLESS: None,
3114
    constants.DT_PLAIN: disk_size + swap_size,
3115
  # 256 MB are added for drbd metadata, 128 MB for each drbd device
3116
    constants.DT_DRBD8: disk_size + swap_size + 256,
3117
    constants.DT_FILE: None,
3118
  }
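  # Worked example (added comment; the sizes are hypothetical): a drbd8
  # instance with disk_size=10240 and swap_size=4096 needs
  # 10240 + 4096 + 256 = 14592 MB of free space in the volume group of each
  # node involved.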
3119

    
3120
  if disk_template not in req_size_dict:
3121
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3122
                                 " is unknown" %  disk_template)
3123

    
3124
  return req_size_dict[disk_template]
3125

    
3126

    
3127
class LUCreateInstance(LogicalUnit):
3128
  """Create an instance.
3129

3130
  """
3131
  HPATH = "instance-add"
3132
  HTYPE = constants.HTYPE_INSTANCE
3133
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3134
              "disk_template", "swap_size", "mode", "start", "vcpus",
3135
              "wait_for_sync", "ip_check", "mac"]
3136

    
3137
  def _RunAllocator(self):
3138
    """Run the allocator based on input opcode.
3139

3140
    """
3141
    al_data = _IAllocatorGetClusterData(self.cfg, self.sstore)
3142
    disks = [{"size": self.op.disk_size, "mode": "w"},
3143
             {"size": self.op.swap_size, "mode": "w"}]
3144
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3145
             "bridge": self.op.bridge}]
3146
    op = opcodes.OpTestAllocator(name=self.op.instance_name,
3147
                                 disk_template=self.op.disk_template,
3148
                                 tags=[],
3149
                                 os=self.op.os_type,
3150
                                 vcpus=self.op.vcpus,
3151
                                 mem_size=self.op.mem_size,
3152
                                 disks=disks,
3153
                                 nics=nics)
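    # Note added for clarity (not part of the original source): this opcode is
    # only used as a data container here; it is merged into the cluster data
    # below, serialized and passed to the external iallocator, whose reply is
    # expected to contain "success", "info" and "nodes".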
3154

    
3155
    _IAllocatorAddNewInstance(al_data, op)
3156

    
3157
    text = serializer.Dump(al_data)
3158

    
3159
    result = _IAllocatorRun(self.op.iallocator, text)
3160

    
3161
    result = _IAllocatorValidateResult(result)
3162

    
3163
    if not result["success"]:
3164
      raise errors.OpPrereqError("Can't compute nodes using"
3165
                                 " iallocator '%s': %s" % (self.op.iallocator,
3166
                                                           result["info"]))
3167
    req_nodes = 1
3168
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3169
      req_nodes += 1
3170

    
3171
    if len(result["nodes"]) != req_nodes:
3172
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3173
                                 " of nodes (%s), required %s" %
3174
                                 (len(result["nodes"]), req_nodes))
3175
    self.op.pnode = result["nodes"][0]
3176
    logger.ToStdout("Selected nodes for the instance: %s" %
3177
                    (", ".join(result["nodes"]),))
3178
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3179
                (self.op.instance_name, self.op.iallocator, result["nodes"]))
3180
    if req_nodes == 2:
3181
      self.op.snode = result["nodes"][1]
3182

    
3183
  def BuildHooksEnv(self):
3184
    """Build hooks env.
3185

3186
    This runs on master, primary and secondary nodes of the instance.
3187

3188
    """
3189
    env = {
3190
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3191
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3192
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3193
      "INSTANCE_ADD_MODE": self.op.mode,
3194
      }
3195
    if self.op.mode == constants.INSTANCE_IMPORT:
3196
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3197
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3198
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3199

    
3200
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3201
      primary_node=self.op.pnode,
3202
      secondary_nodes=self.secondaries,
3203
      status=self.instance_status,
3204
      os_type=self.op.os_type,
3205
      memory=self.op.mem_size,
3206
      vcpus=self.op.vcpus,
3207
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3208
    ))
3209

    
3210
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3211
          self.secondaries)
3212
    return env, nl, nl
3213

    
3214

    
3215
  def CheckPrereq(self):
3216
    """Check prerequisites.
3217

3218
    """
3219
    # set optional parameters to none if they don't exist
3220
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3221
                 "iallocator"]:
3222
      if not hasattr(self.op, attr):
3223
        setattr(self.op, attr, None)
3224

    
3225
    if self.op.mode not in (constants.INSTANCE_CREATE,
3226
                            constants.INSTANCE_IMPORT):
3227
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3228
                                 self.op.mode)
3229

    
3230
    if (not self.cfg.GetVGName() and
3231
        self.op.disk_template not in constants.DTS_NOT_LVM):
3232
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3233
                                 " instances")
3234

    
3235
    if self.op.mode == constants.INSTANCE_IMPORT:
3236
      src_node = getattr(self.op, "src_node", None)
3237
      src_path = getattr(self.op, "src_path", None)
3238
      if src_node is None or src_path is None:
3239
        raise errors.OpPrereqError("Importing an instance requires source"
3240
                                   " node and path options")
3241
      src_node_full = self.cfg.ExpandNodeName(src_node)
3242
      if src_node_full is None:
3243
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3244
      self.op.src_node = src_node = src_node_full
3245

    
3246
      if not os.path.isabs(src_path):
3247
        raise errors.OpPrereqError("The source path must be absolute")
3248

    
3249
      export_info = rpc.call_export_info(src_node, src_path)
3250

    
3251
      if not export_info:
3252
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3253

    
3254
      if not export_info.has_section(constants.INISECT_EXP):
3255
        raise errors.ProgrammerError("Corrupted export config")
3256

    
3257
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3258
      if (int(ei_version) != constants.EXPORT_VERSION):
3259
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3260
                                   (ei_version, constants.EXPORT_VERSION))
3261

    
3262
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3263
        raise errors.OpPrereqError("Can't import instance with more than"
3264
                                   " one data disk")
3265

    
3266
      # FIXME: are the old os-es, disk sizes, etc. useful?
3267
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3268
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3269
                                                         'disk0_dump'))
3270
      self.src_image = diskimage
3271
    else: # INSTANCE_CREATE
3272
      if getattr(self.op, "os_type", None) is None:
3273
        raise errors.OpPrereqError("No guest OS specified")
3274

    
3275
    #### instance parameters check
3276

    
3277
    # disk template and mirror node verification
3278
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3279
      raise errors.OpPrereqError("Invalid disk template name")
3280

    
3281
    # instance name verification
3282
    hostname1 = utils.HostInfo(self.op.instance_name)
3283

    
3284
    self.op.instance_name = instance_name = hostname1.name
3285
    instance_list = self.cfg.GetInstanceList()
3286
    if instance_name in instance_list:
3287
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3288
                                 instance_name)
3289

    
3290
    # ip validity checks
3291
    ip = getattr(self.op, "ip", None)
3292
    if ip is None or ip.lower() == "none":
3293
      inst_ip = None
3294
    elif ip.lower() == "auto":
3295
      inst_ip = hostname1.ip
3296
    else:
3297
      if not utils.IsValidIP(ip):
3298
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3299
                                   " like a valid IP" % ip)
3300
      inst_ip = ip
3301
    self.inst_ip = self.op.ip = inst_ip
3302

    
3303
    if self.op.start and not self.op.ip_check:
3304
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3305
                                 " adding an instance in start mode")
3306

    
3307
    if self.op.ip_check:
3308
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3309
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3310
                                   (hostname1.ip, instance_name))
3311

    
3312
    # MAC address verification
3313
    if self.op.mac != "auto":
3314
      if not utils.IsValidMac(self.op.mac.lower()):
3315
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3316
                                   self.op.mac)
3317

    
3318
    # bridge verification
3319
    bridge = getattr(self.op, "bridge", None)
3320
    if bridge is None:
3321
      self.op.bridge = self.cfg.GetDefBridge()
3322
    else:
3323
      self.op.bridge = bridge
3324

    
3325
    # boot order verification
3326
    if self.op.hvm_boot_order is not None:
3327
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3328
        raise errors.OpPrereqError("invalid boot order specified,"
3329
                                   " must be one or more of [acdn]")
3330
    # file storage checks
3331
    if (self.op.file_driver and
3332
        not self.op.file_driver in constants.FILE_DRIVER):
3333
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3334
                                 self.op.file_driver)
3335

    
3336
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3337
        raise errors.OpPrereqError("File storage directory not a relative"
3338
                                   " path")
3339
    #### allocator run
3340

    
3341
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3342
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3343
                                 " node must be given")
3344

    
3345
    if self.op.iallocator is not None:
3346
      self._RunAllocator()
3347

    
3348
    #### node related checks
3349

    
3350
    # check primary node
3351
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3352
    if pnode is None:
3353
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3354
                                 self.op.pnode)
3355
    self.op.pnode = pnode.name
3356
    self.pnode = pnode
3357
    self.secondaries = []
3358

    
3359
    # mirror node verification
3360
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3361
      if getattr(self.op, "snode", None) is None:
3362
        raise errors.OpPrereqError("The networked disk templates need"
3363
                                   " a mirror node")
3364

    
3365
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3366
      if snode_name is None:
3367
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3368
                                   self.op.snode)
3369
      elif snode_name == pnode.name:
3370
        raise errors.OpPrereqError("The secondary node cannot be"
3371
                                   " the primary node.")
3372
      self.secondaries.append(snode_name)
3373

    
3374
    req_size = _ComputeDiskSize(self.op.disk_template,
3375
                                self.op.disk_size, self.op.swap_size)
3376

    
3377
    # Check lv size requirements
3378
    if req_size is not None:
3379
      nodenames = [pnode.name] + self.secondaries
3380
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3381
      for node in nodenames:
3382
        info = nodeinfo.get(node, None)
3383
        if not info:
3384
          raise errors.OpPrereqError("Cannot get current information"
3385
                                     " from node '%s'" % nodeinfo)
3386
        vg_free = info.get('vg_free', None)
3387
        if not isinstance(vg_free, int):
3388
          raise errors.OpPrereqError("Can't compute free disk space on"
3389
                                     " node %s" % node)
3390
        if req_size > info['vg_free']:
3391
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3392
                                     " %d MB available, %d MB required" %
3393
                                     (node, info['vg_free'], req_size))
3394

    
3395
    # os verification
3396
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3397
    if not os_obj:
3398
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3399
                                 " primary node"  % self.op.os_type)
3400

    
3401
    if self.op.kernel_path == constants.VALUE_NONE:
3402
      raise errors.OpPrereqError("Can't set instance kernel to none")
3403

    
3404

    
3405
    # bridge check on primary node
3406
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3407
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3408
                                 " destination node '%s'" %
3409
                                 (self.op.bridge, pnode.name))
3410

    
3411
    if self.op.start:
3412
      self.instance_status = 'up'
3413
    else:
3414
      self.instance_status = 'down'
3415

    
3416
  def Exec(self, feedback_fn):
3417
    """Create and add the instance to the cluster.
3418

3419
    """
3420
    instance = self.op.instance_name
3421
    pnode_name = self.pnode.name
3422

    
3423
    if self.op.mac == "auto":
3424
      mac_address = self.cfg.GenerateMAC()
3425
    else:
3426
      mac_address = self.op.mac
3427

    
3428
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3429
    if self.inst_ip is not None:
3430
      nic.ip = self.inst_ip
3431

    
3432
    ht_kind = self.sstore.GetHypervisorType()
3433
    if ht_kind in constants.HTS_REQ_PORT:
3434
      network_port = self.cfg.AllocatePort()
3435
    else:
3436
      network_port = None
3437

    
3438
    # this is needed because os.path.join does not accept None arguments
3439
    if self.op.file_storage_dir is None:
3440
      string_file_storage_dir = ""
3441
    else:
3442
      string_file_storage_dir = self.op.file_storage_dir
3443

    
3444
    # build the full file storage dir path
3445
    file_storage_dir = os.path.normpath(os.path.join(
3446
                                        self.sstore.GetFileStorageDir(),
3447
                                        string_file_storage_dir, instance))
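    # Illustrative example (added comment; the paths are hypothetical): with a
    # cluster file storage dir of "/srv/ganeti/file-storage", an opcode
    # file_storage_dir of "mydir" and an instance named "inst1.example.com",
    # this yields "/srv/ganeti/file-storage/mydir/inst1.example.com".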
3448

    
3449

    
3450
    disks = _GenerateDiskTemplate(self.cfg,
3451
                                  self.op.disk_template,
3452
                                  instance, pnode_name,
3453
                                  self.secondaries, self.op.disk_size,
3454
                                  self.op.swap_size,
3455
                                  file_storage_dir,
3456
                                  self.op.file_driver)
3457

    
3458
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3459
                            primary_node=pnode_name,
3460
                            memory=self.op.mem_size,
3461
                            vcpus=self.op.vcpus,
3462
                            nics=[nic], disks=disks,
3463
                            disk_template=self.op.disk_template,
3464
                            status=self.instance_status,
3465
                            network_port=network_port,
3466
                            kernel_path=self.op.kernel_path,
3467
                            initrd_path=self.op.initrd_path,
3468
                            hvm_boot_order=self.op.hvm_boot_order,
3469
                            )
3470

    
3471
    feedback_fn("* creating instance disks...")
3472
    if not _CreateDisks(self.cfg, iobj):
3473
      _RemoveDisks(iobj, self.cfg)
3474
      raise errors.OpExecError("Device creation failed, reverting...")
3475

    
3476
    feedback_fn("adding instance %s to cluster config" % instance)
3477

    
3478
    self.cfg.AddInstance(iobj)
3479

    
3480
    if self.op.wait_for_sync:
3481
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3482
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3483
      # make sure the disks are not degraded (still sync-ing is ok)
3484
      time.sleep(15)
3485
      feedback_fn("* checking mirrors status")
3486
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3487
    else:
3488
      disk_abort = False
3489

    
3490
    if disk_abort:
3491
      _RemoveDisks(iobj, self.cfg)
3492
      self.cfg.RemoveInstance(iobj.name)
3493
      raise errors.OpExecError("There are some degraded disks for"
3494
                               " this instance")
3495

    
3496
    feedback_fn("creating os for instance %s on node %s" %
3497
                (instance, pnode_name))
3498

    
3499
    if iobj.disk_template != constants.DT_DISKLESS:
3500
      if self.op.mode == constants.INSTANCE_CREATE:
3501
        feedback_fn("* running the instance OS create scripts...")
3502
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3503
          raise errors.OpExecError("could not add os for instance %s"
3504
                                   " on node %s" %
3505
                                   (instance, pnode_name))
3506

    
3507
      elif self.op.mode == constants.INSTANCE_IMPORT:
3508
        feedback_fn("* running the instance OS import scripts...")
3509
        src_node = self.op.src_node
3510
        src_image = self.src_image
3511
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3512
                                                src_node, src_image):
3513
          raise errors.OpExecError("Could not import os for instance"
3514
                                   " %s on node %s" %
3515
                                   (instance, pnode_name))
3516
      else:
3517
        # also checked in the prereq part
3518
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3519
                                     % self.op.mode)
3520

    
3521
    if self.op.start:
3522
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3523
      feedback_fn("* starting instance...")
3524
      if not rpc.call_instance_start(pnode_name, iobj, None):
3525
        raise errors.OpExecError("Could not start instance")
3526

    
3527

    
3528
class LUConnectConsole(NoHooksLU):
3529
  """Connect to an instance's console.
3530

3531
  This is somewhat special in that it returns the command line that
3532
  you need to run on the master node in order to connect to the
3533
  console.
3534

3535
  """
3536
  _OP_REQP = ["instance_name"]
3537

    
3538
  def CheckPrereq(self):
3539
    """Check prerequisites.
3540

3541
    This checks that the instance is in the cluster.
3542

3543
    """
3544
    instance = self.cfg.GetInstanceInfo(
3545
      self.cfg.ExpandInstanceName(self.op.instance_name))
3546
    if instance is None:
3547
      raise errors.OpPrereqError("Instance '%s' not known" %
3548
                                 self.op.instance_name)
3549
    self.instance = instance
3550

    
3551
  def Exec(self, feedback_fn):
3552
    """Connect to the console of an instance
3553

3554
    """
3555
    instance = self.instance
3556
    node = instance.primary_node
3557

    
3558
    node_insts = rpc.call_instance_list([node])[node]
3559
    if node_insts is False:
3560
      raise errors.OpExecError("Can't connect to node %s." % node)
3561

    
3562
    if instance.name not in node_insts:
3563
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3564

    
3565
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3566

    
3567
    hyper = hypervisor.GetHypervisor()
3568
    console_cmd = hyper.GetShellCommandForConsole(instance)
3569

    
3570
    # build ssh cmdline
3571
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3572

    
3573

    
3574
class LUReplaceDisks(LogicalUnit):
3575
  """Replace the disks of an instance.
3576

3577
  """
3578
  HPATH = "mirrors-replace"
3579
  HTYPE = constants.HTYPE_INSTANCE
3580
  _OP_REQP = ["instance_name", "mode", "disks"]
3581

    
3582
  def BuildHooksEnv(self):
3583
    """Build hooks env.
3584

3585
    This runs on the master, the primary and all the secondaries.
3586

3587
    """
3588
    env = {
3589
      "MODE": self.op.mode,
3590
      "NEW_SECONDARY": self.op.remote_node,
3591
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3592
      }
3593
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3594
    nl = [
3595
      self.sstore.GetMasterNode(),
3596
      self.instance.primary_node,
3597
      ]
3598
    if self.op.remote_node is not None:
3599
      nl.append(self.op.remote_node)
3600
    return env, nl, nl
3601

    
3602
  def CheckPrereq(self):
3603
    """Check prerequisites.
3604

3605
    This checks that the instance is in the cluster.
3606

3607
    """
3608
    instance = self.cfg.GetInstanceInfo(
3609
      self.cfg.ExpandInstanceName(self.op.instance_name))
3610
    if instance is None:
3611
      raise errors.OpPrereqError("Instance '%s' not known" %
3612
                                 self.op.instance_name)
3613
    self.instance = instance
3614
    self.op.instance_name = instance.name
3615

    
3616
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3617
      raise errors.OpPrereqError("Instance's disk layout is not"
3618
                                 " network mirrored.")
3619

    
3620
    if len(instance.secondary_nodes) != 1:
3621
      raise errors.OpPrereqError("The instance has a strange layout,"
3622
                                 " expected one secondary but found %d" %
3623
                                 len(instance.secondary_nodes))
3624

    
3625
    self.sec_node = instance.secondary_nodes[0]
3626

    
3627
    remote_node = getattr(self.op, "remote_node", None)
3628
    if remote_node is not None:
3629
      remote_node = self.cfg.ExpandNodeName(remote_node)
3630
      if remote_node is None:
3631
        raise errors.OpPrereqError("Node '%s' not known" %
3632
                                   self.op.remote_node)
3633
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3634
    else:
3635
      self.remote_node_info = None
3636
    if remote_node == instance.primary_node:
3637
      raise errors.OpPrereqError("The specified node is the primary node of"
3638
                                 " the instance.")
3639
    elif remote_node == self.sec_node:
3640
      if self.op.mode == constants.REPLACE_DISK_SEC:
3641
        # this is for DRBD8, where we can't execute the same mode of
3642
        # replacement as for drbd7 (no different port allocated)
3643
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3644
                                   " replacement")
3645
      # the user gave the current secondary, switch to
3646
      # 'no-replace-secondary' mode for drbd7
3647
      remote_node = None
3648
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3649
        self.op.mode != constants.REPLACE_DISK_ALL):
3650
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3651
                                 " disks replacement, not individual ones")
3652
    if instance.disk_template == constants.DT_DRBD8:
3653
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3654
          remote_node is not None):
3655
        # switch to replace secondary mode
3656
        self.op.mode = constants.REPLACE_DISK_SEC
3657

    
3658
      if self.op.mode == constants.REPLACE_DISK_ALL:
3659
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3660
                                   " secondary disk replacement, not"
3661
                                   " both at once")
3662
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3663
        if remote_node is not None:
3664
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3665
                                     " the secondary while doing a primary"
3666
                                     " node disk replacement")
3667
        self.tgt_node = instance.primary_node
3668
        self.oth_node = instance.secondary_nodes[0]
3669
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3670
        self.new_node = remote_node # this can be None, in which case
3671
                                    # we don't change the secondary
3672
        self.tgt_node = instance.secondary_nodes[0]
3673
        self.oth_node = instance.primary_node
3674
      else:
3675
        raise errors.ProgrammerError("Unhandled disk replace mode")
3676

    
3677
    for name in self.op.disks:
3678
      if instance.FindDisk(name) is None:
3679
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3680
                                   (name, instance.name))
3681
    self.op.remote_node = remote_node
3682

    
3683
  def _ExecRR1(self, feedback_fn):
3684
    """Replace the disks of an instance.
3685

3686
    """
3687
    instance = self.instance
3688
    iv_names = {}
3689
    # start of work
3690
    if self.op.remote_node is None:
3691
      remote_node = self.sec_node
3692
    else:
3693
      remote_node = self.op.remote_node
3694
    cfg = self.cfg
3695
    for dev in instance.disks:
3696
      size = dev.size
3697
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3698
      names = _GenerateUniqueNames(cfg, lv_names)
3699
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3700
                                       remote_node, size, names)
3701
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3702
      logger.Info("adding new mirror component on secondary for %s" %
3703
                  dev.iv_name)
3704
      #HARDCODE
3705
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3706
                                        new_drbd, False,
3707
                                        _GetInstanceInfoText(instance)):
3708
        raise errors.OpExecError("Failed to create new component on secondary"
3709
                                 " node %s. Full abort, cleanup manually!" %
3710
                                 remote_node)
3711

    
3712
      logger.Info("adding new mirror component on primary")
3713
      #HARDCODE
3714
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3715
                                      instance, new_drbd,
3716
                                      _GetInstanceInfoText(instance)):
3717
        # remove secondary dev
3718
        cfg.SetDiskID(new_drbd, remote_node)
3719
        rpc.call_blockdev_remove(remote_node, new_drbd)
3720
        raise errors.OpExecError("Failed to create volume on primary!"
3721
                                 " Full abort, cleanup manually!!")
3722

    
3723
      # the device exists now
3724
      # call the primary node to add the mirror to md
3725
      logger.Info("adding new mirror component to md")
3726
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3727
                                           [new_drbd]):
3728
        logger.Error("Can't add mirror compoment to md!")
3729
        cfg.SetDiskID(new_drbd, remote_node)
3730
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3731
          logger.Error("Can't rollback on secondary")
3732
        cfg.SetDiskID(new_drbd, instance.primary_node)
3733
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3734
          logger.Error("Can't rollback on primary")
3735
        raise errors.OpExecError("Full abort, cleanup manually!!")
3736

    
3737
      dev.children.append(new_drbd)
3738
      cfg.AddInstance(instance)
3739

    
3740
    # this can fail as the old devices are degraded and _WaitForSync
3741
    # does a combined result over all disks, so we don't check its
3742
    # return value
3743
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3744

    
3745
    # so check manually all the devices
3746
    for name in iv_names:
3747
      dev, child, new_drbd = iv_names[name]
3748
      cfg.SetDiskID(dev, instance.primary_node)
3749
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3750
      if is_degr:
3751
        raise errors.OpExecError("MD device %s is degraded!" % name)
3752
      cfg.SetDiskID(new_drbd, instance.primary_node)
3753
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3754
      if is_degr:
3755
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3756

    
3757
    for name in iv_names:
3758
      dev, child, new_drbd = iv_names[name]
3759
      logger.Info("remove mirror %s component" % name)
3760
      cfg.SetDiskID(dev, instance.primary_node)
3761
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3762
                                              dev, [child]):
3763
        logger.Error("Can't remove child from mirror, aborting"
3764
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3765
        continue
3766

    
3767
      for node in child.logical_id[:2]:
3768
        logger.Info("remove child device on %s" % node)
3769
        cfg.SetDiskID(child, node)
3770
        if not rpc.call_blockdev_remove(node, child):
3771
          logger.Error("Warning: failed to remove device from node %s,"
3772
                       " continuing operation." % node)
3773

    
3774
      dev.children.remove(child)
3775

    
3776
      cfg.AddInstance(instance)
3777

    
3778
  def _ExecD8DiskOnly(self, feedback_fn):
3779
    """Replace a disk on the primary or secondary for dbrd8.
3780

3781
    The algorithm for replace is quite complicated:
3782
      - for each disk to be replaced:
3783
        - create new LVs on the target node with unique names
3784
        - detach old LVs from the drbd device
3785
        - rename old LVs to name_replaced.<time_t>
3786
        - rename new LVs to old LVs
3787
        - attach the new LVs (with the old names now) to the drbd device
3788
      - wait for sync across all devices
3789
      - for each modified disk:
3790
        - remove old LVs (which have the name name_replaced.<time_t>)
3791

3792
    Failures are not very well handled.
3793

3794
    """
3795
    steps_total = 6
3796
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3797
    instance = self.instance
3798
    iv_names = {}
3799
    vgname = self.cfg.GetVGName()
3800
    # start of work
3801
    cfg = self.cfg
3802
    tgt_node = self.tgt_node
3803
    oth_node = self.oth_node
3804

    
3805
    # Step: check device activation
3806
    self.proc.LogStep(1, steps_total, "check device existence")
3807
    info("checking volume groups")
3808
    my_vg = cfg.GetVGName()
3809
    results = rpc.call_vg_list([oth_node, tgt_node])
3810
    if not results:
3811
      raise errors.OpExecError("Can't list volume groups on the nodes")
3812
    for node in oth_node, tgt_node:
3813
      res = results.get(node, False)
3814
      if not res or my_vg not in res:
3815
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3816
                                 (my_vg, node))
3817
    for dev in instance.disks:
3818
      if not dev.iv_name in self.op.disks:
3819
        continue
3820
      for node in tgt_node, oth_node:
3821
        info("checking %s on %s" % (dev.iv_name, node))
3822
        cfg.SetDiskID(dev, node)
3823
        if not rpc.call_blockdev_find(node, dev):
3824
          raise errors.OpExecError("Can't find device %s on node %s" %
3825
                                   (dev.iv_name, node))
3826

    
3827
    # Step: check other node consistency
3828
    self.proc.LogStep(2, steps_total, "check peer consistency")
3829
    for dev in instance.disks:
3830
      if not dev.iv_name in self.op.disks:
3831
        continue
3832
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3833
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3834
                                   oth_node==instance.primary_node):
3835
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3836
                                 " to replace disks on this node (%s)" %
3837
                                 (oth_node, tgt_node))
3838

    
3839
    # Step: create new storage
3840
    self.proc.LogStep(3, steps_total, "allocate new storage")
3841
    for dev in instance.disks:
3842
      if not dev.iv_name in self.op.disks:
3843
        continue
3844
      size = dev.size
3845
      cfg.SetDiskID(dev, tgt_node)
3846
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3847
      names = _GenerateUniqueNames(cfg, lv_names)
3848
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3849
                             logical_id=(vgname, names[0]))
3850
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3851
                             logical_id=(vgname, names[1]))
3852
      new_lvs = [lv_data, lv_meta]
3853
      old_lvs = dev.children
3854
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3855
      info("creating new local storage on %s for %s" %
3856
           (tgt_node, dev.iv_name))
3857
      # since we *always* want to create this LV, we use the
3858
      # _Create...OnPrimary (which forces the creation), even if we
3859
      # are talking about the secondary node
3860
      for new_lv in new_lvs:
3861
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3862
                                        _GetInstanceInfoText(instance)):
3863
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3864
                                   " node '%s'" %
3865
                                   (new_lv.logical_id[1], tgt_node))
3866

    
3867
    # Step: for each lv, detach+rename*2+attach
3868
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3869
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3870
      info("detaching %s drbd from local storage" % dev.iv_name)
3871
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3872
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3873
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3874
      #dev.children = []
3875
      #cfg.Update(instance)
3876

    
3877
      # ok, we created the new LVs, so now we know we have the needed
3878
      # storage; as such, we proceed on the target node to rename
3879
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3880
      # using the assumption that logical_id == physical_id (which in
3881
      # turn is the unique_id on that node)
3882

    
3883
      # FIXME(iustin): use a better name for the replaced LVs
3884
      temp_suffix = int(time.time())
3885
      ren_fn = lambda d, suff: (d.physical_id[0],
3886
                                d.physical_id[1] + "_replaced-%s" % suff)
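      # Illustrative example (added comment; the names are hypothetical): an
      # old LV with physical_id ("xenvg", "<uuid>.sda_data") is renamed to
      # ("xenvg", "<uuid>.sda_data_replaced-<temp_suffix>"), freeing its name
      # for the freshly created LV.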
3887
      # build the rename list based on what LVs exist on the node
3888
      rlist = []
3889
      for to_ren in old_lvs:
3890
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3891
        if find_res is not None: # device exists
3892
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3893

    
3894
      info("renaming the old LVs on the target node")
3895
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3896
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3897
      # now we rename the new LVs to the old LVs
3898
      info("renaming the new LVs on the target node")
3899
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3900
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3901
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3902

    
3903
      for old, new in zip(old_lvs, new_lvs):
3904
        new.logical_id = old.logical_id
3905
        cfg.SetDiskID(new, tgt_node)
3906

    
3907
      for disk in old_lvs:
3908
        disk.logical_id = ren_fn(disk, temp_suffix)
3909
        cfg.SetDiskID(disk, tgt_node)
3910

    
3911
      # now that the new lvs have the old name, we can add them to the device
3912
      info("adding new mirror component on %s" % tgt_node)
3913
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3914
        for new_lv in new_lvs:
3915
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3916
            warning("Can't rollback device %s", hint="manually cleanup unused"
3917
                    " logical volumes")
3918
        raise errors.OpExecError("Can't add local storage to drbd")
3919

    
3920
      dev.children = new_lvs
3921
      cfg.Update(instance)
3922

    
3923
    # Step: wait for sync
3924

    
3925
    # this can fail as the old devices are degraded and _WaitForSync
3926
    # does a combined result over all disks, so we don't check its
3927
    # return value
3928
    self.proc.LogStep(5, steps_total, "sync devices")
3929
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3930

    
3931
    # so check manually all the devices
3932
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3933
      cfg.SetDiskID(dev, instance.primary_node)
3934
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3935
      if is_degr:
3936
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3937

    
3938
    # Step: remove old storage
3939
    self.proc.LogStep(6, steps_total, "removing old storage")
3940
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3941
      info("remove logical volumes for %s" % name)
3942
      for lv in old_lvs:
3943
        cfg.SetDiskID(lv, tgt_node)
3944
        if not rpc.call_blockdev_remove(tgt_node, lv):
3945
          warning("Can't remove old LV", hint="manually remove unused LVs")
3946
          continue
3947

    
3948
  def _ExecD8Secondary(self, feedback_fn):
3949
    """Replace the secondary node for drbd8.
3950

3951
    The algorithm for replace is quite complicated:
3952
      - for all disks of the instance:
3953
        - create new LVs on the new node with same names
3954
        - shutdown the drbd device on the old secondary
3955
        - disconnect the drbd network on the primary
3956
        - create the drbd device on the new secondary
3957
        - network attach the drbd on the primary, using an artifice:
3958
          the drbd code for Attach() will connect to the network if it
3959
          finds a device which is connected to the correct local disks but
3960
          not network enabled
3961
      - wait for sync across all devices
3962
      - remove all disks from the old secondary
3963

3964
    Failures are not very well handled.
3965

3966
    """
3967
    steps_total = 6
3968
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3969
    instance = self.instance
3970
    iv_names = {}
3971
    vgname = self.cfg.GetVGName()
3972
    # start of work
3973
    cfg = self.cfg
3974
    old_node = self.tgt_node
3975
    new_node = self.new_node
3976
    pri_node = instance.primary_node
3977

    
3978
    # Step: check device activation
3979
    self.proc.LogStep(1, steps_total, "check device existence")
3980
    info("checking volume groups")
3981
    my_vg = cfg.GetVGName()
3982
    results = rpc.call_vg_list([pri_node, new_node])
3983
    if not results:
3984
      raise errors.OpExecError("Can't list volume groups on the nodes")
3985
    for node in pri_node, new_node:
3986
      res = results.get(node, False)
3987
      if not res or my_vg not in res:
3988
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3989
                                 (my_vg, node))
3990
    for dev in instance.disks:
3991
      if not dev.iv_name in self.op.disks:
3992
        continue
3993
      info("checking %s on %s" % (dev.iv_name, pri_node))
3994
      cfg.SetDiskID(dev, pri_node)
3995
      if not rpc.call_blockdev_find(pri_node, dev):
3996
        raise errors.OpExecError("Can't find device %s on node %s" %
3997
                                 (dev.iv_name, pri_node))
3998

    
3999
    # Step: check other node consistency
4000
    self.proc.LogStep(2, steps_total, "check peer consistency")
4001
    for dev in instance.disks:
4002
      if not dev.iv_name in self.op.disks:
4003
        continue
4004
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4005
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4006
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4007
                                 " unsafe to replace the secondary" %
4008
                                 pri_node)
4009

    
4010
    # Step: create new storage
4011
    self.proc.LogStep(3, steps_total, "allocate new storage")
4012
    for dev in instance.disks:
4013
      size = dev.size
4014
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4015
      # since we *always* want to create this LV, we use the
4016
      # _Create...OnPrimary (which forces the creation), even if we
4017
      # are talking about the secondary node
4018
      for new_lv in dev.children:
4019
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4020
                                        _GetInstanceInfoText(instance)):
4021
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4022
                                   " node '%s'" %
4023
                                   (new_lv.logical_id[1], new_node))
4024

    
4025
      iv_names[dev.iv_name] = (dev, dev.children)
4026

    
4027
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4028
    for dev in instance.disks:
4029
      size = dev.size
4030
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4031
      # create new devices on new_node
4032
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4033
                              logical_id=(pri_node, new_node,
4034
                                          dev.logical_id[2]),
4035
                              children=dev.children)
4036
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4037
                                        new_drbd, False,
4038
                                      _GetInstanceInfoText(instance)):
4039
        raise errors.OpExecError("Failed to create new DRBD on"
4040
                                 " node '%s'" % new_node)
4041

    
4042
    for dev in instance.disks:
4043
      # we have new devices, shutdown the drbd on the old secondary
4044
      info("shutting down drbd for %s on old node" % dev.iv_name)
4045
      cfg.SetDiskID(dev, old_node)
4046
      if not rpc.call_blockdev_shutdown(old_node, dev):
4047
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4048
                hint="Please cleanup this device manually as soon as possible")
4049

    
4050
    info("detaching primary drbds from the network (=> standalone)")
4051
    done = 0
4052
    for dev in instance.disks:
4053
      cfg.SetDiskID(dev, pri_node)
4054
      # set the physical (unique in bdev terms) id to None, meaning
4055
      # detach from network
4056
      dev.physical_id = (None,) * len(dev.physical_id)
4057
      # and 'find' the device, which will 'fix' it to match the
4058
      # standalone state
4059
      if rpc.call_blockdev_find(pri_node, dev):
4060
        done += 1
4061
      else:
4062
        warning("Failed to detach drbd %s from network, unusual case" %
4063
                dev.iv_name)
4064

    
4065
    if not done:
4066
      # no detaches succeeded (very unlikely)
4067
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4068

    
4069
    # if we managed to detach at least one, we update all the disks of
4070
    # the instance to point to the new secondary
4071
    info("updating instance configuration")
4072
    for dev in instance.disks:
4073
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4074
      cfg.SetDiskID(dev, pri_node)
4075
    cfg.Update(instance)
4076

    
4077
    # and now perform the drbd attach
4078
    info("attaching primary drbds to new secondary (standalone => connected)")
4079
    failures = []
4080
    for dev in instance.disks:
4081
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4082
      # since the attach is smart, it's enough to 'find' the device,
4083
      # it will automatically activate the network, if the physical_id
4084
      # is correct
4085
      cfg.SetDiskID(dev, pri_node)
4086
      if not rpc.call_blockdev_find(pri_node, dev):
4087
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4088
                "please do a gnt-instance info to see the status of disks")
4089

    
4090
    # this can fail as the old devices are degraded and _WaitForSync
4091
    # does a combined result over all disks, so we don't check its
4092
    # return value
4093
    self.proc.LogStep(5, steps_total, "sync devices")
4094
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4095

    
4096
    # so check manually all the devices
4097
    for name, (dev, old_lvs) in iv_names.iteritems():
4098
      cfg.SetDiskID(dev, pri_node)
4099
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4100
      if is_degr:
4101
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4102

    
4103
    self.proc.LogStep(6, steps_total, "removing old storage")
4104
    for name, (dev, old_lvs) in iv_names.iteritems():
4105
      info("remove logical volumes for %s" % name)
4106
      for lv in old_lvs:
4107
        cfg.SetDiskID(lv, old_node)
4108
        if not rpc.call_blockdev_remove(old_node, lv):
4109
          warning("Can't remove LV on old secondary",
4110
                  hint="Cleanup stale volumes by hand")
4111

    
4112
  def Exec(self, feedback_fn):
4113
    """Execute disk replacement.
4114

4115
    This dispatches the disk replacement to the appropriate handler.
4116

4117
    """
4118
    instance = self.instance
4119
    if instance.disk_template == constants.DT_REMOTE_RAID1:
4120
      fn = self._ExecRR1
4121
    elif instance.disk_template == constants.DT_DRBD8:
4122
      if self.op.remote_node is None:
4123
        fn = self._ExecD8DiskOnly
4124
      else:
4125
        fn = self._ExecD8Secondary
4126
    else:
4127
      raise errors.ProgrammerError("Unhandled disk replacement case")
4128
    return fn(feedback_fn)
4129

    
4130

    
4131
class LUQueryInstanceData(NoHooksLU):
4132
  """Query runtime instance data.
4133

4134
  """
4135
  _OP_REQP = ["instances"]
4136

    
4137
  def CheckPrereq(self):
4138
    """Check prerequisites.
4139

4140
    This only checks the optional instance list against the existing names.
4141

4142
    """
4143
    if not isinstance(self.op.instances, list):
4144
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4145
    if self.op.instances:
4146
      self.wanted_instances = []
4147
      names = self.op.instances
4148
      for name in names:
4149
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4150
        if instance is None:
4151
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4152
        self.wanted_instances.append(instance)
4153
    else:
4154
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4155
                               in self.cfg.GetInstanceList()]
4156
    return
4157

    
4158

    
4159
  def _ComputeDiskStatus(self, instance, snode, dev):
4160
    """Compute block device status.
4161

4162
    """
4163
    self.cfg.SetDiskID(dev, instance.primary_node)
4164
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4165
    if dev.dev_type in constants.LDS_DRBD:
4166
      # we change the snode then (otherwise we use the one passed in)
4167
      if dev.logical_id[0] == instance.primary_node:
4168
        snode = dev.logical_id[1]
4169
      else:
4170
        snode = dev.logical_id[0]
4171

    
4172
    if snode:
4173
      self.cfg.SetDiskID(dev, snode)
4174
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4175
    else:
4176
      dev_sstatus = None
4177

    
4178
    if dev.children:
4179
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4180
                      for child in dev.children]
4181
    else:
4182
      dev_children = []
4183

    
4184
    data = {
4185
      "iv_name": dev.iv_name,
4186
      "dev_type": dev.dev_type,
4187
      "logical_id": dev.logical_id,
4188
      "physical_id": dev.physical_id,
4189
      "pstatus": dev_pstatus,
4190
      "sstatus": dev_sstatus,
4191
      "children": dev_children,
4192
      }
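    # Shape note (added comment, not in the original source): "children" holds
    # recursively computed dicts of this same shape, so a drbd8 disk yields one
    # entry with two LV child entries; "pstatus"/"sstatus" carry whatever
    # call_blockdev_find returned on the primary/secondary node.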
4193

    
4194
    return data
4195

    
4196
  def Exec(self, feedback_fn):
4197
    """Gather and return data"""
4198
    result = {}
4199
    for instance in self.wanted_instances:
4200
      remote_info = rpc.call_instance_info(instance.primary_node,
4201
                                                instance.name)
4202
      if remote_info and "state" in remote_info:
4203
        remote_state = "up"
4204
      else:
4205
        remote_state = "down"
4206
      if instance.status == "down":
4207
        config_state = "down"
4208
      else:
4209
        config_state = "up"
4210

    
4211
      disks = [self._ComputeDiskStatus(instance, None, device)
4212
               for device in instance.disks]
4213

    
4214
      idict = {
4215
        "name": instance.name,
4216
        "config_state": config_state,
4217
        "run_state": remote_state,
4218
        "pnode": instance.primary_node,
4219
        "snodes": instance.secondary_nodes,
4220
        "os": instance.os,
4221
        "memory": instance.memory,
4222
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4223
        "disks": disks,
4224
        "network_port": instance.network_port,
4225
        "vcpus": instance.vcpus,
4226
        "kernel_path": instance.kernel_path,
4227
        "initrd_path": instance.initrd_path,
4228
        "hvm_boot_order": instance.hvm_boot_order,
4229
        }
4230

    
4231
      result[instance.name] = idict
4232

    
4233
    return result
4234

    
4235

    
4236
class LUSetInstanceParams(LogicalUnit):
4237
  """Modifies an instances's parameters.
4238

4239
  """
4240
  HPATH = "instance-modify"
4241
  HTYPE = constants.HTYPE_INSTANCE
4242
  _OP_REQP = ["instance_name"]
4243

    
4244
  def BuildHooksEnv(self):
4245
    """Build hooks env.
4246

4247
    This runs on the master, primary and secondaries.
4248

4249
    """
4250
    args = dict()
4251
    if self.mem:
4252
      args['memory'] = self.mem
4253
    if self.vcpus:
4254
      args['vcpus'] = self.vcpus
4255
    if self.do_ip or self.do_bridge or self.mac:
4256
      if self.do_ip:
4257
        ip = self.ip
4258
      else:
4259
        ip = self.instance.nics[0].ip
4260
      if self.bridge:
4261
        bridge = self.bridge
4262
      else:
4263
        bridge = self.instance.nics[0].bridge
4264
      if self.mac:
4265
        mac = self.mac
4266
      else:
4267
        mac = self.instance.nics[0].mac
4268
      args['nics'] = [(ip, bridge, mac)]
4269
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4270
    nl = [self.sstore.GetMasterNode(),
4271
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4272
    return env, nl, nl
4273

    
4274
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_params.count(None) == len(all_params):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


def _IAllocatorGetClusterData(cfg, sstore):
  """Compute the generic allocator input data.

  This is the data that is independent of the actual operation.

  """
  # cluster data
  data = {
    "version": 1,
    "cluster_name": sstore.GetClusterName(),
    "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
    # we don't have job IDs
    }

  # node data
  node_results = {}
  node_list = cfg.GetNodeList()
  node_data = rpc.call_node_info(node_list, cfg.GetVGName())
  for nname in node_list:
    ninfo = cfg.GetNodeInfo(nname)
    if nname not in node_data or not isinstance(node_data[nname], dict):
      raise errors.OpExecError("Can't get data for node %s" % nname)
    remote_info = node_data[nname]
    for attr in ['memory_total', 'memory_free',
                 'vg_size', 'vg_free']:
      if attr not in remote_info:
        raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                 (nname, attr))
      try:
        int(remote_info[attr])
      except ValueError, err:
        raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                 " %s" % (nname, attr, str(err)))
    pnr = {
      "tags": list(ninfo.GetTags()),
      "total_memory": utils.TryConvert(int, remote_info['memory_total']),
      "free_memory": utils.TryConvert(int, remote_info['memory_free']),
      "total_disk": utils.TryConvert(int, remote_info['vg_size']),
      "free_disk": utils.TryConvert(int, remote_info['vg_free']),
      "primary_ip": ninfo.primary_ip,
      "secondary_ip": ninfo.secondary_ip,
      }
    node_results[nname] = pnr
  data["nodes"] = node_results

  # instance data
  instance_data = {}
  i_list = cfg.GetInstanceList()
  for iname in i_list:
    iinfo = cfg.GetInstanceInfo(iname)
    nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                for n in iinfo.nics]
    pir = {
      "tags": list(iinfo.GetTags()),
      "should_run": iinfo.status == "up",
      "vcpus": iinfo.vcpus,
      "memory": iinfo.memory,
      "os": iinfo.os,
      "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
      "nics": nic_data,
      "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
      "disk_template": iinfo.disk_template,
      }
    instance_data[iname] = pir

  data["instances"] = instance_data

  return data


def _IAllocatorAddNewInstance(data, op):
  """Add new instance data to allocator structure.

  This in combination with _IAllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  if len(op.disks) != 2:
    raise errors.OpExecError("Only two-disk configurations supported")

  disk_space = _ComputeDiskSize(op.disk_template,
                                op.disks[0]["size"], op.disks[1]["size"])

  request = {
    "type": "allocate",
    "name": op.name,
    "disk_template": op.disk_template,
    "tags": op.tags,
    "os": op.os,
    "vcpus": op.vcpus,
    "memory": op.mem_size,
    "disks": op.disks,
    "disk_space_total": disk_space,
    "nics": op.nics,
    }
  data["request"] = request


def _IAllocatorAddRelocateInstance(data, op):
  """Add relocate instance data to allocator structure.

  This in combination with _IAllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  request = {
    "type": "replace_secondary",
    "name": op.name,
    }
  data["request"] = request


def _IAllocatorRun(name, data):
  """Run an instance allocator and return the results.

  """
  alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                os.path.isfile)
  if alloc_script is None:
    raise errors.OpExecError("Can't find allocator '%s'" % name)

  fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
  try:
    os.write(fd, data)
    os.close(fd)
    result = utils.RunCmd([alloc_script, fin_name])
    if result.failed:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (result.fail_reason, result.stdout))
  finally:
    os.unlink(fin_name)
  return result.stdout


def _IAllocatorValidateResult(data):
  """Process the allocator results.

  """
  try:
    rdict = serializer.Load(data)
  except Exception, err:
    raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

  if not isinstance(rdict, dict):
    raise errors.OpExecError("Can't parse iallocator results: not a dict")

  for key in "success", "info", "nodes":
    if key not in rdict:
      raise errors.OpExecError("Can't parse iallocator results:"
                               " missing key '%s'" % key)

  if not isinstance(rdict["nodes"], list):
    raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                             " is not a list")
  return rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    data = _IAllocatorGetClusterData(self.cfg, self.sstore)
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      _IAllocatorAddNewInstance(data, self.op)
    else:
      _IAllocatorAddRelocateInstance(data, self.op)

    text = serializer.Dump(data)
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = text
    else:
      result = _IAllocatorRun(self.op.allocator, text)
    return result