root / lib / cmdlib.py @ d9c02ca6

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_MASTER); note that all
      commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_'; that prefix
    is added by the hooks runner, which will also add further keys of its
    own. If the LU doesn't define any environment, an empty dict (and not
    None) should be returned.

    If there are no nodes to return, use an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
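  # Illustrative example only (names are hypothetical, not part of the API):
  # an LU could return ({"NODE_NAME": self.op.node_name}, [], [master_node]);
  # the hooks runner would then expose the key as GANETI_NODE_NAME to the
  # hook scripts run on master_node after execution.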

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
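# For example, an instance with a single NIC yields keys such as OP_TARGET,
# INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_NIC_COUNT and
# INSTANCE_NIC0_IP/_BRIDGE/_HWADDR; the hooks runner later adds the GANETI_
# prefix before exporting them to the hook scripts.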


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
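# Example: _HasValidVG({"xenvg": 40960}, "xenvg") returns None (valid), while
# a missing or undersized volume group yields an error string such as
# "volume group 'xenvg' missing".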


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
    ss.SetKey(ss.SS_CONFIG_VERSION, constants.CONFIG_VERSION)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    utils.AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary
      # has enough memory to host all instances it is secondary for, should
      # a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
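  # Example: if node2 is secondary for inst1 and inst2, both with node1 as
  # primary, this check requires node2's 'mfree' to be at least
  # memory(inst1) + memory(inst2), so that a failure of node1 can be absorbed.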

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase; their failure is logged
    in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
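    # The returned tuple is (res_nodes, res_nlvm, res_instances, res_missing):
    # nodes that could not be queried, per-node LVM error output, instances
    # with at least one offline volume, and per-instance lists of missing
    # (node, volume) pairs.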

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5
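  # Per the docstring above, index 5 of the remote block device status holds
  # the overall is_degraded flag and index 6 the ldisk (local storage) status;
  # the selected index is tested below on the rpc.call_blockdev_find result.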
1309

    
1310
  result = True
1311
  if on_primary or dev.AssembleOnSecondary():
1312
    rstats = rpc.call_blockdev_find(node, dev)
1313
    if not rstats:
1314
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1315
      result = False
1316
    else:
1317
      result = result and (not rstats[idx])
1318
  if dev.children:
1319
    for child in dev.children:
1320
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1321

    
1322
  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and lists of OS objects as values, e.g.
               {"debian-etch": {"node1": [<object>,...],
                                "node2": [<object>,]}
               }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    utils.RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1580

    
1581

    
1582
class LUQueryNodeVolumes(NoHooksLU):
1583
  """Logical unit for getting volumes on node(s).
1584

1585
  """
1586
  _OP_REQP = ["nodes", "output_fields"]
1587

    
1588
  def CheckPrereq(self):
1589
    """Check prerequisites.
1590

1591
    This checks that the fields required are valid output fields.
1592

1593
    """
1594
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1595

    
1596
    _CheckOutputFields(static=["node"],
1597
                       dynamic=["phys", "vg", "name", "size", "instance"],
1598
                       selected=self.op.output_fields)
1599

    
1600

    
1601
  def Exec(self, feedback_fn):
1602
    """Computes the list of nodes and their attributes.
1603

1604
    """
1605
    nodenames = self.nodes
1606
    volumes = rpc.call_node_volumes(nodenames)
1607

    
1608
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1609
             in self.cfg.GetInstanceList()]
1610

    
1611
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
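    # lv_by_node maps each instance to its {node: [lv names]} layout, so the
    # "instance" output field below can be answered by a reverse lookup from
    # a volume name to the instance owning it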
1612

    
1613
    output = []
1614
    for node in nodenames:
1615
      if node not in volumes or not volumes[node]:
1616
        continue
1617

    
1618
      node_vols = volumes[node][:]
1619
      node_vols.sort(key=lambda vol: vol['dev'])
1620

    
1621
      for vol in node_vols:
1622
        node_output = []
1623
        for field in self.op.output_fields:
1624
          if field == "node":
1625
            val = node
1626
          elif field == "phys":
1627
            val = vol['dev']
1628
          elif field == "vg":
1629
            val = vol['vg']
1630
          elif field == "name":
1631
            val = vol['name']
1632
          elif field == "size":
1633
            val = int(float(vol['size']))
1634
          elif field == "instance":
1635
            for inst in ilist:
1636
              if node not in lv_by_node[inst]:
1637
                continue
1638
              if vol['name'] in lv_by_node[inst][node]:
1639
                val = inst.name
1640
                break
1641
            else:
1642
              val = '-'
1643
          else:
1644
            raise errors.ParameterError(field)
1645
          node_output.append(str(val))
1646

    
1647
        output.append(node_output)
1648

    
1649
    return output
1650

    
1651

    
1652
class LUAddNode(LogicalUnit):
1653
  """Logical unit for adding node to the cluster.
1654

1655
  """
1656
  HPATH = "node-add"
1657
  HTYPE = constants.HTYPE_NODE
1658
  _OP_REQP = ["node_name"]
1659

    
1660
  def BuildHooksEnv(self):
1661
    """Build hooks env.
1662

1663
    This will run on all nodes before, and on all nodes + the new node after.
1664

1665
    """
1666
    env = {
1667
      "OP_TARGET": self.op.node_name,
1668
      "NODE_NAME": self.op.node_name,
1669
      "NODE_PIP": self.op.primary_ip,
1670
      "NODE_SIP": self.op.secondary_ip,
1671
      }
1672
    nodes_0 = self.cfg.GetNodeList()
1673
    nodes_1 = nodes_0 + [self.op.node_name, ]
1674
    return env, nodes_0, nodes_1
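    # the pre-hook runs only on the currently configured nodes (nodes_0),
    # while the post-hook node list (nodes_1) also includes the node being
    # added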
1675

    
1676
  def CheckPrereq(self):
1677
    """Check prerequisites.
1678

1679
    This checks:
1680
     - the new node is not already in the config
1681
     - it is resolvable
1682
     - its parameters (single/dual homed) match the cluster
1683

1684
    Any errors are signalled by raising errors.OpPrereqError.
1685

1686
    """
1687
    node_name = self.op.node_name
1688
    cfg = self.cfg
1689

    
1690
    dns_data = utils.HostInfo(node_name)
1691

    
1692
    node = dns_data.name
1693
    primary_ip = self.op.primary_ip = dns_data.ip
1694
    secondary_ip = getattr(self.op, "secondary_ip", None)
1695
    if secondary_ip is None:
1696
      secondary_ip = primary_ip
1697
    if not utils.IsValidIP(secondary_ip):
1698
      raise errors.OpPrereqError("Invalid secondary IP given")
1699
    self.op.secondary_ip = secondary_ip
1700

    
1701
    node_list = cfg.GetNodeList()
1702
    if not self.op.readd and node in node_list:
1703
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1704
                                 node)
1705
    elif self.op.readd and node not in node_list:
1706
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1707

    
1708
    for existing_node_name in node_list:
1709
      existing_node = cfg.GetNodeInfo(existing_node_name)
1710

    
1711
      if self.op.readd and node == existing_node_name:
1712
        if (existing_node.primary_ip != primary_ip or
1713
            existing_node.secondary_ip != secondary_ip):
1714
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1715
                                     " address configuration as before")
1716
        continue
1717

    
1718
      if (existing_node.primary_ip == primary_ip or
1719
          existing_node.secondary_ip == primary_ip or
1720
          existing_node.primary_ip == secondary_ip or
1721
          existing_node.secondary_ip == secondary_ip):
1722
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1723
                                   " existing node %s" % existing_node.name)
1724

    
1725
    # check that the type of the node (single versus dual homed) is the
1726
    # same as for the master
1727
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1728
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1729
    newbie_singlehomed = secondary_ip == primary_ip
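    # a node is considered single-homed when its secondary IP equals its
    # primary IP; mixing single- and dual-homed nodes in one cluster is
    # rejected below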
1730
    if master_singlehomed != newbie_singlehomed:
1731
      if master_singlehomed:
1732
        raise errors.OpPrereqError("The master has no private ip but the"
1733
                                   " new node has one")
1734
      else:
1735
        raise errors.OpPrereqError("The master has a private ip but the"
1736
                                   " new node doesn't have one")
1737

    
1738
    # checks reachability
1739
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1740
      raise errors.OpPrereqError("Node not reachable by ping")
1741

    
1742
    if not newbie_singlehomed:
1743
      # check reachability from my secondary ip to newbie's secondary ip
1744
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1745
                           source=myself.secondary_ip):
1746
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1747
                                   " based ping to noded port")
1748

    
1749
    self.new_node = objects.Node(name=node,
1750
                                 primary_ip=primary_ip,
1751
                                 secondary_ip=secondary_ip)
1752

    
1753
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1754
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1755
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1756
                                   constants.VNC_PASSWORD_FILE)
1757

    
1758
  def Exec(self, feedback_fn):
1759
    """Adds the new node to the cluster.
1760

1761
    """
1762
    new_node = self.new_node
1763
    node = new_node.name
1764

    
1765
    # set up inter-node password and certificate and restart the node daemon
1766
    gntpass = self.sstore.GetNodeDaemonPassword()
1767
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1768
      raise errors.OpExecError("ganeti password corruption detected")
1769
    f = open(constants.SSL_CERT_FILE)
1770
    try:
1771
      gntpem = f.read(8192)
1772
    finally:
1773
      f.close()
1774
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1775
    # so we use this to detect an invalid certificate; as long as the
1776
    # cert doesn't contain this, the here-document will be correctly
1777
    # parsed by the shell sequence below
1778
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1779
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1780
    if not gntpem.endswith("\n"):
1781
      raise errors.OpExecError("PEM must end with newline")
1782
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1783

    
1784
    # and then connect with ssh to set password and start ganeti-noded
1785
    # note that all the below variables are sanitized at this point,
1786
    # either by being constants or by the checks above
1787
    ss = self.sstore
1788
    mycommand = ("umask 077 && "
1789
                 "echo '%s' > '%s' && "
1790
                 "cat > '%s' << '!EOF.' && \n"
1791
                 "%s!EOF.\n%s restart" %
1792
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1793
                  constants.SSL_CERT_FILE, gntpem,
1794
                  constants.NODE_INITD_SCRIPT))
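    # the assembled command looks roughly like this (actual values elided):
    #   umask 077 && echo '<password>' > '<noded password file>' &&
    #   cat > '<SSL cert file>' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   <node init script> restart
    # i.e. it installs the node daemon password and the SSL certificate on
    # the new node and then restarts its node daemon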
1795

    
1796
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1797
    if result.failed:
1798
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1799
                               " output: %s" %
1800
                               (node, result.fail_reason, result.output))
1801

    
1802
    # check connectivity
1803
    time.sleep(4)
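    # the fixed 4-second sleep is presumably here to give the freshly
    # restarted node daemon time to start listening before the version
    # check below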
1804

    
1805
    result = rpc.call_version([node])[node]
1806
    if result:
1807
      if constants.PROTOCOL_VERSION == result:
1808
        logger.Info("communication to node %s fine, sw version %s match" %
1809
                    (node, result))
1810
      else:
1811
        raise errors.OpExecError("Version mismatch master version %s,"
1812
                                 " node version %s" %
1813
                                 (constants.PROTOCOL_VERSION, result))
1814
    else:
1815
      raise errors.OpExecError("Cannot get version from the new node")
1816

    
1817
    # setup ssh on node
1818
    logger.Info("copy ssh key to node %s" % node)
1819
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1820
    keyarray = []
1821
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1822
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1823
                priv_key, pub_key]
1824

    
1825
    for i in keyfiles:
1826
      f = open(i, 'r')
1827
      try:
1828
        keyarray.append(f.read())
1829
      finally:
1830
        f.close()
1831

    
1832
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1833
                               keyarray[3], keyarray[4], keyarray[5])
1834

    
1835
    if not result:
1836
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1837

    
1838
    # Add node to our /etc/hosts, and add key to known_hosts
1839
    utils.AddHostToEtcHosts(new_node.name)
1840

    
1841
    if new_node.secondary_ip != new_node.primary_ip:
1842
      if not rpc.call_node_tcp_ping(new_node.name,
1843
                                    constants.LOCALHOST_IP_ADDRESS,
1844
                                    new_node.secondary_ip,
1845
                                    constants.DEFAULT_NODED_PORT,
1846
                                    10, False):
1847
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1848
                                 " you gave (%s). Please fix and re-run this"
1849
                                 " command." % new_node.secondary_ip)
1850

    
1851
    success, msg = self.ssh.VerifyNodeHostname(node)
1852
    if not success:
1853
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1854
                               " than the one the resolver gives: %s."
1855
                               " Please fix and re-run this command." %
1856
                               (node, msg))
1857

    
1858
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1859
    # including the node just added
1860
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1861
    dist_nodes = self.cfg.GetNodeList()
1862
    if not self.op.readd:
1863
      dist_nodes.append(node)
1864
    if myself.name in dist_nodes:
1865
      dist_nodes.remove(myself.name)
1866

    
1867
    logger.Debug("Copying hosts and known_hosts to all nodes")
1868
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1869
      result = rpc.call_upload_file(dist_nodes, fname)
1870
      for to_node in dist_nodes:
1871
        if not result[to_node]:
1872
          logger.Error("copy of file %s to node %s failed" %
1873
                       (fname, to_node))
1874

    
1875
    to_copy = ss.GetFileList()
1876
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1877
      to_copy.append(constants.VNC_PASSWORD_FILE)
1878
    for fname in to_copy:
1879
      if not self.ssh.CopyFileToNode(node, fname):
1880
        logger.Error("could not copy file %s to node %s" % (fname, node))
1881

    
1882
    if not self.op.readd:
1883
      logger.Info("adding node %s to cluster.conf" % node)
1884
      self.cfg.AddNode(new_node)
1885

    
1886

    
1887
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))
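    # the order below matters: first the master role is stopped on the old
    # master, then the new master name is written to the simple store and
    # pushed to all nodes, and only then is the master role (and with it
    # the master IP) started on this node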

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file given by self.op.filename is copied to all requested nodes
    (self.nodes, as computed in CheckPrereq); the master itself is
    skipped.

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return a representation of the cluster config.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)
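    # the master is run last, presumably so that a disruptive command (for
    # example one that stops the node daemons or powers nodes off) reaches
    # all the other nodes before it can affect the node coordinating the run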

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are only
  logged; errors on any other node, or on the primary node when
  ignore_primary is false, make the function return failure.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory than requested, or we cannot
  get the information from the node, this function raises an
  OpPrereqError exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
2460
  """Reinstall an instance.
2461

2462
  """
2463
  HPATH = "instance-reinstall"
2464
  HTYPE = constants.HTYPE_INSTANCE
2465
  _OP_REQP = ["instance_name"]
2466

    
2467
  def BuildHooksEnv(self):
2468
    """Build hooks env.
2469

2470
    This runs on master, primary and secondary nodes of the instance.
2471

2472
    """
2473
    env = _BuildInstanceHookEnvByObject(self.instance)
2474
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2475
          list(self.instance.secondary_nodes))
2476
    return env, nl, nl
2477

    
2478
  def CheckPrereq(self):
2479
    """Check prerequisites.
2480

2481
    This checks that the instance is in the cluster and is not running.
2482

2483
    """
2484
    instance = self.cfg.GetInstanceInfo(
2485
      self.cfg.ExpandInstanceName(self.op.instance_name))
2486
    if instance is None:
2487
      raise errors.OpPrereqError("Instance '%s' not known" %
2488
                                 self.op.instance_name)
2489
    if instance.disk_template == constants.DT_DISKLESS:
2490
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2491
                                 self.op.instance_name)
2492
    if instance.status != "down":
2493
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2494
                                 self.op.instance_name)
2495
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2496
    if remote_info:
2497
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2498
                                 (self.op.instance_name,
2499
                                  instance.primary_node))
2500

    
2501
    self.op.os_type = getattr(self.op, "os_type", None)
2502
    if self.op.os_type is not None:
2503
      # OS verification
2504
      pnode = self.cfg.GetNodeInfo(
2505
        self.cfg.ExpandNodeName(instance.primary_node))
2506
      if pnode is None:
2507
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2508
                                   instance.primary_node)
2509
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2510
      if not os_obj:
2511
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2512
                                   " primary node"  % self.op.os_type)
2513

    
2514
    self.instance = instance
2515

    
2516
  def Exec(self, feedback_fn):
2517
    """Reinstall the instance.
2518

2519
    """
2520
    inst = self.instance
2521

    
2522
    if self.op.os_type is not None:
2523
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2524
      inst.os = self.op.os_type
2525
      self.cfg.AddInstance(inst)
2526

    
2527
    _StartInstanceDisks(self.cfg, inst, None)
2528
    try:
2529
      feedback_fn("Running the instance OS create scripts...")
2530
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2531
        raise errors.OpExecError("Could not install OS for instance %s"
2532
                                 " on node %s" %
2533
                                 (inst.name, inst.primary_node))
2534
    finally:
2535
      _ShutdownInstanceDisks(inst, self.cfg)
2536

    
2537

    
2538
class LURenameInstance(LogicalUnit):
2539
  """Rename an instance.
2540

2541
  """
2542
  HPATH = "instance-rename"
2543
  HTYPE = constants.HTYPE_INSTANCE
2544
  _OP_REQP = ["instance_name", "new_name"]
2545

    
2546
  def BuildHooksEnv(self):
2547
    """Build hooks env.
2548

2549
    This runs on master, primary and secondary nodes of the instance.
2550

2551
    """
2552
    env = _BuildInstanceHookEnvByObject(self.instance)
2553
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2554
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2555
          list(self.instance.secondary_nodes))
2556
    return env, nl, nl
2557

    
2558
  def CheckPrereq(self):
2559
    """Check prerequisites.
2560

2561
    This checks that the instance is in the cluster and is not running.
2562

2563
    """
2564
    instance = self.cfg.GetInstanceInfo(
2565
      self.cfg.ExpandInstanceName(self.op.instance_name))
2566
    if instance is None:
2567
      raise errors.OpPrereqError("Instance '%s' not known" %
2568
                                 self.op.instance_name)
2569
    if instance.status != "down":
2570
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2571
                                 self.op.instance_name)
2572
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2573
    if remote_info:
2574
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2575
                                 (self.op.instance_name,
2576
                                  instance.primary_node))
2577
    self.instance = instance
2578

    
2579
    # new name verification
2580
    name_info = utils.HostInfo(self.op.new_name)
2581

    
2582
    self.op.new_name = new_name = name_info.name
2583
    instance_list = self.cfg.GetInstanceList()
2584
    if new_name in instance_list:
2585
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2586
                                 new_name)
2587

    
2588
    if not getattr(self.op, "ignore_ip", False):
2589
      command = ["fping", "-q", name_info.ip]
2590
      result = utils.RunCmd(command)
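      # fping exits successfully only when the target address answers, so a
      # non-failed result here means the IP of the new name is already live
      # and the rename is refused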
2591
      if not result.failed:
2592
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2593
                                   (name_info.ip, new_name))
2594

    
2595

    
2596
  def Exec(self, feedback_fn):
2597
    """Reinstall the instance.
2598

2599
    """
2600
    inst = self.instance
2601
    old_name = inst.name
2602

    
2603
    if inst.disk_template == constants.DT_FILE:
2604
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2605

    
2606
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2607

    
2608
    # re-read the instance from the configuration after rename
2609
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2610

    
2611
    if inst.disk_template == constants.DT_FILE:
2612
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2613
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2614
                                                old_file_storage_dir,
2615
                                                new_file_storage_dir)
2616

    
2617
      if not result:
2618
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2619
                                 " directory '%s' to '%s' (but the instance"
2620
                                 " has been renamed in Ganeti)" % (
2621
                                 inst.primary_node, old_file_storage_dir,
2622
                                 new_file_storage_dir))
2623

    
2624
      if not result[0]:
2625
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2626
                                 " (but the instance has been renamed in"
2627
                                 " Ganeti)" % (old_file_storage_dir,
2628
                                               new_file_storage_dir))
2629

    
2630
    _StartInstanceDisks(self.cfg, inst, None)
2631
    try:
2632
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2633
                                          "sda", "sdb"):
2634
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2635
               " instance has been renamed in Ganeti)" %
2636
               (inst.name, inst.primary_node))
2637
        logger.Error(msg)
2638
    finally:
2639
      _ShutdownInstanceDisks(inst, self.cfg)
2640

    
2641

    
2642
class LURemoveInstance(LogicalUnit):
2643
  """Remove an instance.
2644

2645
  """
2646
  HPATH = "instance-remove"
2647
  HTYPE = constants.HTYPE_INSTANCE
2648
  _OP_REQP = ["instance_name", "ignore_failures"]
2649

    
2650
  def BuildHooksEnv(self):
2651
    """Build hooks env.
2652

2653
    This runs on master, primary and secondary nodes of the instance.
2654

2655
    """
2656
    env = _BuildInstanceHookEnvByObject(self.instance)
2657
    nl = [self.sstore.GetMasterNode()]
2658
    return env, nl, nl
2659

    
2660
  def CheckPrereq(self):
2661
    """Check prerequisites.
2662

2663
    This checks that the instance is in the cluster.
2664

2665
    """
2666
    instance = self.cfg.GetInstanceInfo(
2667
      self.cfg.ExpandInstanceName(self.op.instance_name))
2668
    if instance is None:
2669
      raise errors.OpPrereqError("Instance '%s' not known" %
2670
                                 self.op.instance_name)
2671
    self.instance = instance
2672

    
2673
  def Exec(self, feedback_fn):
2674
    """Remove the instance.
2675

2676
    """
2677
    instance = self.instance
2678
    logger.Info("shutting down instance %s on node %s" %
2679
                (instance.name, instance.primary_node))
2680

    
2681
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2682
      if self.op.ignore_failures:
2683
        feedback_fn("Warning: can't shutdown instance")
2684
      else:
2685
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2686
                                 (instance.name, instance.primary_node))
2687

    
2688
    logger.Info("removing block devices for instance %s" % instance.name)
2689

    
2690
    if not _RemoveDisks(instance, self.cfg):
2691
      if self.op.ignore_failures:
2692
        feedback_fn("Warning: can't remove instance's disks")
2693
      else:
2694
        raise errors.OpExecError("Can't remove instance's disks")
2695

    
2696
    logger.Info("removing instance %s out of cluster config" % instance.name)
2697

    
2698
    self.cfg.RemoveInstance(instance.name)
2699

    
2700

    
2701
class LUQueryInstances(NoHooksLU):
2702
  """Logical unit for querying instances.
2703

2704
  """
2705
  _OP_REQP = ["output_fields", "names"]
2706

    
2707
  def CheckPrereq(self):
2708
    """Check prerequisites.
2709

2710
    This checks that the fields required are valid output fields.
2711

2712
    """
2713
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2714
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2715
                               "admin_state", "admin_ram",
2716
                               "disk_template", "ip", "mac", "bridge",
2717
                               "sda_size", "sdb_size", "vcpus"],
2718
                       dynamic=self.dynamic_fields,
2719
                       selected=self.op.output_fields)
2720

    
2721
    self.wanted = _GetWantedInstances(self, self.op.names)
2722

    
2723
  def Exec(self, feedback_fn):
2724
    """Computes the list of nodes and their attributes.
2725

2726
    """
2727
    instance_names = self.wanted
2728
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2729
                     in instance_names]
2730

    
2731
    # begin data gathering
2732

    
2733
    nodes = frozenset([inst.primary_node for inst in instance_list])
2734

    
2735
    bad_nodes = []
2736
    if self.dynamic_fields.intersection(self.op.output_fields):
2737
      live_data = {}
2738
      node_data = rpc.call_all_instances_info(nodes)
2739
      for name in nodes:
2740
        result = node_data[name]
2741
        if result:
2742
          live_data.update(result)
2743
        elif result == False:
2744
          bad_nodes.append(name)
2745
        # else no instance is alive
2746
    else:
2747
      live_data = dict([(name, {}) for name in instance_names])
2748

    
2749
    # end data gathering
2750

    
2751
    output = []
2752
    for instance in instance_list:
2753
      iout = []
2754
      for field in self.op.output_fields:
2755
        if field == "name":
2756
          val = instance.name
2757
        elif field == "os":
2758
          val = instance.os
2759
        elif field == "pnode":
2760
          val = instance.primary_node
2761
        elif field == "snodes":
2762
          val = list(instance.secondary_nodes)
2763
        elif field == "admin_state":
2764
          val = (instance.status != "down")
2765
        elif field == "oper_state":
2766
          if instance.primary_node in bad_nodes:
2767
            val = None
2768
          else:
2769
            val = bool(live_data.get(instance.name))
2770
        elif field == "status":
2771
          if instance.primary_node in bad_nodes:
2772
            val = "ERROR_nodedown"
2773
          else:
2774
            running = bool(live_data.get(instance.name))
2775
            if running:
2776
              if instance.status != "down":
2777
                val = "running"
2778
              else:
2779
                val = "ERROR_up"
2780
            else:
2781
              if instance.status != "down":
2782
                val = "ERROR_down"
2783
              else:
2784
                val = "ADMIN_down"
2785
        elif field == "admin_ram":
2786
          val = instance.memory
2787
        elif field == "oper_ram":
2788
          if instance.primary_node in bad_nodes:
2789
            val = None
2790
          elif instance.name in live_data:
2791
            val = live_data[instance.name].get("memory", "?")
2792
          else:
2793
            val = "-"
2794
        elif field == "disk_template":
2795
          val = instance.disk_template
2796
        elif field == "ip":
2797
          val = instance.nics[0].ip
2798
        elif field == "bridge":
2799
          val = instance.nics[0].bridge
2800
        elif field == "mac":
2801
          val = instance.nics[0].mac
2802
        elif field == "sda_size" or field == "sdb_size":
2803
          disk = instance.FindDisk(field[:3])
2804
          if disk is None:
2805
            val = None
2806
          else:
2807
            val = disk.size
2808
        elif field == "vcpus":
2809
          val = instance.vcpus
2810
        else:
2811
          raise errors.ParameterError(field)
2812
        iout.append(val)
2813
      output.append(iout)
2814

    
2815
    return output
2816

    
2817

    
2818
class LUFailoverInstance(LogicalUnit):
2819
  """Failover an instance.
2820

2821
  """
2822
  HPATH = "instance-failover"
2823
  HTYPE = constants.HTYPE_INSTANCE
2824
  _OP_REQP = ["instance_name", "ignore_consistency"]
2825

    
2826
  def BuildHooksEnv(self):
2827
    """Build hooks env.
2828

2829
    This runs on master, primary and secondary nodes of the instance.
2830

2831
    """
2832
    env = {
2833
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2834
      }
2835
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2836
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2837
    return env, nl, nl
2838

    
2839
  def CheckPrereq(self):
2840
    """Check prerequisites.
2841

2842
    This checks that the instance is in the cluster.
2843

2844
    """
2845
    instance = self.cfg.GetInstanceInfo(
2846
      self.cfg.ExpandInstanceName(self.op.instance_name))
2847
    if instance is None:
2848
      raise errors.OpPrereqError("Instance '%s' not known" %
2849
                                 self.op.instance_name)
2850

    
2851
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2852
      raise errors.OpPrereqError("Instance's disk layout is not"
2853
                                 " network mirrored, cannot failover.")
2854

    
2855
    secondary_nodes = instance.secondary_nodes
2856
    if not secondary_nodes:
2857
      raise errors.ProgrammerError("no secondary node but using "
2858
                                   "a mirrored disk template")
2859

    
2860
    target_node = secondary_nodes[0]
2861
    # check memory requirements on the secondary node
2862
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2863
                         instance.name, instance.memory)
2864

    
2865
    # check bridge existence
2866
    brlist = [nic.bridge for nic in instance.nics]
2867
    if not rpc.call_bridges_exist(target_node, brlist):
2868
      raise errors.OpPrereqError("One or more target bridges %s does not"
2869
                                 " exist on destination node '%s'" %
2870
                                 (brlist, target_node))
2871

    
2872
    self.instance = instance
2873

    
2874
  def Exec(self, feedback_fn):
2875
    """Failover an instance.
2876

2877
    The failover is done by shutting it down on its present node and
2878
    starting it on the secondary.
2879

2880
    """
2881
    instance = self.instance
2882

    
2883
    source_node = instance.primary_node
2884
    target_node = instance.secondary_nodes[0]
2885

    
2886
    feedback_fn("* checking disk consistency between source and target")
2887
    for dev in instance.disks:
2888
      # for drbd, these are drbd over lvm
2889
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2890
        if instance.status == "up" and not self.op.ignore_consistency:
2891
          raise errors.OpExecError("Disk %s is degraded on target node,"
2892
                                   " aborting failover." % dev.iv_name)
2893

    
2894
    feedback_fn("* shutting down instance on source node")
2895
    logger.Info("Shutting down instance %s on node %s" %
2896
                (instance.name, source_node))
2897

    
2898
    if not rpc.call_instance_shutdown(source_node, instance):
2899
      if self.op.ignore_consistency:
2900
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2901
                     " anyway. Please make sure node %s is down"  %
2902
                     (instance.name, source_node, source_node))
2903
      else:
2904
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2905
                                 (instance.name, source_node))
2906

    
2907
    feedback_fn("* deactivating the instance's disks on source node")
2908
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2909
      raise errors.OpExecError("Can't shut down the instance's disks.")
2910

    
2911
    instance.primary_node = target_node
2912
    # distribute new instance config to the other nodes
2913
    self.cfg.AddInstance(instance)
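    # cfg.AddInstance here rewrites the existing instance entry with its
    # new primary node, which (as noted above) also distributes the
    # updated configuration to the other nodes before the disks are
    # re-assembled on the target node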
2914

    
2915
    # Only start the instance if it's marked as up
2916
    if instance.status == "up":
2917
      feedback_fn("* activating the instance's disks on target node")
2918
      logger.Info("Starting instance %s on node %s" %
2919
                  (instance.name, target_node))
2920

    
2921
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2922
                                               ignore_secondaries=True)
2923
      if not disks_ok:
2924
        _ShutdownInstanceDisks(instance, self.cfg)
2925
        raise errors.OpExecError("Can't activate the instance's disks")
2926

    
2927
      feedback_fn("* starting the instance on the target node")
2928
      if not rpc.call_instance_start(target_node, instance, None):
2929
        _ShutdownInstanceDisks(instance, self.cfg)
2930
        raise errors.OpExecError("Could not start instance %s on node %s." %
2931
                                 (instance.name, target_node))
2932

    
2933

    
2934
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2935
  """Create a tree of block devices on the primary node.
2936

2937
  This always creates all devices.
2938

2939
  """
2940
  if device.children:
2941
    for child in device.children:
2942
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2943
        return False
2944

    
2945
  cfg.SetDiskID(device, node)
2946
  new_id = rpc.call_blockdev_create(node, device, device.size,
2947
                                    instance.name, True, info)
2948
  if not new_id:
2949
    return False
2950
  if device.physical_id is None:
2951
    device.physical_id = new_id
2952
  return True
2953

    
2954

    
2955
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2956
  """Create a tree of block devices on a secondary node.
2957

2958
  If this device type has to be created on secondaries, create it and
2959
  all its children.
2960

2961
  If not, just recurse to children keeping the same 'force' value.
2962

2963
  """
2964
  if device.CreateOnSecondary():
2965
    force = True
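  # once a device type has to be created on the secondary node, all of its
  # children are created there as well: force stays True for the recursion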
2966
  if device.children:
2967
    for child in device.children:
2968
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2969
                                        child, force, info):
2970
        return False
2971

    
2972
  if not force:
2973
    return True
2974
  cfg.SetDiskID(device, node)
2975
  new_id = rpc.call_blockdev_create(node, device, device.size,
2976
                                    instance.name, False, info)
2977
  if not new_id:
2978
    return False
2979
  if device.physical_id is None:
2980
    device.physical_id = new_id
2981
  return True
2982

    
2983

    
2984
def _GenerateUniqueNames(cfg, exts):
2985
  """Generate a suitable LV name.
2986

2987
  This will generate a logical volume name for the given instance.
2988

2989
  """
2990
  results = []
2991
  for val in exts:
2992
    new_id = cfg.GenerateUniqueID()
2993
    results.append("%s%s" % (new_id, val))
2994
  return results
2995

    
2996

    
2997
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2998
  """Generate a drbd device complete with its children.
2999

3000
  """
3001
  port = cfg.AllocatePort()
3002
  vgname = cfg.GetVGName()
3003
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3004
                          logical_id=(vgname, names[0]))
3005
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3006
                          logical_id=(vgname, names[1]))
3007
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3008
                          logical_id = (primary, secondary, port),
3009
                          children = [dev_data, dev_meta])
3010
  return drbd_dev
3011

    
3012

    
3013
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3014
  """Generate a drbd8 device complete with its children.
3015

3016
  """
3017
  port = cfg.AllocatePort()
3018
  vgname = cfg.GetVGName()
3019
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3020
                          logical_id=(vgname, names[0]))
3021
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3022
                          logical_id=(vgname, names[1]))
3023
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3024
                          logical_id = (primary, secondary, port),
3025
                          children = [dev_data, dev_meta],
3026
                          iv_name=iv_name)
3027
  return drbd_dev
3028

    
3029
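# The object tree returned by one _GenerateDRBD8Branch() call looks like
# this (sizes in MB; the fixed 128 MB meta LV is what the 256 MB overhead in
# _ComputeDiskSize() below accounts for):
#
#   LD_DRBD8  <size>, logical_id=(primary, secondary, port), iv_name
#     +- LD_LV  data,  <size> MB, logical_id=(vgname, names[0])
#     +- LD_LV  meta,     128 MB, logical_id=(vgname, names[1])
#
# Illustrative usage (commented out; hypothetical nodes and size):
#
#   names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
#   sda = _GenerateDRBD8Branch(cfg, "node1", "node2", 10240, names, "sda")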

    
3030
def _GenerateDiskTemplate(cfg, template_name,
3031
                          instance_name, primary_node,
3032
                          secondary_nodes, disk_sz, swap_sz,
3033
                          file_storage_dir, file_driver):
3034
  """Generate the entire disk layout for a given template type.
3035

3036
  """
3037
  #TODO: compute space requirements
3038

    
3039
  vgname = cfg.GetVGName()
3040
  if template_name == constants.DT_DISKLESS:
3041
    disks = []
3042
  elif template_name == constants.DT_PLAIN:
3043
    if len(secondary_nodes) != 0:
3044
      raise errors.ProgrammerError("Wrong template configuration")
3045

    
3046
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3047
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3048
                           logical_id=(vgname, names[0]),
3049
                           iv_name = "sda")
3050
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3051
                           logical_id=(vgname, names[1]),
3052
                           iv_name = "sdb")
3053
    disks = [sda_dev, sdb_dev]
3054
  elif template_name == constants.DT_DRBD8:
3055
    if len(secondary_nodes) != 1:
3056
      raise errors.ProgrammerError("Wrong template configuration")
3057
    remote_node = secondary_nodes[0]
3058
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3059
                                       ".sdb_data", ".sdb_meta"])
3060
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3061
                                         disk_sz, names[0:2], "sda")
3062
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3063
                                         swap_sz, names[2:4], "sdb")
3064
    disks = [drbd_sda_dev, drbd_sdb_dev]
3065
  elif template_name == constants.DT_FILE:
3066
    if len(secondary_nodes) != 0:
3067
      raise errors.ProgrammerError("Wrong template configuration")
3068

    
3069
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3070
                                iv_name="sda", logical_id=(file_driver,
3071
                                "%s/sda" % file_storage_dir))
3072
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3073
                                iv_name="sdb", logical_id=(file_driver,
3074
                                "%s/sdb" % file_storage_dir))
3075
    disks = [file_sda_dev, file_sdb_dev]
3076
  else:
3077
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3078
  return disks
3079

    
3080
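# Illustrative sketch of what _GenerateDiskTemplate() returns (commented
# out; hypothetical names and sizes, assuming a configured ``cfg``):
#
#   # plain: two LVs living on the primary node only
#   disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN, "inst1", "node1",
#                                 [], 10240, 4096, "", None)
#   # -> [10240 MB "sda" LV, 4096 MB "sdb" LV]
#
#   # drbd8: two DRBD trees mirrored between node1 and node2
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8, "inst1", "node1",
#                                 ["node2"], 10240, 4096, "", None)
#   # -> ["sda" and "sdb" DRBD8 devices, each with data+meta LV children]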

    
3081
def _GetInstanceInfoText(instance):
3082
  """Compute that text that should be added to the disk's metadata.
3083

3084
  """
3085
  return "originstname+%s" % instance.name
3086

    
3087

    
3088
def _CreateDisks(cfg, instance):
3089
  """Create all disks for an instance.
3090

3091
  This abstracts away some work from AddInstance.
3092

3093
  Args:
3094
    instance: the instance object
3095

3096
  Returns:
3097
    True or False showing the success of the creation process
3098

3099
  """
3100
  info = _GetInstanceInfoText(instance)
3101

    
3102
  if instance.disk_template == constants.DT_FILE:
3103
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3104
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3105
                                              file_storage_dir)
3106

    
3107
    if not result:
3108
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3109
      return False
3110

    
3111
    if not result[0]:
3112
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3113
      return False
3114

    
3115
  for device in instance.disks:
3116
    logger.Info("creating volume %s for instance %s" %
3117
                (device.iv_name, instance.name))
3118
    #HARDCODE
3119
    for secondary_node in instance.secondary_nodes:
3120
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3121
                                        device, False, info):
3122
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3123
                     (device.iv_name, device, secondary_node))
3124
        return False
3125
    #HARDCODE
3126
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3127
                                    instance, device, info):
3128
      logger.Error("failed to create volume %s on primary!" %
3129
                   device.iv_name)
3130
      return False
3131

    
3132
  return True
3133

    
3134

    
3135
def _RemoveDisks(instance, cfg):
3136
  """Remove all disks for an instance.
3137

3138
  This abstracts away some work from `AddInstance()` and
3139
  `RemoveInstance()`. Note that in case some of the devices couldn't
3140
  be removed, the removal will continue with the other ones (compare
3141
  with `_CreateDisks()`).
3142

3143
  Args:
3144
    instance: the instance object
3145

3146
  Returns:
3147
    True or False showing the success of the removal process
3148

3149
  """
3150
  logger.Info("removing block devices for instance %s" % instance.name)
3151

    
3152
  result = True
3153
  for device in instance.disks:
3154
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3155
      cfg.SetDiskID(disk, node)
3156
      if not rpc.call_blockdev_remove(node, disk):
3157
        logger.Error("could not remove block device %s on node %s,"
3158
                     " continuing anyway" %
3159
                     (device.iv_name, node))
3160
        result = False
3161

    
3162
  if instance.disk_template == constants.DT_FILE:
3163
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3164
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3165
                                            file_storage_dir):
3166
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3167
      result = False
3168

    
3169
  return result
3170

    
3171
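# Note that _CreateDisks() and _RemoveDisks() return booleans instead of
# raising: creation stops at the first failure, while removal is best-effort
# and keeps going so that as much storage as possible is freed.  Callers are
# expected to pair them for rollback, e.g. (the pattern used by
# LUCreateInstance.Exec below):
#
#   if not _CreateDisks(cfg, instance):
#     _RemoveDisks(instance, cfg)
#     raise errors.OpExecError("Device creation failed, reverting...")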

    
3172
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3173
  """Compute disk size requirements in the volume group
3174

3175
  This is currently hard-coded for the two-drive layout.
3176

3177
  """
3178
  # Required free disk space as a function of disk and swap space
3179
  req_size_dict = {
3180
    constants.DT_DISKLESS: None,
3181
    constants.DT_PLAIN: disk_size + swap_size,
3182
    # 256 MB are added for drbd metadata, 128MB for each drbd device
3183
    constants.DT_DRBD8: disk_size + swap_size + 256,
3184
    constants.DT_FILE: None,
3185
  }
3186

    
3187
  if disk_template not in req_size_dict:
3188
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3189
                                 " is unknown" %  disk_template)
3190

    
3191
  return req_size_dict[disk_template]
3192

    
3193
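# Worked example (hypothetical sizes): for a drbd8 instance with a 10240 MB
# data disk and a 4096 MB swap disk, each involved node's volume group must
# have
#
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)
#   # -> 10240 + 4096 + 256 = 14592 MB
#
# of free space (the 256 MB covers the two fixed 128 MB DRBD metadata LVs),
# while diskless and file-based instances need no LVM space at all (None).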

    
3194
class LUCreateInstance(LogicalUnit):
3195
  """Create an instance.
3196

3197
  """
3198
  HPATH = "instance-add"
3199
  HTYPE = constants.HTYPE_INSTANCE
3200
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3201
              "disk_template", "swap_size", "mode", "start", "vcpus",
3202
              "wait_for_sync", "ip_check", "mac"]
3203

    
3204
  def _RunAllocator(self):
3205
    """Run the allocator based on input opcode.
3206

3207
    """
3208
    disks = [{"size": self.op.disk_size, "mode": "w"},
3209
             {"size": self.op.swap_size, "mode": "w"}]
3210
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3211
             "bridge": self.op.bridge}]
3212
    ial = IAllocator(self.cfg, self.sstore,
3213
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3214
                     name=self.op.instance_name,
3215
                     disk_template=self.op.disk_template,
3216
                     tags=[],
3217
                     os=self.op.os_type,
3218
                     vcpus=self.op.vcpus,
3219
                     mem_size=self.op.mem_size,
3220
                     disks=disks,
3221
                     nics=nics,
3222
                     )
3223

    
3224
    ial.Run(self.op.iallocator)
3225

    
3226
    if not ial.success:
3227
      raise errors.OpPrereqError("Can't compute nodes using"
3228
                                 " iallocator '%s': %s" % (self.op.iallocator,
3229
                                                           ial.info))
3230
    if len(ial.nodes) != ial.required_nodes:
3231
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3232
                                 " of nodes (%s), required %s" %
3233
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3234
    self.op.pnode = ial.nodes[0]
3235
    logger.ToStdout("Selected nodes for the instance: %s" %
3236
                    (", ".join(ial.nodes),))
3237
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3238
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3239
    if ial.required_nodes == 2:
3240
      self.op.snode = ial.nodes[1]
3241
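  # After a successful run the allocator reply is consumed as follows: for
  # non-mirrored templates ial.required_nodes is 1 and only the primary node
  # is taken from ial.nodes, while for drbd8 it is 2 and the second entry
  # becomes the secondary.  Sketch of a reply (hypothetical node names):
  #
  #   ial.success         # -> True
  #   ial.required_nodes  # -> 2 for a drbd8 instance
  #   ial.nodes           # -> ["node1.example.com", "node2.example.com"]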

    
3242
  def BuildHooksEnv(self):
3243
    """Build hooks env.
3244

3245
    This runs on master, primary and secondary nodes of the instance.
3246

3247
    """
3248
    env = {
3249
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3250
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3251
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3252
      "INSTANCE_ADD_MODE": self.op.mode,
3253
      }
3254
    if self.op.mode == constants.INSTANCE_IMPORT:
3255
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3256
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3257
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3258

    
3259
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3260
      primary_node=self.op.pnode,
3261
      secondary_nodes=self.secondaries,
3262
      status=self.instance_status,
3263
      os_type=self.op.os_type,
3264
      memory=self.op.mem_size,
3265
      vcpus=self.op.vcpus,
3266
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3267
    ))
3268

    
3269
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3270
          self.secondaries)
3271
    return env, nl, nl
3272

    
3273

    
3274
  def CheckPrereq(self):
3275
    """Check prerequisites.
3276

3277
    """
3278
    # set optional parameters to none if they don't exist
3279
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3280
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3281
                 "vnc_bind_address"]:
3282
      if not hasattr(self.op, attr):
3283
        setattr(self.op, attr, None)
3284

    
3285
    if self.op.mode not in (constants.INSTANCE_CREATE,
3286
                            constants.INSTANCE_IMPORT):
3287
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3288
                                 self.op.mode)
3289

    
3290
    if (not self.cfg.GetVGName() and
3291
        self.op.disk_template not in constants.DTS_NOT_LVM):
3292
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3293
                                 " instances")
3294

    
3295
    if self.op.mode == constants.INSTANCE_IMPORT:
3296
      src_node = getattr(self.op, "src_node", None)
3297
      src_path = getattr(self.op, "src_path", None)
3298
      if src_node is None or src_path is None:
3299
        raise errors.OpPrereqError("Importing an instance requires source"
3300
                                   " node and path options")
3301
      src_node_full = self.cfg.ExpandNodeName(src_node)
3302
      if src_node_full is None:
3303
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3304
      self.op.src_node = src_node = src_node_full
3305

    
3306
      if not os.path.isabs(src_path):
3307
        raise errors.OpPrereqError("The source path must be absolute")
3308

    
3309
      export_info = rpc.call_export_info(src_node, src_path)
3310

    
3311
      if not export_info:
3312
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3313

    
3314
      if not export_info.has_section(constants.INISECT_EXP):
3315
        raise errors.ProgrammerError("Corrupted export config")
3316

    
3317
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3318
      if (int(ei_version) != constants.EXPORT_VERSION):
3319
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3320
                                   (ei_version, constants.EXPORT_VERSION))
3321

    
3322
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3323
        raise errors.OpPrereqError("Can't import instance with more than"
3324
                                   " one data disk")
3325

    
3326
      # FIXME: are the old os-es, disk sizes, etc. useful?
3327
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3328
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3329
                                                         'disk0_dump'))
3330
      self.src_image = diskimage
3331
    else: # INSTANCE_CREATE
3332
      if getattr(self.op, "os_type", None) is None:
3333
        raise errors.OpPrereqError("No guest OS specified")
3334

    
3335
    #### instance parameters check
3336

    
3337
    # disk template and mirror node verification
3338
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3339
      raise errors.OpPrereqError("Invalid disk template name")
3340

    
3341
    # instance name verification
3342
    hostname1 = utils.HostInfo(self.op.instance_name)
3343

    
3344
    self.op.instance_name = instance_name = hostname1.name
3345
    instance_list = self.cfg.GetInstanceList()
3346
    if instance_name in instance_list:
3347
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3348
                                 instance_name)
3349

    
3350
    # ip validity checks
3351
    ip = getattr(self.op, "ip", None)
3352
    if ip is None or ip.lower() == "none":
3353
      inst_ip = None
3354
    elif ip.lower() == "auto":
3355
      inst_ip = hostname1.ip
3356
    else:
3357
      if not utils.IsValidIP(ip):
3358
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3359
                                   " like a valid IP" % ip)
3360
      inst_ip = ip
3361
    self.inst_ip = self.op.ip = inst_ip
3362

    
3363
    if self.op.start and not self.op.ip_check:
3364
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3365
                                 " adding an instance in start mode")
3366

    
3367
    if self.op.ip_check:
3368
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3369
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3370
                                   (hostname1.ip, instance_name))
3371

    
3372
    # MAC address verification
3373
    if self.op.mac != "auto":
3374
      if not utils.IsValidMac(self.op.mac.lower()):
3375
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3376
                                   self.op.mac)
3377

    
3378
    # bridge verification
3379
    bridge = getattr(self.op, "bridge", None)
3380
    if bridge is None:
3381
      self.op.bridge = self.cfg.GetDefBridge()
3382
    else:
3383
      self.op.bridge = bridge
3384

    
3385
    # boot order verification
3386
    if self.op.hvm_boot_order is not None:
3387
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3388
        raise errors.OpPrereqError("invalid boot order specified,"
3389
                                   " must be one or more of [acdn]")
3390
    # file storage checks
3391
    if (self.op.file_driver and
3392
        not self.op.file_driver in constants.FILE_DRIVER):
3393
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3394
                                 self.op.file_driver)
3395

    
3396
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3397
      raise errors.OpPrereqError("File storage directory not a relative"
3398
                                 " path")
3399
    #### allocator run
3400

    
3401
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3402
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3403
                                 " node must be given")
3404

    
3405
    if self.op.iallocator is not None:
3406
      self._RunAllocator()
3407

    
3408
    #### node related checks
3409

    
3410
    # check primary node
3411
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3412
    if pnode is None:
3413
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3414
                                 self.op.pnode)
3415
    self.op.pnode = pnode.name
3416
    self.pnode = pnode
3417
    self.secondaries = []
3418

    
3419
    # mirror node verification
3420
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3421
      if getattr(self.op, "snode", None) is None:
3422
        raise errors.OpPrereqError("The networked disk templates need"
3423
                                   " a mirror node")
3424

    
3425
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3426
      if snode_name is None:
3427
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3428
                                   self.op.snode)
3429
      elif snode_name == pnode.name:
3430
        raise errors.OpPrereqError("The secondary node cannot be"
3431
                                   " the primary node.")
3432
      self.secondaries.append(snode_name)
3433

    
3434
    req_size = _ComputeDiskSize(self.op.disk_template,
3435
                                self.op.disk_size, self.op.swap_size)
3436

    
3437
    # Check lv size requirements
3438
    if req_size is not None:
3439
      nodenames = [pnode.name] + self.secondaries
3440
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3441
      for node in nodenames:
3442
        info = nodeinfo.get(node, None)
3443
        if not info:
3444
          raise errors.OpPrereqError("Cannot get current information"
3445
                                     " from node '%s'" % nodeinfo)
3446
        vg_free = info.get('vg_free', None)
3447
        if not isinstance(vg_free, int):
3448
          raise errors.OpPrereqError("Can't compute free disk space on"
3449
                                     " node %s" % node)
3450
        if req_size > info['vg_free']:
3451
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3452
                                     " %d MB available, %d MB required" %
3453
                                     (node, info['vg_free'], req_size))
3454

    
3455
    # os verification
3456
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3457
    if not os_obj:
3458
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3459
                                 " primary node"  % self.op.os_type)
3460

    
3461
    if self.op.kernel_path == constants.VALUE_NONE:
3462
      raise errors.OpPrereqError("Can't set instance kernel to none")
3463

    
3464

    
3465
    # bridge check on primary node
3466
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3467
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3468
                                 " destination node '%s'" %
3469
                                 (self.op.bridge, pnode.name))
3470

    
3471
    # memory check on primary node
3472
    if self.op.start:
3473
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3474
                           "creating instance %s" % self.op.instance_name,
3475
                           self.op.mem_size)
3476

    
3477
    # hvm_cdrom_image_path verification
3478
    if self.op.hvm_cdrom_image_path is not None:
3479
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3480
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3481
                                   " be an absolute path or None, not %s" %
3482
                                   self.op.hvm_cdrom_image_path)
3483
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3484
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3485
                                   " regular file or a symlink pointing to"
3486
                                   " an existing regular file, not %s" %
3487
                                   self.op.hvm_cdrom_image_path)
3488

    
3489
    # vnc_bind_address verification
3490
    if self.op.vnc_bind_address is not None:
3491
      if not utils.IsValidIP(self.op.vnc_bind_address):
3492
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3493
                                   " like a valid IP address" %
3494
                                   self.op.vnc_bind_address)
3495

    
3496
    if self.op.start:
3497
      self.instance_status = 'up'
3498
    else:
3499
      self.instance_status = 'down'
3500

    
3501
  def Exec(self, feedback_fn):
3502
    """Create and add the instance to the cluster.
3503

3504
    """
3505
    instance = self.op.instance_name
3506
    pnode_name = self.pnode.name
3507

    
3508
    if self.op.mac == "auto":
3509
      mac_address = self.cfg.GenerateMAC()
3510
    else:
3511
      mac_address = self.op.mac
3512

    
3513
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3514
    if self.inst_ip is not None:
3515
      nic.ip = self.inst_ip
3516

    
3517
    ht_kind = self.sstore.GetHypervisorType()
3518
    if ht_kind in constants.HTS_REQ_PORT:
3519
      network_port = self.cfg.AllocatePort()
3520
    else:
3521
      network_port = None
3522

    
3523
    if self.op.vnc_bind_address is None:
3524
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3525

    
3526
    # this is needed because os.path.join does not accept None arguments
3527
    if self.op.file_storage_dir is None:
3528
      string_file_storage_dir = ""
3529
    else:
3530
      string_file_storage_dir = self.op.file_storage_dir
3531

    
3532
    # build the full file storage dir path
3533
    file_storage_dir = os.path.normpath(os.path.join(
3534
                                        self.sstore.GetFileStorageDir(),
3535
                                        string_file_storage_dir, instance))
3536

    
3537

    
3538
    disks = _GenerateDiskTemplate(self.cfg,
3539
                                  self.op.disk_template,
3540
                                  instance, pnode_name,
3541
                                  self.secondaries, self.op.disk_size,
3542
                                  self.op.swap_size,
3543
                                  file_storage_dir,
3544
                                  self.op.file_driver)
3545

    
3546
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3547
                            primary_node=pnode_name,
3548
                            memory=self.op.mem_size,
3549
                            vcpus=self.op.vcpus,
3550
                            nics=[nic], disks=disks,
3551
                            disk_template=self.op.disk_template,
3552
                            status=self.instance_status,
3553
                            network_port=network_port,
3554
                            kernel_path=self.op.kernel_path,
3555
                            initrd_path=self.op.initrd_path,
3556
                            hvm_boot_order=self.op.hvm_boot_order,
3557
                            hvm_acpi=self.op.hvm_acpi,
3558
                            hvm_pae=self.op.hvm_pae,
3559
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3560
                            vnc_bind_address=self.op.vnc_bind_address,
3561
                            )
3562

    
3563
    feedback_fn("* creating instance disks...")
3564
    if not _CreateDisks(self.cfg, iobj):
3565
      _RemoveDisks(iobj, self.cfg)
3566
      raise errors.OpExecError("Device creation failed, reverting...")
3567

    
3568
    feedback_fn("adding instance %s to cluster config" % instance)
3569

    
3570
    self.cfg.AddInstance(iobj)
3571

    
3572
    if self.op.wait_for_sync:
3573
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3574
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3575
      # make sure the disks are not degraded (still sync-ing is ok)
3576
      time.sleep(15)
3577
      feedback_fn("* checking mirrors status")
3578
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3579
    else:
3580
      disk_abort = False
3581

    
3582
    if disk_abort:
3583
      _RemoveDisks(iobj, self.cfg)
3584
      self.cfg.RemoveInstance(iobj.name)
3585
      raise errors.OpExecError("There are some degraded disks for"
3586
                               " this instance")
3587

    
3588
    feedback_fn("creating os for instance %s on node %s" %
3589
                (instance, pnode_name))
3590

    
3591
    if iobj.disk_template != constants.DT_DISKLESS:
3592
      if self.op.mode == constants.INSTANCE_CREATE:
3593
        feedback_fn("* running the instance OS create scripts...")
3594
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3595
          raise errors.OpExecError("could not add os for instance %s"
3596
                                   " on node %s" %
3597
                                   (instance, pnode_name))
3598

    
3599
      elif self.op.mode == constants.INSTANCE_IMPORT:
3600
        feedback_fn("* running the instance OS import scripts...")
3601
        src_node = self.op.src_node
3602
        src_image = self.src_image
3603
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3604
                                                src_node, src_image):
3605
          raise errors.OpExecError("Could not import os for instance"
3606
                                   " %s on node %s" %
3607
                                   (instance, pnode_name))
3608
      else:
3609
        # also checked in the prereq part
3610
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3611
                                     % self.op.mode)
3612

    
3613
    if self.op.start:
3614
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3615
      feedback_fn("* starting instance...")
3616
      if not rpc.call_instance_start(pnode_name, iobj, None):
3617
        raise errors.OpExecError("Could not start instance")
3618

    
3619
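# Two details of the Exec() flow above are easy to miss: the instance is
# added to the configuration *before* its disks are known to be healthy, so
# a failed sync has to remove both the disks and the configuration entry
# again; and even when wait_for_sync is not requested, mirrored templates
# still get a short sleep plus a one-shot _WaitForSync() pass, purely to
# verify that no mirror is degraded before the OS scripts are run.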

    
3620
class LUConnectConsole(NoHooksLU):
3621
  """Connect to an instance's console.
3622

3623
  This is somewhat special in that it returns the command line that
3624
  you need to run on the master node in order to connect to the
3625
  console.
3626

3627
  """
3628
  _OP_REQP = ["instance_name"]
3629

    
3630
  def CheckPrereq(self):
3631
    """Check prerequisites.
3632

3633
    This checks that the instance is in the cluster.
3634

3635
    """
3636
    instance = self.cfg.GetInstanceInfo(
3637
      self.cfg.ExpandInstanceName(self.op.instance_name))
3638
    if instance is None:
3639
      raise errors.OpPrereqError("Instance '%s' not known" %
3640
                                 self.op.instance_name)
3641
    self.instance = instance
3642

    
3643
  def Exec(self, feedback_fn):
3644
    """Connect to the console of an instance
3645

3646
    """
3647
    instance = self.instance
3648
    node = instance.primary_node
3649

    
3650
    node_insts = rpc.call_instance_list([node])[node]
3651
    if node_insts is False:
3652
      raise errors.OpExecError("Can't connect to node %s." % node)
3653

    
3654
    if instance.name not in node_insts:
3655
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3656

    
3657
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3658

    
3659
    hyper = hypervisor.GetHypervisor()
3660
    console_cmd = hyper.GetShellCommandForConsole(instance)
3661

    
3662
    # build ssh cmdline
3663
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3664

    
3665
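# The return value of LUConnectConsole.Exec() is not run on the master; it
# is a ready-to-exec SSH command line that the calling client executes so
# the console ends up attached to the user's own terminal.  A sketch of the
# shape of the result (hypothetical node, instance and hypervisor command):
#
#   self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
#   # -> roughly ["ssh", ..., "-t", "root@node1", "xm console inst1"]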

    
3666
class LUReplaceDisks(LogicalUnit):
3667
  """Replace the disks of an instance.
3668

3669
  """
3670
  HPATH = "mirrors-replace"
3671
  HTYPE = constants.HTYPE_INSTANCE
3672
  _OP_REQP = ["instance_name", "mode", "disks"]
3673

    
3674
  def _RunAllocator(self):
3675
    """Compute a new secondary node using an IAllocator.
3676

3677
    """
3678
    ial = IAllocator(self.cfg, self.sstore,
3679
                     mode=constants.IALLOCATOR_MODE_RELOC,
3680
                     name=self.op.instance_name,
3681
                     relocate_from=[self.sec_node])
3682

    
3683
    ial.Run(self.op.iallocator)
3684

    
3685
    if not ial.success:
3686
      raise errors.OpPrereqError("Can't compute nodes using"
3687
                                 " iallocator '%s': %s" % (self.op.iallocator,
3688
                                                           ial.info))
3689
    if len(ial.nodes) != ial.required_nodes:
3690
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3691
                                 " of nodes (%s), required %s" %
3692
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3693
    self.op.remote_node = ial.nodes[0]
3694
    logger.ToStdout("Selected new secondary for the instance: %s" %
3695
                    self.op.remote_node)
3696

    
3697
  def BuildHooksEnv(self):
3698
    """Build hooks env.
3699

3700
    This runs on the master, the primary and all the secondaries.
3701

3702
    """
3703
    env = {
3704
      "MODE": self.op.mode,
3705
      "NEW_SECONDARY": self.op.remote_node,
3706
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3707
      }
3708
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3709
    nl = [
3710
      self.sstore.GetMasterNode(),
3711
      self.instance.primary_node,
3712
      ]
3713
    if self.op.remote_node is not None:
3714
      nl.append(self.op.remote_node)
3715
    return env, nl, nl
3716

    
3717
  def CheckPrereq(self):
3718
    """Check prerequisites.
3719

3720
    This checks that the instance is in the cluster.
3721

3722
    """
3723
    if not hasattr(self.op, "remote_node"):
3724
      self.op.remote_node = None
3725

    
3726
    instance = self.cfg.GetInstanceInfo(
3727
      self.cfg.ExpandInstanceName(self.op.instance_name))
3728
    if instance is None:
3729
      raise errors.OpPrereqError("Instance '%s' not known" %
3730
                                 self.op.instance_name)
3731
    self.instance = instance
3732
    self.op.instance_name = instance.name
3733

    
3734
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3735
      raise errors.OpPrereqError("Instance's disk layout is not"
3736
                                 " network mirrored.")
3737

    
3738
    if len(instance.secondary_nodes) != 1:
3739
      raise errors.OpPrereqError("The instance has a strange layout,"
3740
                                 " expected one secondary but found %d" %
3741
                                 len(instance.secondary_nodes))
3742

    
3743
    self.sec_node = instance.secondary_nodes[0]
3744

    
3745
    ia_name = getattr(self.op, "iallocator", None)
3746
    if ia_name is not None:
3747
      if self.op.remote_node is not None:
3748
        raise errors.OpPrereqError("Give either the iallocator or the new"
3749
                                   " secondary, not both")
3750
      self.op.remote_node = self._RunAllocator()
3751

    
3752
    remote_node = self.op.remote_node
3753
    if remote_node is not None:
3754
      remote_node = self.cfg.ExpandNodeName(remote_node)
3755
      if remote_node is None:
3756
        raise errors.OpPrereqError("Node '%s' not known" %
3757
                                   self.op.remote_node)
3758
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3759
    else:
3760
      self.remote_node_info = None
3761
    if remote_node == instance.primary_node:
3762
      raise errors.OpPrereqError("The specified node is the primary node of"
3763
                                 " the instance.")
3764
    elif remote_node == self.sec_node:
3765
      if self.op.mode == constants.REPLACE_DISK_SEC:
3766
        # this is for DRBD8, where we can't execute the same mode of
3767
        # replacement as for drbd7 (no different port allocated)
3768
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3769
                                   " replacement")
3770
    if instance.disk_template == constants.DT_DRBD8:
3771
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3772
          remote_node is not None):
3773
        # switch to replace secondary mode
3774
        self.op.mode = constants.REPLACE_DISK_SEC
3775

    
3776
      if self.op.mode == constants.REPLACE_DISK_ALL:
3777
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3778
                                   " secondary disk replacement, not"
3779
                                   " both at once")
3780
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3781
        if remote_node is not None:
3782
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3783
                                     " the secondary while doing a primary"
3784
                                     " node disk replacement")
3785
        self.tgt_node = instance.primary_node
3786
        self.oth_node = instance.secondary_nodes[0]
3787
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3788
        self.new_node = remote_node # this can be None, in which case
3789
                                    # we don't change the secondary
3790
        self.tgt_node = instance.secondary_nodes[0]
3791
        self.oth_node = instance.primary_node
3792
      else:
3793
        raise errors.ProgrammerError("Unhandled disk replace mode")
3794

    
3795
    for name in self.op.disks:
3796
      if instance.FindDisk(name) is None:
3797
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3798
                                   (name, instance.name))
3799
    self.op.remote_node = remote_node
3800

    
3801
  def _ExecD8DiskOnly(self, feedback_fn):
3802
    """Replace a disk on the primary or secondary for dbrd8.
3803

3804
    The algorithm for replace is quite complicated:
3805
      - for each disk to be replaced:
3806
        - create new LVs on the target node with unique names
3807
        - detach old LVs from the drbd device
3808
        - rename old LVs to name_replaced.<time_t>
3809
        - rename new LVs to old LVs
3810
        - attach the new LVs (with the old names now) to the drbd device
3811
      - wait for sync across all devices
3812
      - for each modified disk:
3813
        - remove old LVs (which have the name name_replaced.<time_t>)
3814

3815
    Failures are not very well handled.
3816

3817
    """
3818
    steps_total = 6
3819
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3820
    instance = self.instance
3821
    iv_names = {}
3822
    vgname = self.cfg.GetVGName()
3823
    # start of work
3824
    cfg = self.cfg
3825
    tgt_node = self.tgt_node
3826
    oth_node = self.oth_node
3827

    
3828
    # Step: check device activation
3829
    self.proc.LogStep(1, steps_total, "check device existence")
3830
    info("checking volume groups")
3831
    my_vg = cfg.GetVGName()
3832
    results = rpc.call_vg_list([oth_node, tgt_node])
3833
    if not results:
3834
      raise errors.OpExecError("Can't list volume groups on the nodes")
3835
    for node in oth_node, tgt_node:
3836
      res = results.get(node, False)
3837
      if not res or my_vg not in res:
3838
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3839
                                 (my_vg, node))
3840
    for dev in instance.disks:
3841
      if not dev.iv_name in self.op.disks:
3842
        continue
3843
      for node in tgt_node, oth_node:
3844
        info("checking %s on %s" % (dev.iv_name, node))
3845
        cfg.SetDiskID(dev, node)
3846
        if not rpc.call_blockdev_find(node, dev):
3847
          raise errors.OpExecError("Can't find device %s on node %s" %
3848
                                   (dev.iv_name, node))
3849

    
3850
    # Step: check other node consistency
3851
    self.proc.LogStep(2, steps_total, "check peer consistency")
3852
    for dev in instance.disks:
3853
      if not dev.iv_name in self.op.disks:
3854
        continue
3855
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3856
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3857
                                   oth_node==instance.primary_node):
3858
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3859
                                 " to replace disks on this node (%s)" %
3860
                                 (oth_node, tgt_node))
3861

    
3862
    # Step: create new storage
3863
    self.proc.LogStep(3, steps_total, "allocate new storage")
3864
    for dev in instance.disks:
3865
      if not dev.iv_name in self.op.disks:
3866
        continue
3867
      size = dev.size
3868
      cfg.SetDiskID(dev, tgt_node)
3869
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3870
      names = _GenerateUniqueNames(cfg, lv_names)
3871
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3872
                             logical_id=(vgname, names[0]))
3873
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3874
                             logical_id=(vgname, names[1]))
3875
      new_lvs = [lv_data, lv_meta]
3876
      old_lvs = dev.children
3877
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3878
      info("creating new local storage on %s for %s" %
3879
           (tgt_node, dev.iv_name))
3880
      # since we *always* want to create this LV, we use the
3881
      # _Create...OnPrimary (which forces the creation), even if we
3882
      # are talking about the secondary node
3883
      for new_lv in new_lvs:
3884
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3885
                                        _GetInstanceInfoText(instance)):
3886
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3887
                                   " node '%s'" %
3888
                                   (new_lv.logical_id[1], tgt_node))
3889

    
3890
    # Step: for each lv, detach+rename*2+attach
3891
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3892
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3893
      info("detaching %s drbd from local storage" % dev.iv_name)
3894
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3895
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3896
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3897
      #dev.children = []
3898
      #cfg.Update(instance)
3899

    
3900
      # ok, we created the new LVs, so now we know we have the needed
3901
      # storage; as such, we proceed on the target node to rename
3902
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3903
      # using the assumption that logical_id == physical_id (which in
3904
      # turn is the unique_id on that node)
3905

    
3906
      # FIXME(iustin): use a better name for the replaced LVs
3907
      temp_suffix = int(time.time())
3908
      ren_fn = lambda d, suff: (d.physical_id[0],
3909
                                d.physical_id[1] + "_replaced-%s" % suff)
3910
      # build the rename list based on what LVs exist on the node
3911
      rlist = []
3912
      for to_ren in old_lvs:
3913
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3914
        if find_res is not None: # device exists
3915
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3916

    
3917
      info("renaming the old LVs on the target node")
3918
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3919
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3920
      # now we rename the new LVs to the old LVs
3921
      info("renaming the new LVs on the target node")
3922
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3923
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3924
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3925

    
3926
      for old, new in zip(old_lvs, new_lvs):
3927
        new.logical_id = old.logical_id
3928
        cfg.SetDiskID(new, tgt_node)
3929

    
3930
      for disk in old_lvs:
3931
        disk.logical_id = ren_fn(disk, temp_suffix)
3932
        cfg.SetDiskID(disk, tgt_node)
3933

    
3934
      # now that the new lvs have the old name, we can add them to the device
3935
      info("adding new mirror component on %s" % tgt_node)
3936
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3937
        for new_lv in new_lvs:
3938
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3939
            warning("Can't rollback device %s", hint="manually cleanup unused"
3940
                    " logical volumes")
3941
        raise errors.OpExecError("Can't add local storage to drbd")
3942

    
3943
      dev.children = new_lvs
3944
      cfg.Update(instance)
3945

    
3946
    # Step: wait for sync
3947

    
3948
    # this can fail as the old devices are degraded and _WaitForSync
3949
    # does a combined result over all disks, so we don't check its
3950
    # return value
3951
    self.proc.LogStep(5, steps_total, "sync devices")
3952
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3953

    
3954
    # so check manually all the devices
3955
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3956
      cfg.SetDiskID(dev, instance.primary_node)
3957
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3958
      if is_degr:
3959
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3960

    
3961
    # Step: remove old storage
3962
    self.proc.LogStep(6, steps_total, "removing old storage")
3963
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3964
      info("remove logical volumes for %s" % name)
3965
      for lv in old_lvs:
3966
        cfg.SetDiskID(lv, tgt_node)
3967
        if not rpc.call_blockdev_remove(tgt_node, lv):
3968
          warning("Can't remove old LV", hint="manually remove unused LVs")
3969
          continue
3970
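  # To make the rename dance above concrete: for a hypothetical disk "sda"
  # replaced at time_t 1200000000, the target node temporarily holds
  #
  #   <uuid>.sda_data                        (the new LV, now under the old
  #                                           LV's name)
  #   <olduuid>.sda_data_replaced-1200000000 (the old LV, removed in step 6)
  #
  # and likewise for the meta LV.  The DRBD device itself is never
  # recreated; only its local backing storage is swapped, which is why a
  # resync (step 5) is all that is needed afterwards.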

    
3971
  def _ExecD8Secondary(self, feedback_fn):
3972
    """Replace the secondary node for drbd8.
3973

3974
    The algorithm for replace is quite complicated:
3975
      - for all disks of the instance:
3976
        - create new LVs on the new node with same names
3977
        - shutdown the drbd device on the old secondary
3978
        - disconnect the drbd network on the primary
3979
        - create the drbd device on the new secondary
3980
        - network attach the drbd on the primary, using an artifice:
3981
          the drbd code for Attach() will connect to the network if it
3982
          finds a device which is connected to the good local disks but
3983
          not network enabled
3984
      - wait for sync across all devices
3985
      - remove all disks from the old secondary
3986

3987
    Failures are not very well handled.
3988

3989
    """
3990
    steps_total = 6
3991
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3992
    instance = self.instance
3993
    iv_names = {}
3994
    vgname = self.cfg.GetVGName()
3995
    # start of work
3996
    cfg = self.cfg
3997
    old_node = self.tgt_node
3998
    new_node = self.new_node
3999
    pri_node = instance.primary_node
4000

    
4001
    # Step: check device activation
4002
    self.proc.LogStep(1, steps_total, "check device existence")
4003
    info("checking volume groups")
4004
    my_vg = cfg.GetVGName()
4005
    results = rpc.call_vg_list([pri_node, new_node])
4006
    if not results:
4007
      raise errors.OpExecError("Can't list volume groups on the nodes")
4008
    for node in pri_node, new_node:
4009
      res = results.get(node, False)
4010
      if not res or my_vg not in res:
4011
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4012
                                 (my_vg, node))
4013
    for dev in instance.disks:
4014
      if not dev.iv_name in self.op.disks:
4015
        continue
4016
      info("checking %s on %s" % (dev.iv_name, pri_node))
4017
      cfg.SetDiskID(dev, pri_node)
4018
      if not rpc.call_blockdev_find(pri_node, dev):
4019
        raise errors.OpExecError("Can't find device %s on node %s" %
4020
                                 (dev.iv_name, pri_node))
4021

    
4022
    # Step: check other node consistency
4023
    self.proc.LogStep(2, steps_total, "check peer consistency")
4024
    for dev in instance.disks:
4025
      if not dev.iv_name in self.op.disks:
4026
        continue
4027
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4028
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4029
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4030
                                 " unsafe to replace the secondary" %
4031
                                 pri_node)
4032

    
4033
    # Step: create new storage
4034
    self.proc.LogStep(3, steps_total, "allocate new storage")
4035
    for dev in instance.disks:
4036
      size = dev.size
4037
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4038
      # since we *always* want to create this LV, we use the
4039
      # _Create...OnPrimary (which forces the creation), even if we
4040
      # are talking about the secondary node
4041
      for new_lv in dev.children:
4042
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4043
                                        _GetInstanceInfoText(instance)):
4044
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4045
                                   " node '%s'" %
4046
                                   (new_lv.logical_id[1], new_node))
4047

    
4048
      iv_names[dev.iv_name] = (dev, dev.children)
4049

    
4050
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4051
    for dev in instance.disks:
4052
      size = dev.size
4053
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4054
      # create new devices on new_node
4055
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4056
                              logical_id=(pri_node, new_node,
4057
                                          dev.logical_id[2]),
4058
                              children=dev.children)
4059
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4060
                                        new_drbd, False,
4061
                                      _GetInstanceInfoText(instance)):
4062
        raise errors.OpExecError("Failed to create new DRBD on"
4063
                                 " node '%s'" % new_node)
4064

    
4065
    for dev in instance.disks:
4066
      # we have new devices, shutdown the drbd on the old secondary
4067
      info("shutting down drbd for %s on old node" % dev.iv_name)
4068
      cfg.SetDiskID(dev, old_node)
4069
      if not rpc.call_blockdev_shutdown(old_node, dev):
4070
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4071
                hint="Please cleanup this device manually as soon as possible")
4072

    
4073
    info("detaching primary drbds from the network (=> standalone)")
4074
    done = 0
4075
    for dev in instance.disks:
4076
      cfg.SetDiskID(dev, pri_node)
4077
      # set the physical (unique in bdev terms) id to None, meaning
4078
      # detach from network
4079
      dev.physical_id = (None,) * len(dev.physical_id)
4080
      # and 'find' the device, which will 'fix' it to match the
4081
      # standalone state
4082
      if rpc.call_blockdev_find(pri_node, dev):
4083
        done += 1
4084
      else:
4085
        warning("Failed to detach drbd %s from network, unusual case" %
4086
                dev.iv_name)
4087

    
4088
    if not done:
4089
      # no detaches succeeded (very unlikely)
4090
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4091

    
4092
    # if we managed to detach at least one, we update all the disks of
4093
    # the instance to point to the new secondary
4094
    info("updating instance configuration")
4095
    for dev in instance.disks:
4096
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4097
      cfg.SetDiskID(dev, pri_node)
4098
    cfg.Update(instance)
4099

    
4100
    # and now perform the drbd attach
4101
    info("attaching primary drbds to new secondary (standalone => connected)")
4102
    failures = []
4103
    for dev in instance.disks:
4104
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4105
      # since the attach is smart, it's enough to 'find' the device,
4106
      # it will automatically activate the network, if the physical_id
4107
      # is correct
4108
      cfg.SetDiskID(dev, pri_node)
4109
      if not rpc.call_blockdev_find(pri_node, dev):
4110
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4111
                "please do a gnt-instance info to see the status of disks")
4112

    
4113
    # this can fail as the old devices are degraded and _WaitForSync
4114
    # does a combined result over all disks, so we don't check its
4115
    # return value
4116
    self.proc.LogStep(5, steps_total, "sync devices")
4117
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4118

    
4119
    # so check manually all the devices
4120
    for name, (dev, old_lvs) in iv_names.iteritems():
4121
      cfg.SetDiskID(dev, pri_node)
4122
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4123
      if is_degr:
4124
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4125

    
4126
    self.proc.LogStep(6, steps_total, "removing old storage")
4127
    for name, (dev, old_lvs) in iv_names.iteritems():
4128
      info("remove logical volumes for %s" % name)
4129
      for lv in old_lvs:
4130
        cfg.SetDiskID(lv, old_node)
4131
        if not rpc.call_blockdev_remove(old_node, lv):
4132
          warning("Can't remove LV on old secondary",
4133
                  hint="Cleanup stale volumes by hand")
4134
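  # The "artifice" mentioned in the docstring is the physical_id trick used
  # above: blanking physical_id and calling blockdev_find on the primary
  # drops its DRBD devices to standalone mode, and once the configuration
  # points at the new secondary another blockdev_find reconnects them to the
  # new peer.  Per-disk sketch (commented out, names as used above):
  #
  #   dev.physical_id = (None,) * len(dev.physical_id)   # => standalone
  #   rpc.call_blockdev_find(pri_node, dev)
  #   dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
  #   cfg.SetDiskID(dev, pri_node)                       # new physical_id
  #   rpc.call_blockdev_find(pri_node, dev)              # => connected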

    
4135
  def Exec(self, feedback_fn):
4136
    """Execute disk replacement.
4137

4138
    This dispatches the disk replacement to the appropriate handler.
4139

4140
    """
4141
    instance = self.instance
4142
    if instance.disk_template == constants.DT_DRBD8:
4143
      if self.op.remote_node is None:
4144
        fn = self._ExecD8DiskOnly
4145
      else:
4146
        fn = self._ExecD8Secondary
4147
    else:
4148
      raise errors.ProgrammerError("Unhandled disk replacement case")
4149
    return fn(feedback_fn)
4150

    
4151
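# Dispatch summary for drbd8 (the only template handled here): with no new
# secondary given, the affected disks are rebuilt in place on the node
# selected by the replacement mode (_ExecD8DiskOnly); with a remote node,
# the whole secondary is moved there (_ExecD8Secondary).  CheckPrereq has
# already turned a "replace all disks" request that names a remote node into
# a plain secondary replacement, so Exec() only has to look at remote_node.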

    
4152
class LUQueryInstanceData(NoHooksLU):
4153
  """Query runtime instance data.
4154

4155
  """
4156
  _OP_REQP = ["instances"]
4157

    
4158
  def CheckPrereq(self):
4159
    """Check prerequisites.
4160

4161
    This only checks the optional instance list against the existing names.
4162

4163
    """
4164
    if not isinstance(self.op.instances, list):
4165
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4166
    if self.op.instances:
4167
      self.wanted_instances = []
4168
      names = self.op.instances
4169
      for name in names:
4170
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4171
        if instance is None:
4172
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4173
        self.wanted_instances.append(instance)
4174
    else:
4175
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4176
                               in self.cfg.GetInstanceList()]
4177
    return
4178

    
4179

    
4180
  def _ComputeDiskStatus(self, instance, snode, dev):
4181
    """Compute block device status.
4182

4183
    """
4184
    self.cfg.SetDiskID(dev, instance.primary_node)
4185
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4186
    if dev.dev_type in constants.LDS_DRBD:
4187
      # we change the snode then (otherwise we use the one passed in)
4188
      if dev.logical_id[0] == instance.primary_node:
4189
        snode = dev.logical_id[1]
4190
      else:
4191
        snode = dev.logical_id[0]
4192

    
4193
    if snode:
4194
      self.cfg.SetDiskID(dev, snode)
4195
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4196
    else:
4197
      dev_sstatus = None
4198

    
4199
    if dev.children:
4200
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4201
                      for child in dev.children]
4202
    else:
4203
      dev_children = []
4204

    
4205
    data = {
4206
      "iv_name": dev.iv_name,
4207
      "dev_type": dev.dev_type,
4208
      "logical_id": dev.logical_id,
4209
      "physical_id": dev.physical_id,
4210
      "pstatus": dev_pstatus,
4211
      "sstatus": dev_sstatus,
4212
      "children": dev_children,
4213
      }
4214

    
4215
    return data
4216
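  # The per-device dictionary built above nests recursively through the
  # "children" key, so a drbd8 "sda" comes back roughly as (sketch with
  # hypothetical values):
  #
  #   {"iv_name": "sda", "dev_type": constants.LD_DRBD8,
  #    "logical_id": ("node1", "node2", 11000),
  #    "pstatus": <blockdev_find result on the primary>,
  #    "sstatus": <blockdev_find result on the secondary>,
  #    "children": [<data LV dict>, <meta LV dict>]}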

    
4217
  def Exec(self, feedback_fn):
4218
    """Gather and return data"""
4219
    result = {}
4220
    for instance in self.wanted_instances:
4221
      remote_info = rpc.call_instance_info(instance.primary_node,
4222
                                                instance.name)
4223
      if remote_info and "state" in remote_info:
4224
        remote_state = "up"
4225
      else:
4226
        remote_state = "down"
4227
      if instance.status == "down":
4228
        config_state = "down"
4229
      else:
4230
        config_state = "up"
4231

    
4232
      disks = [self._ComputeDiskStatus(instance, None, device)
4233
               for device in instance.disks]
4234

    
4235
      idict = {
4236
        "name": instance.name,
4237
        "config_state": config_state,
4238
        "run_state": remote_state,
4239
        "pnode": instance.primary_node,
4240
        "snodes": instance.secondary_nodes,
4241
        "os": instance.os,
4242
        "memory": instance.memory,
4243
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4244
        "disks": disks,
4245
        "vcpus": instance.vcpus,
4246
        }
4247

    
4248
      htkind = self.sstore.GetHypervisorType()
4249
      if htkind == constants.HT_XEN_PVM30:
4250
        idict["kernel_path"] = instance.kernel_path
4251
        idict["initrd_path"] = instance.initrd_path
4252

    
4253
      if htkind == constants.HT_XEN_HVM31:
4254
        idict["hvm_boot_order"] = instance.hvm_boot_order
4255
        idict["hvm_acpi"] = instance.hvm_acpi
4256
        idict["hvm_pae"] = instance.hvm_pae
4257
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4258

    
4259
      if htkind in constants.HTS_REQ_PORT:
4260
        idict["vnc_bind_address"] = instance.vnc_bind_address
4261
        idict["network_port"] = instance.network_port
4262

    
4263
      result[instance.name] = idict
4264

    
4265
    return result
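
  # Example (illustrative; host names and values are made up): for a Xen PVM
  # instance, the mapping returned by Exec above looks roughly like
  #   {"inst1.example.com": {
  #      "name": "inst1.example.com",
  #      "config_state": "up", "run_state": "up",
  #      "pnode": "node1.example.com", "snodes": ["node2.example.com"],
  #      "os": "debian-etch", "memory": 512, "vcpus": 1,
  #      "nics": [("aa:00:00:11:22:33", "198.51.100.10", "xen-br0")],
  #      "disks": [...],  # one dict per disk, built by _ComputeDiskStatus
  #      "kernel_path": "/boot/vmlinuz-2.6-xenU", "initrd_path": None}}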


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.AddInstance(instance)

    return result
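
  # Example (illustrative): submitting new memory and bridge values makes Exec
  # above return a list of (parameter, new value) pairs such as
  #   [("mem", 1024), ("bridge", "xen-br1")]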


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
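
  # Example (illustrative node and instance names): the dictionary returned by
  # Exec above maps each queried node to the exports found on it, e.g.
  #   {"node1.example.com": ["inst1.example.com"], "node2.example.com": []}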


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
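
  # Example (illustrative tags and names): searching for the pattern "^web"
  # could return (path, tag) pairs such as
  #   [("/cluster", "webfarm"), ("/instances/inst1.example.com", "web-frontend")]
  # covering cluster, node and instance tags alike.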


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode's _ALLO_KEYS or _RELO_KEYS class
      attribute are required)
    - four buffer attributes ((in|out)_(data|text)), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
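
  # Example (illustrative numbers and names): after _ComputeClusterData,
  # self.in_data holds one entry per node under the "nodes" key, shaped like
  #   "node1.example.com": {
  #     "tags": [], "total_memory": 4096, "reserved_memory": 512,
  #     "free_memory": 1024, "i_pri_memory": 2048, "i_pri_up_memory": 2048,
  #     "total_disk": 102400, "free_disk": 51200,
  #     "primary_ip": "198.51.100.1", "secondary_ip": "192.0.2.1",
  #     "total_cpus": 4}
  # plus the per-instance dictionaries under the "instances" key.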

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
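
  # Example (illustrative values): for a two-disk DRBD allocation the request
  # added above looks roughly like
  #   {"type": "allocate", "name": "inst1.example.com",
  #    "disk_template": constants.DT_DRBD8, "tags": [], "os": "debian-etch",
  #    "vcpus": 1, "memory": 512,
  #    "disks": [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}],
  #    "disk_space_total": disk_space,  # from _ComputeDiskSize above
  #    "nics": [{"mac": ..., "ip": ..., "bridge": ...}],
  #    "required_nodes": 2}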

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
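
  # Example (illustrative): relocating the secondary of a DRBD instance away
  # from its current secondary node produces a request like
  #   {"type": "relocate", "name": "inst1.example.com",
  #    "disk_space_total": disk_space, "required_nodes": 1,
  #    "relocate_from": ["node2.example.com"]}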

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
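
  # Example (illustrative): a successful external allocator run is expected to
  # print a serialized dictionary such as
  #   {"success": True, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node3.example.com"]}
  # which _ValidateResult above unpacks into self.success, self.info and
  # self.nodes.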


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result