lib/cmdlib.py @ revision 41362e70 (176.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if only as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements (REQ_CLUSTER,
58
      REQ_MASTER); note that all commands require root permissions
59

60
  """
61
  HPATH = None
62
  HTYPE = None
63
  _OP_REQP = []
64
  REQ_CLUSTER = True
65
  REQ_MASTER = True
66

    
67
  def __init__(self, processor, op, cfg, sstore):
68
    """Constructor for LogicalUnit.
69

70
    This needs to be overridden in derived classes in order to check op
71
    validity.
72

73
    """
74
    self.proc = processor
75
    self.op = op
76
    self.cfg = cfg
77
    self.sstore = sstore
78
    self.__ssh = None
79

    
80
    for attr_name in self._OP_REQP:
81
      attr_val = getattr(op, attr_name, None)
82
      if attr_val is None:
83
        raise errors.OpPrereqError("Required parameter '%s' missing" %
84
                                   attr_name)
85
    if self.REQ_CLUSTER:
86
      if not cfg.IsCluster():
87
        raise errors.OpPrereqError("Cluster not initialized yet,"
88
                                   " use 'gnt-cluster init' first.")
89
      if self.REQ_MASTER:
90
        master = sstore.GetMasterNode()
91
        if master != utils.HostInfo().name:
92
          raise errors.OpPrereqError("Commands must be run on the master"
93
                                     " node %s" % master)
94

    
95
  def __GetSSH(self):
96
    """Returns the SshRunner object
97

98
    """
99
    if not self.__ssh:
100
      self.__ssh = ssh.SshRunner(self.sstore)
101
    return self.__ssh
102

    
103
  ssh = property(fget=__GetSSH)
104

    
105
  def CheckPrereq(self):
106
    """Check prerequisites for this LU.
107

108
    This method should check that the prerequisites for the execution
109
    of this LU are fulfilled. It can do internode communication, but
110
    it should be idempotent - no cluster or system changes are
111
    allowed.
112

113
    The method should raise errors.OpPrereqError in case something is
114
    not fulfilled. Its return value is ignored.
115

116
    This method should also update all the parameters of the opcode to
117
    their canonical form; e.g. a short node name must be fully
118
    expanded after this method has successfully completed (so that
119
    hooks, logging, etc. work correctly).
120

121
    """
122
    raise NotImplementedError
123

    
124
  def Exec(self, feedback_fn):
125
    """Execute the LU.
126

127
    This method should implement the actual work. It should raise
128
    errors.OpExecError for failures that are somewhat dealt with in
129
    code, or expected.
130

131
    """
132
    raise NotImplementedError
133

    
134
  def BuildHooksEnv(self):
135
    """Build hooks environment for this LU.
136

137
    This method should return a three-element tuple consisting of: a dict
138
    containing the environment that will be used for running the
139
    specific hook for this LU, a list of node names on which the hook
140
    should run before the execution, and a list of node names on which
141
    the hook should run after the execution.
142

143
    The keys of the dict must not be prefixed with 'GANETI_' as this will
144
    be handled in the hooks runner. Also note additional keys will be
145
    added by the hooks runner. If the LU doesn't define any
146
    environment, an empty dict (and not None) should be returned.
147

148
    If no nodes are needed, an empty list (and not None) should be returned.
149

150
    Note that if the HPATH for a LU class is None, this function will
151
    not be called.
152

153
    """
154
    raise NotImplementedError
155

    
156
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
157
    """Notify the LU about the results of its hooks.
158

159
    This method is called every time a hooks phase is executed, and notifies
160
    the Logical Unit about the hooks' result. The LU can then use it to alter
161
    its result based on the hooks.  By default the method does nothing and the
162
    previous result is passed back unchanged, but any LU can override it if it
163
    wants to use the local cluster hook-scripts somehow.
164

165
    Args:
166
      phase: the hooks phase that has just been run
167
      hook_results: the results of the multi-node hooks rpc call
168
      feedback_fn: function to send feedback back to the caller
169
      lu_result: the previous result this LU had, or None in the PRE phase.
170

171
    """
172
    return lu_result
173

    
174

    
175
class NoHooksLU(LogicalUnit):
176
  """Simple LU which runs no hooks.
177

178
  This LU is intended as a parent for other LogicalUnits which will
179
  run no hooks, in order to reduce duplicate code.
180

181
  """
182
  HPATH = None
183
  HTYPE = None
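

# The following sketch is an editor's illustration and not part of the
# original module: a minimal LogicalUnit subclass following the contract
# described in the LogicalUnit docstring above (CheckPrereq validates and
# canonicalises the opcode, Exec does the work, BuildHooksEnv supplies the
# hook environment). The class name and the "node_name" opcode field are
# hypothetical.
class LUExampleNoop(LogicalUnit):
  """Example no-op LU (illustrative only)."""
  HPATH = "example-noop"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    env = {"OP_TARGET": self.op.node_name}
    # run the (hypothetical) hook on the target node, before and after Exec
    return env, [self.op.node_name], [self.op.node_name]

  def CheckPrereq(self):
    # canonicalise the node name and fail early if it is unknown
    node = self.cfg.ExpandNodeName(self.op.node_name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
    self.op.node_name = node

  def Exec(self, feedback_fn):
    feedback_fn("Nothing to do for node %s" % self.op.node_name)
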
184

    
185

    
186
def _AddHostToEtcHosts(hostname):
187
  """Wrapper around utils.SetEtcHostsEntry.
188

189
  """
190
  hi = utils.HostInfo(name=hostname)
191
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
192

    
193

    
194
def _RemoveHostFromEtcHosts(hostname):
195
  """Wrapper around utils.RemoveEtcHostsEntry.
196

197
  """
198
  hi = utils.HostInfo(name=hostname)
199
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
200
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
201

    
202

    
203
def _GetWantedNodes(lu, nodes):
204
  """Returns list of checked and expanded node names.
205

206
  Args:
207
    nodes: List of nodes (strings) or None for all
208

209
  """
210
  if not isinstance(nodes, list):
211
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
212

    
213
  if nodes:
214
    wanted = []
215

    
216
    for name in nodes:
217
      node = lu.cfg.ExpandNodeName(name)
218
      if node is None:
219
        raise errors.OpPrereqError("No such node name '%s'" % name)
220
      wanted.append(node)
221

    
222
  else:
223
    wanted = lu.cfg.GetNodeList()
224
  return utils.NiceSort(wanted)
225

    
226

    
227
def _GetWantedInstances(lu, instances):
228
  """Returns list of checked and expanded instance names.
229

230
  Args:
231
    instances: List of instances (strings) or None for all
232

233
  """
234
  if not isinstance(instances, list):
235
    raise errors.OpPrereqError("Invalid argument type 'instances'")
236

    
237
  if instances:
238
    wanted = []
239

    
240
    for name in instances:
241
      instance = lu.cfg.ExpandInstanceName(name)
242
      if instance is None:
243
        raise errors.OpPrereqError("No such instance name '%s'" % name)
244
      wanted.append(instance)
245

    
246
  else:
247
    wanted = lu.cfg.GetInstanceList()
248
  return utils.NiceSort(wanted)
249

    
250

    
251
def _CheckOutputFields(static, dynamic, selected):
252
  """Checks whether all selected fields are valid.
253

254
  Args:
255
    static: Static fields
256
    dynamic: Dynamic fields
    selected: User-selected output fields to validate
257

258
  """
259
  static_fields = frozenset(static)
260
  dynamic_fields = frozenset(dynamic)
261

    
262
  all_fields = static_fields | dynamic_fields
263

    
264
  if not all_fields.issuperset(selected):
265
    raise errors.OpPrereqError("Unknown output fields selected: %s"
266
                               % ",".join(frozenset(selected).
267
                                          difference(all_fields)))
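

# Editor's illustration (not in the original module): _CheckOutputFields only
# verifies that every selected field is either static or dynamic, e.g.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes silently
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])           # raises OpPrereqError
#
# The field names above are made-up examples.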
268

    
269

    
270
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
271
                          memory, vcpus, nics):
272
  """Builds instance related env variables for hooks from single variables.
273

274
  Args:
275
    secondary_nodes: List of secondary nodes as strings
276
  """
277
  env = {
278
    "OP_TARGET": name,
279
    "INSTANCE_NAME": name,
280
    "INSTANCE_PRIMARY": primary_node,
281
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
282
    "INSTANCE_OS_TYPE": os_type,
283
    "INSTANCE_STATUS": status,
284
    "INSTANCE_MEMORY": memory,
285
    "INSTANCE_VCPUS": vcpus,
286
  }
287

    
288
  if nics:
289
    nic_count = len(nics)
290
    for idx, (ip, bridge, mac) in enumerate(nics):
291
      if ip is None:
292
        ip = ""
293
      env["INSTANCE_NIC%d_IP" % idx] = ip
294
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
295
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
296
  else:
297
    nic_count = 0
298

    
299
  env["INSTANCE_NIC_COUNT"] = nic_count
300

    
301
  return env
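

# Editor's illustration (not in the original module): for a hypothetical
# instance with one NIC, the call below would produce the environment shown.
# All names and values are made-up examples.
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                         ["node2.example.com"], "debian-etch", "up",
#                         512, 1, [("192.0.2.10", "xen-br0",
#                                   "aa:00:00:12:34:56")])
#   => {"OP_TARGET": "inst1.example.com",
#       "INSTANCE_NAME": "inst1.example.com",
#       "INSTANCE_PRIMARY": "node1.example.com",
#       "INSTANCE_SECONDARIES": "node2.example.com",
#       "INSTANCE_OS_TYPE": "debian-etch",
#       "INSTANCE_STATUS": "up",
#       "INSTANCE_MEMORY": 512,
#       "INSTANCE_VCPUS": 1,
#       "INSTANCE_NIC0_IP": "192.0.2.10",
#       "INSTANCE_NIC0_BRIDGE": "xen-br0",
#       "INSTANCE_NIC0_HWADDR": "aa:00:00:12:34:56",
#       "INSTANCE_NIC_COUNT": 1}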
302

    
303

    
304
def _BuildInstanceHookEnvByObject(instance, override=None):
305
  """Builds instance related env variables for hooks from an object.
306

307
  Args:
308
    instance: objects.Instance object of instance
309
    override: dict of values to override
310
  """
311
  args = {
312
    'name': instance.name,
313
    'primary_node': instance.primary_node,
314
    'secondary_nodes': instance.secondary_nodes,
315
    'os_type': instance.os,
316
    'status': instance.status,
317
    'memory': instance.memory,
318
    'vcpus': instance.vcpus,
319
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
320
  }
321
  if override:
322
    args.update(override)
323
  return _BuildInstanceHookEnv(**args)
324

    
325

    
326
def _HasValidVG(vglist, vgname):
327
  """Checks if the volume group list is valid.
328

329
  A non-None return value means there's an error, and the return value
330
  is the error message.
331

332
  """
333
  vgsize = vglist.get(vgname, None)
334
  if vgsize is None:
335
    return "volume group '%s' missing" % vgname
336
  elif vgsize < 20480:
337
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
338
            (vgname, vgsize))
339
  return None
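

# Editor's illustration (not in the original module): _HasValidVG returns None
# when the named volume group exists and is at least 20 GiB, and an error
# message otherwise. The volume group names and sizes are made-up examples.
#
#   _HasValidVG({"xenvg": 102400}, "xenvg")  => None
#   _HasValidVG({"xenvg": 10240}, "xenvg")   => "volume group 'xenvg' too small ..."
#   _HasValidVG({}, "xenvg")                 => "volume group 'xenvg' missing"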
340

    
341

    
342
def _InitSSHSetup(node):
343
  """Setup the SSH configuration for the cluster.
344

345

346
  This generates a dsa keypair for root, adds the pub key to the
347
  permitted hosts and adds the hostkey to its own known hosts.
348

349
  Args:
350
    node: the name of this host as a fqdn
351

352
  """
353
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
354

    
355
  for name in priv_key, pub_key:
356
    if os.path.exists(name):
357
      utils.CreateBackup(name)
358
    utils.RemoveFile(name)
359

    
360
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
361
                         "-f", priv_key,
362
                         "-q", "-N", ""])
363
  if result.failed:
364
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
365
                             result.output)
366

    
367
  f = open(pub_key, 'r')
368
  try:
369
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
370
  finally:
371
    f.close()
372

    
373

    
374
def _InitGanetiServerSetup(ss):
375
  """Setup the necessary configuration for the initial node daemon.
376

377
  This creates the nodepass file containing the shared password for
378
  the cluster and also generates the SSL certificate.
379

380
  """
381
  # Create pseudo random password
382
  randpass = sha.new(os.urandom(64)).hexdigest()
383
  # and write it into sstore
384
  ss.SetKey(ss.SS_NODED_PASS, randpass)
385

    
386
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
387
                         "-days", str(365*5), "-nodes", "-x509",
388
                         "-keyout", constants.SSL_CERT_FILE,
389
                         "-out", constants.SSL_CERT_FILE, "-batch"])
390
  if result.failed:
391
    raise errors.OpExecError("could not generate server ssl cert, command"
392
                             " %s had exitcode %s and error message %s" %
393
                             (result.cmd, result.exit_code, result.output))
394

    
395
  os.chmod(constants.SSL_CERT_FILE, 0400)
396

    
397
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
398

    
399
  if result.failed:
400
    raise errors.OpExecError("Could not start the node daemon, command %s"
401
                             " had exitcode %s and error %s" %
402
                             (result.cmd, result.exit_code, result.output))
403

    
404

    
405
def _CheckInstanceBridgesExist(instance):
406
  """Check that the brigdes needed by an instance exist.
407

408
  """
409
  # check bridges existence
410
  brlist = [nic.bridge for nic in instance.nics]
411
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
412
    raise errors.OpPrereqError("one or more target bridges %s does not"
413
                               " exist on destination node '%s'" %
414
                               (brlist, instance.primary_node))
415

    
416

    
417
class LUInitCluster(LogicalUnit):
418
  """Initialise the cluster.
419

420
  """
421
  HPATH = "cluster-init"
422
  HTYPE = constants.HTYPE_CLUSTER
423
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
424
              "def_bridge", "master_netdev", "file_storage_dir"]
425
  REQ_CLUSTER = False
426

    
427
  def BuildHooksEnv(self):
428
    """Build hooks env.
429

430
    Notes: Since we don't require a cluster, we must manually add
431
    ourselves in the post-run node list.
432

433
    """
434
    env = {"OP_TARGET": self.op.cluster_name}
435
    return env, [], [self.hostname.name]
436

    
437
  def CheckPrereq(self):
438
    """Verify that the passed name is a valid one.
439

440
    """
441
    if config.ConfigWriter.IsCluster():
442
      raise errors.OpPrereqError("Cluster is already initialised")
443

    
444
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
445
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
446
        raise errors.OpPrereqError("Please prepare the cluster VNC"
447
                                   "password file %s" %
448
                                   constants.VNC_PASSWORD_FILE)
449

    
450
    self.hostname = hostname = utils.HostInfo()
451

    
452
    if hostname.ip.startswith("127."):
453
      raise errors.OpPrereqError("This host's IP resolves to the private"
454
                                 " range (%s). Please fix DNS or %s." %
455
                                 (hostname.ip, constants.ETC_HOSTS))
456

    
457
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
458
                         source=constants.LOCALHOST_IP_ADDRESS):
459
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
460
                                 " to %s,\nbut this ip address does not"
461
                                 " belong to this host."
462
                                 " Aborting." % hostname.ip)
463

    
464
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
465

    
466
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
467
                     timeout=5):
468
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
469

    
470
    secondary_ip = getattr(self.op, "secondary_ip", None)
471
    if secondary_ip and not utils.IsValidIP(secondary_ip):
472
      raise errors.OpPrereqError("Invalid secondary ip given")
473
    if (secondary_ip and
474
        secondary_ip != hostname.ip and
475
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
476
                           source=constants.LOCALHOST_IP_ADDRESS))):
477
      raise errors.OpPrereqError("You gave %s as secondary IP,"
478
                                 " but it does not belong to this host." %
479
                                 secondary_ip)
480
    self.secondary_ip = secondary_ip
481

    
482
    if not hasattr(self.op, "vg_name"):
483
      self.op.vg_name = None
484
    # if vg_name not None, checks if volume group is valid
485
    if self.op.vg_name:
486
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
487
      if vgstatus:
488
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
489
                                   " you are not using lvm" % vgstatus)
490

    
491
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
492

    
493
    if not os.path.isabs(self.op.file_storage_dir):
494
      raise errors.OpPrereqError("The file storage directory you have is"
495
                                 " not an absolute path.")
496

    
497
    if not os.path.exists(self.op.file_storage_dir):
498
      try:
499
        os.makedirs(self.op.file_storage_dir, 0750)
500
      except OSError, err:
501
        raise errors.OpPrereqError("Cannot create file storage directory"
502
                                   " '%s': %s" %
503
                                   (self.op.file_storage_dir, err))
504

    
505
    if not os.path.isdir(self.op.file_storage_dir):
506
      raise errors.OpPrereqError("The file storage directory '%s' is not"
507
                                 " a directory." % self.op.file_storage_dir)
508

    
509
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
510
                    self.op.mac_prefix):
511
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
512
                                 self.op.mac_prefix)
513

    
514
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
515
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
516
                                 self.op.hypervisor_type)
517

    
518
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
519
    if result.failed:
520
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
521
                                 (self.op.master_netdev,
522
                                  result.output.strip()))
523

    
524
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
525
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
526
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
527
                                 " executable." % constants.NODE_INITD_SCRIPT)
528

    
529
  def Exec(self, feedback_fn):
530
    """Initialize the cluster.
531

532
    """
533
    clustername = self.clustername
534
    hostname = self.hostname
535

    
536
    # set up the simple store
537
    self.sstore = ss = ssconf.SimpleStore()
538
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
539
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
540
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
541
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
542
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
543
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
544

    
545
    # set up the inter-node password and certificate
546
    _InitGanetiServerSetup(ss)
547

    
548
    # start the master ip
549
    rpc.call_node_start_master(hostname.name)
550

    
551
    # set up ssh config and /etc/hosts
552
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
553
    try:
554
      sshline = f.read()
555
    finally:
556
      f.close()
557
    sshkey = sshline.split(" ")[1]
558

    
559
    _AddHostToEtcHosts(hostname.name)
560
    _InitSSHSetup(hostname.name)
561

    
562
    # init of cluster config file
563
    self.cfg = cfgw = config.ConfigWriter()
564
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
565
                    sshkey, self.op.mac_prefix,
566
                    self.op.vg_name, self.op.def_bridge)
567

    
568
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
569

    
570

    
571
class LUDestroyCluster(NoHooksLU):
572
  """Logical unit for destroying the cluster.
573

574
  """
575
  _OP_REQP = []
576

    
577
  def CheckPrereq(self):
578
    """Check prerequisites.
579

580
    This checks whether the cluster is empty.
581

582
    Any errors are signalled by raising errors.OpPrereqError.
583

584
    """
585
    master = self.sstore.GetMasterNode()
586

    
587
    nodelist = self.cfg.GetNodeList()
588
    if len(nodelist) != 1 or nodelist[0] != master:
589
      raise errors.OpPrereqError("There are still %d node(s) in"
590
                                 " this cluster." % (len(nodelist) - 1))
591
    instancelist = self.cfg.GetInstanceList()
592
    if instancelist:
593
      raise errors.OpPrereqError("There are still %d instance(s) in"
594
                                 " this cluster." % len(instancelist))
595

    
596
  def Exec(self, feedback_fn):
597
    """Destroys the cluster.
598

599
    """
600
    master = self.sstore.GetMasterNode()
601
    if not rpc.call_node_stop_master(master):
602
      raise errors.OpExecError("Could not disable the master role")
603
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
604
    utils.CreateBackup(priv_key)
605
    utils.CreateBackup(pub_key)
606
    rpc.call_node_leave_cluster(master)
607

    
608

    
609
class LUVerifyCluster(LogicalUnit):
610
  """Verifies the cluster status.
611

612
  """
613
  HPATH = "cluster-verify"
614
  HTYPE = constants.HTYPE_CLUSTER
615
  _OP_REQP = ["skip_checks"]
616

    
617
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
618
                  remote_version, feedback_fn):
619
    """Run multiple tests against a node.
620

621
    Test list:
622
      - compares ganeti version
623
      - checks vg existence and size > 20G
624
      - checks config file checksum
625
      - checks ssh to other nodes
626

627
    Args:
628
      node: name of the node to check
629
      file_list: required list of files
630
      local_cksum: dictionary of local files and their checksums
631

632
    """
633
    # compares ganeti version
634
    local_version = constants.PROTOCOL_VERSION
635
    if not remote_version:
636
      feedback_fn("  - ERROR: connection to %s failed" % (node))
637
      return True
638

    
639
    if local_version != remote_version:
640
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
641
                      (local_version, node, remote_version))
642
      return True
643

    
644
    # checks vg existence and size > 20G
645

    
646
    bad = False
647
    if not vglist:
648
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
649
                      (node,))
650
      bad = True
651
    else:
652
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
653
      if vgstatus:
654
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
655
        bad = True
656

    
657
    # checks config file checksum
658
    # checks ssh to any
659

    
660
    if 'filelist' not in node_result:
661
      bad = True
662
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
663
    else:
664
      remote_cksum = node_result['filelist']
665
      for file_name in file_list:
666
        if file_name not in remote_cksum:
667
          bad = True
668
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
669
        elif remote_cksum[file_name] != local_cksum[file_name]:
670
          bad = True
671
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
672

    
673
    if 'nodelist' not in node_result:
674
      bad = True
675
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
676
    else:
677
      if node_result['nodelist']:
678
        bad = True
679
        for node in node_result['nodelist']:
680
          feedback_fn("  - ERROR: communication with node '%s': %s" %
681
                          (node, node_result['nodelist'][node]))
682
    hyp_result = node_result.get('hypervisor', None)
683
    if hyp_result is not None:
684
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
685
    return bad
686

    
687
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688
                      node_instance, feedback_fn):
689
    """Verify an instance.
690

691
    This function checks to see if the required block devices are
692
    available on the instance's node.
693

694
    """
695
    bad = False
696

    
697
    node_current = instanceconfig.primary_node
698

    
699
    node_vol_should = {}
700
    instanceconfig.MapLVsByNode(node_vol_should)
701

    
702
    for node in node_vol_should:
703
      for volume in node_vol_should[node]:
704
        if node not in node_vol_is or volume not in node_vol_is[node]:
705
          feedback_fn("  - ERROR: volume %s missing on node %s" %
706
                          (volume, node))
707
          bad = True
708

    
709
    if instanceconfig.status != 'down':
710
      if (node_current not in node_instance or
711
          not instance in node_instance[node_current]):
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if node != node_current:
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758
    """Verify N+1 Memory Resilience.
759

760
    Check that if one single node dies we can still start all the instances it
761
    was primary for.
762

763
    """
764
    bad = False
765

    
766
    for node, nodeinfo in node_info.iteritems():
767
      # This code checks that every node which is now listed as secondary has
768
      # enough memory to host all the instances it is secondary for, should a
769
      # single other node in the cluster fail.
770
      # FIXME: not ready for failover to an arbitrary node
771
      # FIXME: does not support file-backed instances
772
      # WARNING: we currently take into account down instances as well as up
773
      # ones, considering that even if they're down someone might want to start
774
      # them even in the event of a node failure.
775
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776
        needed_mem = 0
777
        for instance in instances:
778
          needed_mem += instance_cfg[instance].memory
779
        if nodeinfo['mfree'] < needed_mem:
780
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
781
                      " failovers should node %s fail" % (node, prinode))
782
          bad = True
783
    return bad
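

  # Editor's illustration (not in the original module): given a node_info
  # entry such as the (made-up) one below, the N+1 check above adds up the
  # memory of all instances that have node2 as secondary, grouped by their
  # primary node, and compares each per-primary total against node2's free
  # memory:
  #
  #   node_info["node2"] = {
  #     "mfree": 1024,
  #     "sinst-by-pnode": {"node1": ["inst1", "inst2"],   # 512 + 768 MiB
  #                        "node3": ["inst3"]},           # 256 MiB
  #     ...
  #   }
  #
  # Here a failure of node1 would require 1280 MiB on node2, so the check
  # reports an N+1 error, while a failure of node3 (256 MiB) would be fine.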
784

    
785
  def CheckPrereq(self):
786
    """Check prerequisites.
787

788
    Transform the list of checks we're going to skip into a set and check that
789
    all its members are valid.
790

791
    """
792
    self.skip_set = frozenset(self.op.skip_checks)
793
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
794
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
795

    
796
  def BuildHooksEnv(self):
797
    """Build hooks env.
798

799
    Cluster-Verify hooks are run only in the post phase; if they fail, their
800
    output is logged in the verify output and the verification fails.
801

802
    """
803
    all_nodes = self.cfg.GetNodeList()
804
    # TODO: populate the environment with useful information for verify hooks
805
    env = {}
806
    return env, [], all_nodes
807

    
808
  def Exec(self, feedback_fn):
809
    """Verify integrity of cluster, performing various test on nodes.
810

811
    """
812
    bad = False
813
    feedback_fn("* Verifying global settings")
814
    for msg in self.cfg.VerifyConfig():
815
      feedback_fn("  - ERROR: %s" % msg)
816

    
817
    vg_name = self.cfg.GetVGName()
818
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
819
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
820
    i_non_redundant = [] # Non redundant instances
821
    node_volume = {}
822
    node_instance = {}
823
    node_info = {}
824
    instance_cfg = {}
825

    
826
    # FIXME: verify OS list
827
    # do local checksums
828
    file_names = list(self.sstore.GetFileList())
829
    file_names.append(constants.SSL_CERT_FILE)
830
    file_names.append(constants.CLUSTER_CONF_FILE)
831
    local_checksums = utils.FingerprintFiles(file_names)
832

    
833
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
834
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
835
    all_instanceinfo = rpc.call_instance_list(nodelist)
836
    all_vglist = rpc.call_vg_list(nodelist)
837
    node_verify_param = {
838
      'filelist': file_names,
839
      'nodelist': nodelist,
840
      'hypervisor': None,
841
      }
842
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
843
    all_rversion = rpc.call_version(nodelist)
844
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
845

    
846
    for node in nodelist:
847
      feedback_fn("* Verifying node %s" % node)
848
      result = self._VerifyNode(node, file_names, local_checksums,
849
                                all_vglist[node], all_nvinfo[node],
850
                                all_rversion[node], feedback_fn)
851
      bad = bad or result
852

    
853
      # node_volume
854
      volumeinfo = all_volumeinfo[node]
855

    
856
      if isinstance(volumeinfo, basestring):
857
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
858
                    (node, volumeinfo[-400:].encode('string_escape')))
859
        bad = True
860
        node_volume[node] = {}
861
      elif not isinstance(volumeinfo, dict):
862
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
863
        bad = True
864
        continue
865
      else:
866
        node_volume[node] = volumeinfo
867

    
868
      # node_instance
869
      nodeinstance = all_instanceinfo[node]
870
      if type(nodeinstance) != list:
871
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
872
        bad = True
873
        continue
874

    
875
      node_instance[node] = nodeinstance
876

    
877
      # node_info
878
      nodeinfo = all_ninfo[node]
879
      if not isinstance(nodeinfo, dict):
880
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
881
        bad = True
882
        continue
883

    
884
      try:
885
        node_info[node] = {
886
          "mfree": int(nodeinfo['memory_free']),
887
          "dfree": int(nodeinfo['vg_free']),
888
          "pinst": [],
889
          "sinst": [],
890
          # dictionary holding all instances this node is secondary for,
891
          # grouped by their primary node. Each key is a cluster node, and each
892
          # value is a list of instances which have the key as primary and the
893
          # current node as secondary.  this is handy to calculate N+1 memory
894
          # availability if you can only failover from a primary to its
895
          # secondary.
896
          "sinst-by-pnode": {},
897
        }
898
      except ValueError:
899
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
900
        bad = True
901
        continue
902

    
903
    node_vol_should = {}
904

    
905
    for instance in instancelist:
906
      feedback_fn("* Verifying instance %s" % instance)
907
      inst_config = self.cfg.GetInstanceInfo(instance)
908
      result =  self._VerifyInstance(instance, inst_config, node_volume,
909
                                     node_instance, feedback_fn)
910
      bad = bad or result
911

    
912
      inst_config.MapLVsByNode(node_vol_should)
913

    
914
      instance_cfg[instance] = inst_config
915

    
916
      pnode = inst_config.primary_node
917
      if pnode in node_info:
918
        node_info[pnode]['pinst'].append(instance)
919
      else:
920
        feedback_fn("  - ERROR: instance %s, connection to primary node"
921
                    " %s failed" % (instance, pnode))
922
        bad = True
923

    
924
      # If the instance is non-redundant we cannot survive losing its primary
925
      # node, so we are not N+1 compliant. On the other hand we have no disk
926
      # templates with more than one secondary so that situation is not well
927
      # supported either.
928
      # FIXME: does not support file-backed instances
929
      if len(inst_config.secondary_nodes) == 0:
930
        i_non_redundant.append(instance)
931
      elif len(inst_config.secondary_nodes) > 1:
932
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
933
                    % instance)
934

    
935
      for snode in inst_config.secondary_nodes:
936
        if snode in node_info:
937
          node_info[snode]['sinst'].append(instance)
938
          if pnode not in node_info[snode]['sinst-by-pnode']:
939
            node_info[snode]['sinst-by-pnode'][pnode] = []
940
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
941
        else:
942
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
943
                      " %s failed" % (instance, snode))
944

    
945
    feedback_fn("* Verifying orphan volumes")
946
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
947
                                       feedback_fn)
948
    bad = bad or result
949

    
950
    feedback_fn("* Verifying remaining instances")
951
    result = self._VerifyOrphanInstances(instancelist, node_instance,
952
                                         feedback_fn)
953
    bad = bad or result
954

    
955
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
956
      feedback_fn("* Verifying N+1 Memory redundancy")
957
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
958
      bad = bad or result
959

    
960
    feedback_fn("* Other Notes")
961
    if i_non_redundant:
962
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
963
                  % len(i_non_redundant))
964

    
965
    return int(bad)
966

    
967
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
968
    """Analize the post-hooks' result, handle it, and send some
969
    nicely-formatted feedback back to the user.
970

971
    Args:
972
      phase: the hooks phase that has just been run
973
      hooks_results: the results of the multi-node hooks rpc call
974
      feedback_fn: function to send feedback back to the caller
975
      lu_result: previous Exec result
976

977
    """
978
    # We only really run POST phase hooks, and are only interested in their results
979
    if phase == constants.HOOKS_PHASE_POST:
980
      # Used to change hooks' output to proper indentation
981
      indent_re = re.compile('^', re.M)
982
      feedback_fn("* Hooks Results")
983
      if not hooks_results:
984
        feedback_fn("  - ERROR: general communication failure")
985
        lu_result = 1
986
      else:
987
        for node_name in hooks_results:
988
          show_node_header = True
989
          res = hooks_results[node_name]
990
          if res is False or not isinstance(res, list):
991
            feedback_fn("    Communication failure")
992
            lu_result = 1
993
            continue
994
          for script, hkr, output in res:
995
            if hkr == constants.HKR_FAIL:
996
              # The node header is only shown once, if there are
997
              # failing hooks on that node
998
              if show_node_header:
999
                feedback_fn("  Node %s:" % node_name)
1000
                show_node_header = False
1001
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1002
              output = indent_re.sub('      ', output)
1003
              feedback_fn("%s" % output)
1004
              lu_result = 1
1005

    
1006
      return lu_result
1007

    
1008

    
1009
class LUVerifyDisks(NoHooksLU):
1010
  """Verifies the cluster disks status.
1011

1012
  """
1013
  _OP_REQP = []
1014

    
1015
  def CheckPrereq(self):
1016
    """Check prerequisites.
1017

1018
    This has no prerequisites.
1019

1020
    """
1021
    pass
1022

    
1023
  def Exec(self, feedback_fn):
1024
    """Verify integrity of cluster disks.
1025

1026
    """
1027
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1028

    
1029
    vg_name = self.cfg.GetVGName()
1030
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1031
    instances = [self.cfg.GetInstanceInfo(name)
1032
                 for name in self.cfg.GetInstanceList()]
1033

    
1034
    nv_dict = {}
1035
    for inst in instances:
1036
      inst_lvs = {}
1037
      if (inst.status != "up" or
1038
          inst.disk_template not in constants.DTS_NET_MIRROR):
1039
        continue
1040
      inst.MapLVsByNode(inst_lvs)
1041
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1042
      for node, vol_list in inst_lvs.iteritems():
1043
        for vol in vol_list:
1044
          nv_dict[(node, vol)] = inst
1045

    
1046
    if not nv_dict:
1047
      return result
1048

    
1049
    node_lvs = rpc.call_volume_list(nodes, vg_name)
1050

    
1051
    to_act = set()
1052
    for node in nodes:
1053
      # node_volume
1054
      lvs = node_lvs[node]
1055

    
1056
      if isinstance(lvs, basestring):
1057
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1058
        res_nlvm[node] = lvs
1059
      elif not isinstance(lvs, dict):
1060
        logger.Info("connection to node %s failed or invalid data returned" %
1061
                    (node,))
1062
        res_nodes.append(node)
1063
        continue
1064

    
1065
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1066
        inst = nv_dict.pop((node, lv_name), None)
1067
        if (not lv_online and inst is not None
1068
            and inst.name not in res_instances):
1069
          res_instances.append(inst.name)
1070

    
1071
    # any leftover items in nv_dict are missing LVs, let's arrange the
1072
    # data better
1073
    for key, inst in nv_dict.iteritems():
1074
      if inst.name not in res_missing:
1075
        res_missing[inst.name] = []
1076
      res_missing[inst.name].append(key)
1077

    
1078
    return result
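

  # Editor's illustration (not in the original module): the value returned by
  # Exec is the 4-tuple built above, e.g. (all values are made-up examples):
  #
  #   (["node3"],                             # nodes that failed to answer
  #    {"node2": "lvs: command not found"},   # per-node LVM error messages
  #    ["inst1"],                             # instances with inactive LVs
  #    {"inst2": [("node1", "<lv-name>")]})   # per-instance missing (node, LV)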
1079

    
1080

    
1081
class LURenameCluster(LogicalUnit):
1082
  """Rename the cluster.
1083

1084
  """
1085
  HPATH = "cluster-rename"
1086
  HTYPE = constants.HTYPE_CLUSTER
1087
  _OP_REQP = ["name"]
1088

    
1089
  def BuildHooksEnv(self):
1090
    """Build hooks env.
1091

1092
    """
1093
    env = {
1094
      "OP_TARGET": self.sstore.GetClusterName(),
1095
      "NEW_NAME": self.op.name,
1096
      }
1097
    mn = self.sstore.GetMasterNode()
1098
    return env, [mn], [mn]
1099

    
1100
  def CheckPrereq(self):
1101
    """Verify that the passed name is a valid one.
1102

1103
    """
1104
    hostname = utils.HostInfo(self.op.name)
1105

    
1106
    new_name = hostname.name
1107
    self.ip = new_ip = hostname.ip
1108
    old_name = self.sstore.GetClusterName()
1109
    old_ip = self.sstore.GetMasterIP()
1110
    if new_name == old_name and new_ip == old_ip:
1111
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1112
                                 " cluster has changed")
1113
    if new_ip != old_ip:
1114
      result = utils.RunCmd(["fping", "-q", new_ip])
1115
      if not result.failed:
1116
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1117
                                   " reachable on the network. Aborting." %
1118
                                   new_ip)
1119

    
1120
    self.op.name = new_name
1121

    
1122
  def Exec(self, feedback_fn):
1123
    """Rename the cluster.
1124

1125
    """
1126
    clustername = self.op.name
1127
    ip = self.ip
1128
    ss = self.sstore
1129

    
1130
    # shutdown the master IP
1131
    master = ss.GetMasterNode()
1132
    if not rpc.call_node_stop_master(master):
1133
      raise errors.OpExecError("Could not disable the master role")
1134

    
1135
    try:
1136
      # modify the sstore
1137
      ss.SetKey(ss.SS_MASTER_IP, ip)
1138
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1139

    
1140
      # Distribute updated ss config to all nodes
1141
      myself = self.cfg.GetNodeInfo(master)
1142
      dist_nodes = self.cfg.GetNodeList()
1143
      if myself.name in dist_nodes:
1144
        dist_nodes.remove(myself.name)
1145

    
1146
      logger.Debug("Copying updated ssconf data to all nodes")
1147
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1148
        fname = ss.KeyToFilename(keyname)
1149
        result = rpc.call_upload_file(dist_nodes, fname)
1150
        for to_node in dist_nodes:
1151
          if not result[to_node]:
1152
            logger.Error("copy of file %s to node %s failed" %
1153
                         (fname, to_node))
1154
    finally:
1155
      if not rpc.call_node_start_master(master):
1156
        logger.Error("Could not re-enable the master role on the master,"
1157
                     " please restart manually.")
1158

    
1159

    
1160
def _RecursiveCheckIfLVMBased(disk):
1161
  """Check if the given disk or its children are lvm-based.
1162

1163
  Args:
1164
    disk: ganeti.objects.Disk object
1165

1166
  Returns:
1167
    boolean indicating whether a LD_LV dev_type was found or not
1168

1169
  """
1170
  if disk.children:
1171
    for chdisk in disk.children:
1172
      if _RecursiveCheckIfLVMBased(chdisk):
1173
        return True
1174
  return disk.dev_type == constants.LD_LV
1175

    
1176

    
1177
class LUSetClusterParams(LogicalUnit):
1178
  """Change the parameters of the cluster.
1179

1180
  """
1181
  HPATH = "cluster-modify"
1182
  HTYPE = constants.HTYPE_CLUSTER
1183
  _OP_REQP = []
1184

    
1185
  def BuildHooksEnv(self):
1186
    """Build hooks env.
1187

1188
    """
1189
    env = {
1190
      "OP_TARGET": self.sstore.GetClusterName(),
1191
      "NEW_VG_NAME": self.op.vg_name,
1192
      }
1193
    mn = self.sstore.GetMasterNode()
1194
    return env, [mn], [mn]
1195

    
1196
  def CheckPrereq(self):
1197
    """Check prerequisites.
1198

1199
    This checks whether the given params don't conflict and
1200
    if the given volume group is valid.
1201

1202
    """
1203
    if not self.op.vg_name:
1204
      instances = [self.cfg.GetInstanceInfo(name)
1205
                   for name in self.cfg.GetInstanceList()]
1206
      for inst in instances:
1207
        for disk in inst.disks:
1208
          if _RecursiveCheckIfLVMBased(disk):
1209
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1210
                                       " lvm-based instances exist")
1211

    
1212
    # if vg_name not None, checks given volume group on all nodes
1213
    if self.op.vg_name:
1214
      node_list = self.cfg.GetNodeList()
1215
      vglist = rpc.call_vg_list(node_list)
1216
      for node in node_list:
1217
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1218
        if vgstatus:
1219
          raise errors.OpPrereqError("Error on node '%s': %s" %
1220
                                     (node, vgstatus))
1221

    
1222
  def Exec(self, feedback_fn):
1223
    """Change the parameters of the cluster.
1224

1225
    """
1226
    if self.op.vg_name != self.cfg.GetVGName():
1227
      self.cfg.SetVGName(self.op.vg_name)
1228
    else:
1229
      feedback_fn("Cluster LVM configuration already in desired"
1230
                  " state, not changing")
1231

    
1232

    
1233
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1234
  """Sleep and poll for an instance's disk to sync.
1235

1236
  """
1237
  if not instance.disks:
1238
    return True
1239

    
1240
  if not oneshot:
1241
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1242

    
1243
  node = instance.primary_node
1244

    
1245
  for dev in instance.disks:
1246
    cfgw.SetDiskID(dev, node)
1247

    
1248
  retries = 0
1249
  while True:
1250
    max_time = 0
1251
    done = True
1252
    cumul_degraded = False
1253
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1254
    if not rstats:
1255
      proc.LogWarning("Can't get any data from node %s" % node)
1256
      retries += 1
1257
      if retries >= 10:
1258
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1259
                                 " aborting." % node)
1260
      time.sleep(6)
1261
      continue
1262
    retries = 0
1263
    for i in range(len(rstats)):
1264
      mstat = rstats[i]
1265
      if mstat is None:
1266
        proc.LogWarning("Can't compute data for node %s/%s" %
1267
                        (node, instance.disks[i].iv_name))
1268
        continue
1269
      # we ignore the ldisk parameter
1270
      perc_done, est_time, is_degraded, _ = mstat
1271
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1272
      if perc_done is not None:
1273
        done = False
1274
        if est_time is not None:
1275
          rem_time = "%d estimated seconds remaining" % est_time
1276
          max_time = est_time
1277
        else:
1278
          rem_time = "no time estimate"
1279
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1280
                     (instance.disks[i].iv_name, perc_done, rem_time))
1281
    if done or oneshot:
1282
      break
1283

    
1284
    if unlock:
1285
      #utils.Unlock('cmd')
1286
      pass
1287
    try:
1288
      time.sleep(min(60, max_time))
1289
    finally:
1290
      if unlock:
1291
        #utils.Lock('cmd')
1292
        pass
1293

    
1294
  if done:
1295
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1296
  return not cumul_degraded
1297

    
1298

    
1299
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1300
  """Check that mirrors are not degraded.
1301

1302
  The ldisk parameter, if True, will change the test from the
1303
  is_degraded attribute (which represents overall non-ok status for
1304
  the device(s)) to the ldisk (representing the local storage status).
1305

1306
  """
1307
  cfgw.SetDiskID(dev, node)
1308
  if ldisk:
1309
    idx = 6
1310
  else:
1311
    idx = 5
1312

    
1313
  result = True
1314
  if on_primary or dev.AssembleOnSecondary():
1315
    rstats = rpc.call_blockdev_find(node, dev)
1316
    if not rstats:
1317
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1318
      result = False
1319
    else:
1320
      result = result and (not rstats[idx])
1321
  if dev.children:
1322
    for child in dev.children:
1323
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1324

    
1325
  return result
1326

    
1327

    
1328
class LUDiagnoseOS(NoHooksLU):
1329
  """Logical unit for OS diagnose/query.
1330

1331
  """
1332
  _OP_REQP = ["output_fields", "names"]
1333

    
1334
  def CheckPrereq(self):
1335
    """Check prerequisites.
1336

1337
    This always succeeds, since this is a pure query LU.
1338

1339
    """
1340
    if self.op.names:
1341
      raise errors.OpPrereqError("Selective OS query not supported")
1342

    
1343
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1344
    _CheckOutputFields(static=[],
1345
                       dynamic=self.dynamic_fields,
1346
                       selected=self.op.output_fields)
1347

    
1348
  @staticmethod
1349
  def _DiagnoseByOS(node_list, rlist):
1350
    """Remaps a per-node return list into an a per-os per-node dictionary
1351

1352
      Args:
1353
        node_list: a list with the names of all nodes
1354
        rlist: a map with node names as keys and OS objects as values
1355

1356
      Returns:
1357
        map: a map with osnames as keys and as value another map, with
             nodes as keys and lists of OS objects as values
1360
             e.g. {"debian-etch": {"node1": [<object>,...],
1361
                                   "node2": [<object>,]}
1362
                  }
1363

1364
    """
1365
    all_os = {}
1366
    for node_name, nr in rlist.iteritems():
1367
      if not nr:
1368
        continue
1369
      for os_obj in nr:
1370
        if os_obj.name not in all_os:
1371
          # build a list of nodes for this os containing empty lists
1372
          # for each node in node_list
1373
          all_os[os_obj.name] = {}
1374
          for nname in node_list:
1375
            all_os[os_obj.name][nname] = []
1376
        all_os[os_obj.name][node_name].append(os_obj)
1377
    return all_os
1378

    
1379
  def Exec(self, feedback_fn):
1380
    """Compute the list of OSes.
1381

1382
    """
1383
    node_list = self.cfg.GetNodeList()
1384
    node_data = rpc.call_os_diagnose(node_list)
1385
    if node_data == False:
1386
      raise errors.OpExecError("Can't gather the list of OSes")
1387
    pol = self._DiagnoseByOS(node_list, node_data)
1388
    output = []
1389
    for os_name, os_data in pol.iteritems():
1390
      row = []
1391
      for field in self.op.output_fields:
1392
        if field == "name":
1393
          val = os_name
1394
        elif field == "valid":
1395
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1396
        elif field == "node_status":
1397
          val = {}
1398
          for node_name, nos_list in os_data.iteritems():
1399
            val[node_name] = [(v.status, v.path) for v in nos_list]
1400
        else:
1401
          raise errors.ParameterError(field)
1402
        row.append(val)
1403
      output.append(row)
1404

    
1405
    return output
1406

    
1407

    
1408
class LURemoveNode(LogicalUnit):
1409
  """Logical unit for removing a node.
1410

1411
  """
1412
  HPATH = "node-remove"
1413
  HTYPE = constants.HTYPE_NODE
1414
  _OP_REQP = ["node_name"]
1415

    
1416
  def BuildHooksEnv(self):
1417
    """Build hooks env.
1418

1419
    This doesn't run on the target node in the pre phase as a failed
1420
    node would not allow itself to run.
1421

1422
    """
1423
    env = {
1424
      "OP_TARGET": self.op.node_name,
1425
      "NODE_NAME": self.op.node_name,
1426
      }
1427
    all_nodes = self.cfg.GetNodeList()
1428
    all_nodes.remove(self.op.node_name)
1429
    return env, all_nodes, all_nodes
1430

    
1431
  def CheckPrereq(self):
1432
    """Check prerequisites.
1433

1434
    This checks:
1435
     - the node exists in the configuration
1436
     - it does not have primary or secondary instances
1437
     - it's not the master
1438

1439
    Any errors are signalled by raising errors.OpPrereqError.
1440

1441
    """
1442
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1443
    if node is None:
1444
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1445

    
1446
    instance_list = self.cfg.GetInstanceList()
1447

    
1448
    masternode = self.sstore.GetMasterNode()
1449
    if node.name == masternode:
1450
      raise errors.OpPrereqError("Node is the master node,"
1451
                                 " you need to failover first.")
1452

    
1453
    for instance_name in instance_list:
1454
      instance = self.cfg.GetInstanceInfo(instance_name)
1455
      if node.name == instance.primary_node:
1456
        raise errors.OpPrereqError("Instance %s still running on the node,"
1457
                                   " please remove first." % instance_name)
1458
      if node.name in instance.secondary_nodes:
1459
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1460
                                   " please remove first." % instance_name)
1461
    self.op.node_name = node.name
1462
    self.node = node
1463

    
1464
  def Exec(self, feedback_fn):
1465
    """Removes the node from the cluster.
1466

1467
    """
1468
    node = self.node
1469
    logger.Info("stopping the node daemon and removing configs from node %s" %
1470
                node.name)
1471

    
1472
    rpc.call_node_leave_cluster(node.name)
1473

    
1474
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1475

    
1476
    logger.Info("Removing node %s from config" % node.name)
1477

    
1478
    self.cfg.RemoveNode(node.name)
1479

    
1480
    _RemoveHostFromEtcHosts(node.name)
1481

    
1482

    
1483
class LUQueryNodes(NoHooksLU):
1484
  """Logical unit for querying nodes.
1485

1486
  """
1487
  _OP_REQP = ["output_fields", "names"]
1488

    
1489
  def CheckPrereq(self):
1490
    """Check prerequisites.
1491

1492
    This checks that the fields required are valid output fields.
1493

1494
    """
1495
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1496
                                     "mtotal", "mnode", "mfree",
1497
                                     "bootid"])
1498

    
1499
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1500
                               "pinst_list", "sinst_list",
1501
                               "pip", "sip"],
1502
                       dynamic=self.dynamic_fields,
1503
                       selected=self.op.output_fields)
1504

    
1505
    self.wanted = _GetWantedNodes(self, self.op.names)
1506

    
1507
  def Exec(self, feedback_fn):
1508
    """Computes the list of nodes and their attributes.
1509

1510
    """
1511
    nodenames = self.wanted
1512
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1513

    
1514
    # begin data gathering
1515

    
1516
    if self.dynamic_fields.intersection(self.op.output_fields):
1517
      live_data = {}
1518
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1519
      for name in nodenames:
1520
        nodeinfo = node_data.get(name, None)
1521
        if nodeinfo:
1522
          live_data[name] = {
1523
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1524
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1525
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1526
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1527
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1528
            "bootid": nodeinfo['bootid'],
1529
            }
1530
        else:
1531
          live_data[name] = {}
1532
    else:
1533
      live_data = dict.fromkeys(nodenames, {})
1534

    
1535
    node_to_primary = dict([(name, set()) for name in nodenames])
1536
    node_to_secondary = dict([(name, set()) for name in nodenames])
1537

    
1538
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1539
                             "sinst_cnt", "sinst_list"))
1540
    if inst_fields & frozenset(self.op.output_fields):
1541
      instancelist = self.cfg.GetInstanceList()
1542

    
1543
      for instance_name in instancelist:
1544
        inst = self.cfg.GetInstanceInfo(instance_name)
1545
        if inst.primary_node in node_to_primary:
1546
          node_to_primary[inst.primary_node].add(inst.name)
1547
        for secnode in inst.secondary_nodes:
1548
          if secnode in node_to_secondary:
1549
            node_to_secondary[secnode].add(inst.name)
1550

    
1551
    # end data gathering
1552

    
1553
    output = []
1554
    for node in nodelist:
1555
      node_output = []
1556
      for field in self.op.output_fields:
1557
        if field == "name":
1558
          val = node.name
1559
        elif field == "pinst_list":
1560
          val = list(node_to_primary[node.name])
1561
        elif field == "sinst_list":
1562
          val = list(node_to_secondary[node.name])
1563
        elif field == "pinst_cnt":
1564
          val = len(node_to_primary[node.name])
1565
        elif field == "sinst_cnt":
1566
          val = len(node_to_secondary[node.name])
1567
        elif field == "pip":
1568
          val = node.primary_ip
1569
        elif field == "sip":
1570
          val = node.secondary_ip
1571
        elif field in self.dynamic_fields:
1572
          val = live_data[node.name].get(field, None)
1573
        else:
1574
          raise errors.ParameterError(field)
1575
        node_output.append(val)
1576
      output.append(node_output)
1577

    
1578
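    # Illustrative shape of the result (values are made up): for
    # output_fields ["name", "pinst_cnt", "pip"] each row looks like
    # ["node1.example.com", 2, "192.0.2.10"], in the same order as
    # self.op.output_fields.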
    return output
1579

    
1580

    
1581
class LUQueryNodeVolumes(NoHooksLU):
1582
  """Logical unit for getting volumes on node(s).
1583

1584
  """
1585
  _OP_REQP = ["nodes", "output_fields"]
1586

    
1587
  def CheckPrereq(self):
1588
    """Check prerequisites.
1589

1590
    This checks that the fields required are valid output fields.
1591

1592
    """
1593
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1594

    
1595
    _CheckOutputFields(static=["node"],
1596
                       dynamic=["phys", "vg", "name", "size", "instance"],
1597
                       selected=self.op.output_fields)
1598

    
1599

    
1600
  def Exec(self, feedback_fn):
1601
    """Computes the list of nodes and their attributes.
1602

1603
    """
1604
    nodenames = self.nodes
1605
    volumes = rpc.call_node_volumes(nodenames)
1606

    
1607
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1608
             in self.cfg.GetInstanceList()]
1609

    
1610
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1611

    
1612
    output = []
1613
    for node in nodenames:
1614
      if node not in volumes or not volumes[node]:
1615
        continue
1616

    
1617
      node_vols = volumes[node][:]
1618
      node_vols.sort(key=lambda vol: vol['dev'])
1619

    
1620
      for vol in node_vols:
1621
        node_output = []
1622
        for field in self.op.output_fields:
1623
          if field == "node":
1624
            val = node
1625
          elif field == "phys":
1626
            val = vol['dev']
1627
          elif field == "vg":
1628
            val = vol['vg']
1629
          elif field == "name":
1630
            val = vol['name']
1631
          elif field == "size":
1632
            val = int(float(vol['size']))
1633
          elif field == "instance":
1634
            for inst in ilist:
1635
              if node not in lv_by_node[inst]:
1636
                continue
1637
              if vol['name'] in lv_by_node[inst][node]:
1638
                val = inst.name
1639
                break
1640
            else:
1641
              val = '-'
1642
          else:
1643
            raise errors.ParameterError(field)
1644
          node_output.append(str(val))
1645

    
1646
        output.append(node_output)
1647

    
1648
    return output
1649

    
1650

    
1651
class LUAddNode(LogicalUnit):
1652
  """Logical unit for adding node to the cluster.
1653

1654
  """
1655
  HPATH = "node-add"
1656
  HTYPE = constants.HTYPE_NODE
1657
  _OP_REQP = ["node_name"]
1658

    
1659
  def BuildHooksEnv(self):
1660
    """Build hooks env.
1661

1662
    This will run on all nodes before, and on all nodes + the new node after.
1663

1664
    """
1665
    env = {
1666
      "OP_TARGET": self.op.node_name,
1667
      "NODE_NAME": self.op.node_name,
1668
      "NODE_PIP": self.op.primary_ip,
1669
      "NODE_SIP": self.op.secondary_ip,
1670
      }
1671
    nodes_0 = self.cfg.GetNodeList()
1672
    nodes_1 = nodes_0 + [self.op.node_name, ]
1673
    return env, nodes_0, nodes_1
1674

    
1675
  def CheckPrereq(self):
1676
    """Check prerequisites.
1677

1678
    This checks:
1679
     - the new node is not already in the config
1680
     - it is resolvable
1681
     - its parameters (single/dual homed) match the cluster
1682

1683
    Any errors are signalled by raising errors.OpPrereqError.
1684

1685
    """
1686
    node_name = self.op.node_name
1687
    cfg = self.cfg
1688

    
1689
    dns_data = utils.HostInfo(node_name)
1690

    
1691
    node = dns_data.name
1692
    primary_ip = self.op.primary_ip = dns_data.ip
1693
    secondary_ip = getattr(self.op, "secondary_ip", None)
1694
    if secondary_ip is None:
1695
      secondary_ip = primary_ip
1696
    if not utils.IsValidIP(secondary_ip):
1697
      raise errors.OpPrereqError("Invalid secondary IP given")
1698
    self.op.secondary_ip = secondary_ip
1699

    
1700
    node_list = cfg.GetNodeList()
1701
    if not self.op.readd and node in node_list:
1702
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1703
                                 node)
1704
    elif self.op.readd and node not in node_list:
1705
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1706

    
1707
    for existing_node_name in node_list:
1708
      existing_node = cfg.GetNodeInfo(existing_node_name)
1709

    
1710
      if self.op.readd and node == existing_node_name:
1711
        if (existing_node.primary_ip != primary_ip or
1712
            existing_node.secondary_ip != secondary_ip):
1713
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1714
                                     " address configuration as before")
1715
        continue
1716

    
1717
      if (existing_node.primary_ip == primary_ip or
1718
          existing_node.secondary_ip == primary_ip or
1719
          existing_node.primary_ip == secondary_ip or
1720
          existing_node.secondary_ip == secondary_ip):
1721
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1722
                                   " existing node %s" % existing_node.name)
1723

    
1724
    # check that the type of the node (single versus dual homed) is the
1725
    # same as for the master
1726
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1727
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1728
    newbie_singlehomed = secondary_ip == primary_ip
1729
    if master_singlehomed != newbie_singlehomed:
1730
      if master_singlehomed:
1731
        raise errors.OpPrereqError("The master has no private ip but the"
1732
                                   " new node has one")
1733
      else:
1734
        raise errors.OpPrereqError("The master has a private ip but the"
1735
                                   " new node doesn't have one")
1736

    
1737
    # checks reachability
1738
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1739
      raise errors.OpPrereqError("Node not reachable by ping")
1740

    
1741
    if not newbie_singlehomed:
1742
      # check reachability from my secondary ip to newbie's secondary ip
1743
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1744
                           source=myself.secondary_ip):
1745
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1746
                                   " based ping to noded port")
1747

    
1748
    self.new_node = objects.Node(name=node,
1749
                                 primary_ip=primary_ip,
1750
                                 secondary_ip=secondary_ip)
1751

    
1752
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1753
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1754
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1755
                                   constants.VNC_PASSWORD_FILE)
1756

    
1757
  def Exec(self, feedback_fn):
1758
    """Adds the new node to the cluster.
1759

1760
    """
1761
    new_node = self.new_node
1762
    node = new_node.name
1763

    
1764
    # set up inter-node password and certificate and restart the node daemon
1765
    gntpass = self.sstore.GetNodeDaemonPassword()
1766
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1767
      raise errors.OpExecError("ganeti password corruption detected")
1768
    f = open(constants.SSL_CERT_FILE)
1769
    try:
1770
      gntpem = f.read(8192)
1771
    finally:
1772
      f.close()
1773
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1774
    # so we use this to detect an invalid certificate; as long as the
1775
    # cert doesn't contain this, the here-document will be correctly
1776
    # parsed by the shell sequence below
1777
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1778
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1779
    if not gntpem.endswith("\n"):
1780
      raise errors.OpExecError("PEM must end with newline")
1781
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1782

    
1783
    # and then connect with ssh to set password and start ganeti-noded
1784
    # note that all the below variables are sanitized at this point,
1785
    # either by being constants or by the checks above
1786
    ss = self.sstore
1787
    mycommand = ("umask 077 && "
1788
                 "echo '%s' > '%s' && "
1789
                 "cat > '%s' << '!EOF.' && \n"
1790
                 "%s!EOF.\n%s restart" %
1791
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1792
                  constants.SSL_CERT_FILE, gntpem,
1793
                  constants.NODE_INITD_SCRIPT))
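    # For illustration only (placeholders, not real values), the command
    # built above expands to roughly:
    #   umask 077 && echo '<noded password>' > '<noded password file>' && \
    #   cat > '<SSL cert file>' << '!EOF.' &&
    #   <PEM contents>!EOF.
    #   <node init.d script> restart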
1794

    
1795
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1796
    if result.failed:
1797
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1798
                               " output: %s" %
1799
                               (node, result.fail_reason, result.output))
1800

    
1801
    # check connectivity
1802
    time.sleep(4)
1803

    
1804
    result = rpc.call_version([node])[node]
1805
    if result:
1806
      if constants.PROTOCOL_VERSION == result:
1807
        logger.Info("communication to node %s fine, sw version %s match" %
1808
                    (node, result))
1809
      else:
1810
        raise errors.OpExecError("Version mismatch master version %s,"
1811
                                 " node version %s" %
1812
                                 (constants.PROTOCOL_VERSION, result))
1813
    else:
1814
      raise errors.OpExecError("Cannot get version from the new node")
1815

    
1816
    # setup ssh on node
1817
    logger.Info("copy ssh key to node %s" % node)
1818
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1819
    keyarray = []
1820
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1821
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1822
                priv_key, pub_key]
1823

    
1824
    for i in keyfiles:
1825
      f = open(i, 'r')
1826
      try:
1827
        keyarray.append(f.read())
1828
      finally:
1829
        f.close()
1830

    
1831
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1832
                               keyarray[3], keyarray[4], keyarray[5])
1833

    
1834
    if not result:
1835
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1836

    
1837
    # Add node to our /etc/hosts, and add key to known_hosts
1838
    _AddHostToEtcHosts(new_node.name)
1839

    
1840
    if new_node.secondary_ip != new_node.primary_ip:
1841
      if not rpc.call_node_tcp_ping(new_node.name,
1842
                                    constants.LOCALHOST_IP_ADDRESS,
1843
                                    new_node.secondary_ip,
1844
                                    constants.DEFAULT_NODED_PORT,
1845
                                    10, False):
1846
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1847
                                 " you gave (%s). Please fix and re-run this"
1848
                                 " command." % new_node.secondary_ip)
1849

    
1850
    success, msg = self.ssh.VerifyNodeHostname(node)
1851
    if not success:
1852
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1853
                               " than the one the resolver gives: %s."
1854
                               " Please fix and re-run this command." %
1855
                               (node, msg))
1856

    
1857
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1858
    # including the node just added
1859
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1860
    dist_nodes = self.cfg.GetNodeList() + [node]
1861
    if myself.name in dist_nodes:
1862
      dist_nodes.remove(myself.name)
1863

    
1864
    logger.Debug("Copying hosts and known_hosts to all nodes")
1865
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1866
      result = rpc.call_upload_file(dist_nodes, fname)
1867
      for to_node in dist_nodes:
1868
        if not result[to_node]:
1869
          logger.Error("copy of file %s to node %s failed" %
1870
                       (fname, to_node))
1871

    
1872
    to_copy = ss.GetFileList()
1873
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1874
      to_copy.append(constants.VNC_PASSWORD_FILE)
1875
    for fname in to_copy:
1876
      if not self.ssh.CopyFileToNode(node, fname):
1877
        logger.Error("could not copy file %s to node %s" % (fname, node))
1878

    
1879
    if not self.op.readd:
1880
      logger.Info("adding node %s to cluster.conf" % node)
1881
      self.cfg.AddNode(new_node)
1882

    
1883

    
1884
class LUMasterFailover(LogicalUnit):
1885
  """Failover the master node to the current node.
1886

1887
  This is a special LU in that it must run on a non-master node.
1888

1889
  """
1890
  HPATH = "master-failover"
1891
  HTYPE = constants.HTYPE_CLUSTER
1892
  REQ_MASTER = False
1893
  _OP_REQP = []
1894

    
1895
  def BuildHooksEnv(self):
1896
    """Build hooks env.
1897

1898
    This will run on the new master only in the pre phase, and on all
1899
    the nodes in the post phase.
1900

1901
    """
1902
    env = {
1903
      "OP_TARGET": self.new_master,
1904
      "NEW_MASTER": self.new_master,
1905
      "OLD_MASTER": self.old_master,
1906
      }
1907
    return env, [self.new_master], self.cfg.GetNodeList()
1908

    
1909
  def CheckPrereq(self):
1910
    """Check prerequisites.
1911

1912
    This checks that we are not already the master.
1913

1914
    """
1915
    self.new_master = utils.HostInfo().name
1916
    self.old_master = self.sstore.GetMasterNode()
1917

    
1918
    if self.old_master == self.new_master:
1919
      raise errors.OpPrereqError("This commands must be run on the node"
1920
                                 " where you want the new master to be."
1921
                                 " %s is already the master" %
1922
                                 self.old_master)
1923

    
1924
  def Exec(self, feedback_fn):
1925
    """Failover the master node.
1926

1927
    This command, when run on a non-master node, will cause the current
1928
    master to cease being master, and the non-master to become the new
1929
    master.
1930

1931
    """
1932
    #TODO: do not rely on gethostname returning the FQDN
1933
    logger.Info("setting master to %s, old master: %s" %
1934
                (self.new_master, self.old_master))
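    # The failover proceeds in three steps: stop the master role on the old
    # master, rewrite and redistribute the master-node entry in the simple
    # store, then start the master role (and the master IP) on the new master.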
1935

    
1936
    if not rpc.call_node_stop_master(self.old_master):
1937
      logger.Error("could disable the master role on the old master"
1938
                   " %s, please disable manually" % self.old_master)
1939

    
1940
    ss = self.sstore
1941
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1942
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1943
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1944
      logger.Error("could not distribute the new simple store master file"
1945
                   " to the other nodes, please check.")
1946

    
1947
    if not rpc.call_node_start_master(self.new_master):
1948
      logger.Error("could not start the master role on the new master"
1949
                   " %s, please check" % self.new_master)
1950
      feedback_fn("Error in activating the master IP on the new master,"
1951
                  " please fix manually.")
1952

    
1953

    
1954

    
1955
class LUQueryClusterInfo(NoHooksLU):
1956
  """Query cluster configuration.
1957

1958
  """
1959
  _OP_REQP = []
1960
  REQ_MASTER = False
1961

    
1962
  def CheckPrereq(self):
1963
    """No prerequsites needed for this LU.
1964

1965
    """
1966
    pass
1967

    
1968
  def Exec(self, feedback_fn):
1969
    """Return cluster config.
1970

1971
    """
1972
    result = {
1973
      "name": self.sstore.GetClusterName(),
1974
      "software_version": constants.RELEASE_VERSION,
1975
      "protocol_version": constants.PROTOCOL_VERSION,
1976
      "config_version": constants.CONFIG_VERSION,
1977
      "os_api_version": constants.OS_API_VERSION,
1978
      "export_version": constants.EXPORT_VERSION,
1979
      "master": self.sstore.GetMasterNode(),
1980
      "architecture": (platform.architecture()[0], platform.machine()),
1981
      }
1982

    
1983
    return result
1984

    
1985

    
1986
class LUClusterCopyFile(NoHooksLU):
1987
  """Copy file to cluster.
1988

1989
  """
1990
  _OP_REQP = ["nodes", "filename"]
1991

    
1992
  def CheckPrereq(self):
1993
    """Check prerequisites.
1994

1995
    It should check that the named file exists and that the given list
1996
    of nodes is valid.
1997

1998
    """
1999
    if not os.path.exists(self.op.filename):
2000
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2001

    
2002
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2003

    
2004
  def Exec(self, feedback_fn):
2005
    """Copy a file from master to some nodes.
2006

2007
    Args:
2008
      opts - class with options as members
2009
      args - list containing a single element, the file name
2010
    Opts used:
2011
      nodes - list containing the name of target nodes; if empty, all nodes
2012

2013
    """
2014
    filename = self.op.filename
2015

    
2016
    myname = utils.HostInfo().name
2017

    
2018
    for node in self.nodes:
2019
      if node == myname:
2020
        continue
2021
      if not self.ssh.CopyFileToNode(node, filename):
2022
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
2023

    
2024

    
2025
class LUDumpClusterConfig(NoHooksLU):
2026
  """Return a text-representation of the cluster-config.
2027

2028
  """
2029
  _OP_REQP = []
2030

    
2031
  def CheckPrereq(self):
2032
    """No prerequisites.
2033

2034
    """
2035
    pass
2036

    
2037
  def Exec(self, feedback_fn):
2038
    """Dump a representation of the cluster config to the standard output.
2039

2040
    """
2041
    return self.cfg.DumpConfig()
2042

    
2043

    
2044
class LURunClusterCommand(NoHooksLU):
2045
  """Run a command on some nodes.
2046

2047
  """
2048
  _OP_REQP = ["command", "nodes"]
2049

    
2050
  def CheckPrereq(self):
2051
    """Check prerequisites.
2052

2053
    It checks that the given list of nodes is valid.
2054

2055
    """
2056
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2057

    
2058
  def Exec(self, feedback_fn):
2059
    """Run a command on some nodes.
2060

2061
    """
2062
    # put the master at the end of the nodes list
2063
    master_node = self.sstore.GetMasterNode()
2064
    if master_node in self.nodes:
2065
      self.nodes.remove(master_node)
2066
      self.nodes.append(master_node)
2067

    
2068
    data = []
2069
    for node in self.nodes:
2070
      result = self.ssh.Run(node, "root", self.op.command)
2071
      data.append((node, result.output, result.exit_code))
2072

    
2073
    return data
2074

    
2075

    
2076
class LUActivateInstanceDisks(NoHooksLU):
2077
  """Bring up an instance's disks.
2078

2079
  """
2080
  _OP_REQP = ["instance_name"]
2081

    
2082
  def CheckPrereq(self):
2083
    """Check prerequisites.
2084

2085
    This checks that the instance is in the cluster.
2086

2087
    """
2088
    instance = self.cfg.GetInstanceInfo(
2089
      self.cfg.ExpandInstanceName(self.op.instance_name))
2090
    if instance is None:
2091
      raise errors.OpPrereqError("Instance '%s' not known" %
2092
                                 self.op.instance_name)
2093
    self.instance = instance
2094

    
2095

    
2096
  def Exec(self, feedback_fn):
2097
    """Activate the disks.
2098

2099
    """
2100
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2101
    if not disks_ok:
2102
      raise errors.OpExecError("Cannot activate block devices")
2103

    
2104
    return disks_info
2105

    
2106

    
2107
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2108
  """Prepare the block devices for an instance.
2109

2110
  This sets up the block devices on all nodes.
2111

2112
  Args:
2113
    instance: a ganeti.objects.Instance object
2114
    ignore_secondaries: if true, errors on secondary nodes won't result
2115
                        in an error return from the function
2116

2117
  Returns:
2118
    false if the operation failed
2119
    list of (host, instance_visible_name, node_visible_name) if the operation
2120
         succeeded with the mapping from node devices to instance devices
2121
  """
2122
  device_info = []
2123
  disks_ok = True
2124
  iname = instance.name
2125
  # With the two-pass mechanism we try to reduce the window of
2126
  # opportunity for the race condition of switching DRBD to primary
2127
  # before handshaking occurred, but we do not eliminate it
2128

    
2129
  # The proper fix would be to wait (with some limits) until the
2130
  # connection has been made and drbd transitions from WFConnection
2131
  # into any other network-connected state (Connected, SyncTarget,
2132
  # SyncSource, etc.)
2133

    
2134
  # 1st pass, assemble on all nodes in secondary mode
2135
  for inst_disk in instance.disks:
2136
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2137
      cfg.SetDiskID(node_disk, node)
2138
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2139
      if not result:
2140
        logger.Error("could not prepare block device %s on node %s"
2141
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2142
        if not ignore_secondaries:
2143
          disks_ok = False
2144

    
2145
  # FIXME: race condition on drbd migration to primary
2146

    
2147
  # 2nd pass, do only the primary node
2148
  for inst_disk in instance.disks:
2149
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2150
      if node != instance.primary_node:
2151
        continue
2152
      cfg.SetDiskID(node_disk, node)
2153
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2154
      if not result:
2155
        logger.Error("could not prepare block device %s on node %s"
2156
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2157
        disks_ok = False
2158
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2159

    
2160
  # leave the disks configured for the primary node
2161
  # this is a workaround that would be fixed better by
2162
  # improving the logical/physical id handling
2163
  for disk in instance.disks:
2164
    cfg.SetDiskID(disk, instance.primary_node)
2165

    
2166
  return disks_ok, device_info
2167

    
2168

    
2169
def _StartInstanceDisks(cfg, instance, force):
2170
  """Start the disks of an instance.
2171

2172
  """
2173
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2174
                                           ignore_secondaries=force)
2175
  if not disks_ok:
2176
    _ShutdownInstanceDisks(instance, cfg)
2177
    if force is not None and not force:
2178
      logger.Error("If the message above refers to a secondary node,"
2179
                   " you can retry the operation using '--force'.")
2180
    raise errors.OpExecError("Disk consistency error")
2181

    
2182

    
2183
class LUDeactivateInstanceDisks(NoHooksLU):
2184
  """Shutdown an instance's disks.
2185

2186
  """
2187
  _OP_REQP = ["instance_name"]
2188

    
2189
  def CheckPrereq(self):
2190
    """Check prerequisites.
2191

2192
    This checks that the instance is in the cluster.
2193

2194
    """
2195
    instance = self.cfg.GetInstanceInfo(
2196
      self.cfg.ExpandInstanceName(self.op.instance_name))
2197
    if instance is None:
2198
      raise errors.OpPrereqError("Instance '%s' not known" %
2199
                                 self.op.instance_name)
2200
    self.instance = instance
2201

    
2202
  def Exec(self, feedback_fn):
2203
    """Deactivate the disks
2204

2205
    """
2206
    instance = self.instance
2207
    ins_l = rpc.call_instance_list([instance.primary_node])
2208
    ins_l = ins_l[instance.primary_node]
2209
    if not isinstance(ins_l, list):
2210
      raise errors.OpExecError("Can't contact node '%s'" %
2211
                               instance.primary_node)
2212

    
2213
    if self.instance.name in ins_l:
2214
      raise errors.OpExecError("Instance is running, can't shutdown"
2215
                               " block devices.")
2216

    
2217
    _ShutdownInstanceDisks(instance, self.cfg)
2218

    
2219

    
2220
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2221
  """Shutdown block devices of an instance.
2222

2223
  This does the shutdown on all nodes of the instance.
2224

2225
  If ignore_primary is true, errors on the primary node are
2226
  ignored.
2227

2228
  """
2229
  result = True
2230
  for disk in instance.disks:
2231
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2232
      cfg.SetDiskID(top_disk, node)
2233
      if not rpc.call_blockdev_shutdown(node, top_disk):
2234
        logger.Error("could not shutdown block device %s on node %s" %
2235
                     (disk.iv_name, node))
2236
        if not ignore_primary or node != instance.primary_node:
2237
          result = False
2238
  return result
2239

    
2240

    
2241
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2242
  """Checks if a node has enough free memory.
2243

2244
  This function checks if a given node has the needed amount of free
2245
  memory. In case the node has less memory or we cannot get the
2246
  information from the node, this function raises an OpPrereqError
2247
  exception.
2248

2249
  Args:
2250
    - cfg: a ConfigWriter instance
2251
    - node: the node name
2252
    - reason: string to use in the error message
2253
    - requested: the amount of memory in MiB
2254

2255
  """
2256
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2257
  if not nodeinfo or not isinstance(nodeinfo, dict):
2258
    raise errors.OpPrereqError("Could not contact node %s for resource"
2259
                             " information" % (node,))
2260

    
2261
  free_mem = nodeinfo[node].get('memory_free')
2262
  if not isinstance(free_mem, int):
2263
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2264
                             " was '%s'" % (node, free_mem))
2265
  if requested > free_mem:
2266
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2267
                             " needed %s MiB, available %s MiB" %
2268
                             (node, reason, requested, free_mem))
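# Typical use, as in LUStartupInstance below (a sketch of the call, not an
# additional API):
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)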
2269

    
2270

    
2271
class LUStartupInstance(LogicalUnit):
2272
  """Starts an instance.
2273

2274
  """
2275
  HPATH = "instance-start"
2276
  HTYPE = constants.HTYPE_INSTANCE
2277
  _OP_REQP = ["instance_name", "force"]
2278

    
2279
  def BuildHooksEnv(self):
2280
    """Build hooks env.
2281

2282
    This runs on master, primary and secondary nodes of the instance.
2283

2284
    """
2285
    env = {
2286
      "FORCE": self.op.force,
2287
      }
2288
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2289
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2290
          list(self.instance.secondary_nodes))
2291
    return env, nl, nl
2292

    
2293
  def CheckPrereq(self):
2294
    """Check prerequisites.
2295

2296
    This checks that the instance is in the cluster.
2297

2298
    """
2299
    instance = self.cfg.GetInstanceInfo(
2300
      self.cfg.ExpandInstanceName(self.op.instance_name))
2301
    if instance is None:
2302
      raise errors.OpPrereqError("Instance '%s' not known" %
2303
                                 self.op.instance_name)
2304

    
2305
    # check bridges existance
2306
    _CheckInstanceBridgesExist(instance)
2307

    
2308
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2309
                         "starting instance %s" % instance.name,
2310
                         instance.memory)
2311

    
2312
    self.instance = instance
2313
    self.op.instance_name = instance.name
2314

    
2315
  def Exec(self, feedback_fn):
2316
    """Start the instance.
2317

2318
    """
2319
    instance = self.instance
2320
    force = self.op.force
2321
    extra_args = getattr(self.op, "extra_args", "")
2322

    
2323
    self.cfg.MarkInstanceUp(instance.name)
2324

    
2325
    node_current = instance.primary_node
2326

    
2327
    _StartInstanceDisks(self.cfg, instance, force)
2328

    
2329
    if not rpc.call_instance_start(node_current, instance, extra_args):
2330
      _ShutdownInstanceDisks(instance, self.cfg)
2331
      raise errors.OpExecError("Could not start instance")
2332

    
2333

    
2334
class LURebootInstance(LogicalUnit):
2335
  """Reboot an instance.
2336

2337
  """
2338
  HPATH = "instance-reboot"
2339
  HTYPE = constants.HTYPE_INSTANCE
2340
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2341

    
2342
  def BuildHooksEnv(self):
2343
    """Build hooks env.
2344

2345
    This runs on master, primary and secondary nodes of the instance.
2346

2347
    """
2348
    env = {
2349
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2350
      }
2351
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2352
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2353
          list(self.instance.secondary_nodes))
2354
    return env, nl, nl
2355

    
2356
  def CheckPrereq(self):
2357
    """Check prerequisites.
2358

2359
    This checks that the instance is in the cluster.
2360

2361
    """
2362
    instance = self.cfg.GetInstanceInfo(
2363
      self.cfg.ExpandInstanceName(self.op.instance_name))
2364
    if instance is None:
2365
      raise errors.OpPrereqError("Instance '%s' not known" %
2366
                                 self.op.instance_name)
2367

    
2368
    # check that the instance's bridges exist
2369
    _CheckInstanceBridgesExist(instance)
2370

    
2371
    self.instance = instance
2372
    self.op.instance_name = instance.name
2373

    
2374
  def Exec(self, feedback_fn):
2375
    """Reboot the instance.
2376

2377
    """
2378
    instance = self.instance
2379
    ignore_secondaries = self.op.ignore_secondaries
2380
    reboot_type = self.op.reboot_type
2381
    extra_args = getattr(self.op, "extra_args", "")
2382

    
2383
    node_current = instance.primary_node
2384

    
2385
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2386
                           constants.INSTANCE_REBOOT_HARD,
2387
                           constants.INSTANCE_REBOOT_FULL]:
2388
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2389
                                  (constants.INSTANCE_REBOOT_SOFT,
2390
                                   constants.INSTANCE_REBOOT_HARD,
2391
                                   constants.INSTANCE_REBOOT_FULL))
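    # INSTANCE_REBOOT_SOFT and _HARD are passed through to the primary node
    # via rpc.call_instance_reboot; INSTANCE_REBOOT_FULL is emulated below as
    # a shutdown, disk deactivation/reactivation and a fresh start.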
2392

    
2393
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2394
                       constants.INSTANCE_REBOOT_HARD]:
2395
      if not rpc.call_instance_reboot(node_current, instance,
2396
                                      reboot_type, extra_args):
2397
        raise errors.OpExecError("Could not reboot instance")
2398
    else:
2399
      if not rpc.call_instance_shutdown(node_current, instance):
2400
        raise errors.OpExecError("could not shutdown instance for full reboot")
2401
      _ShutdownInstanceDisks(instance, self.cfg)
2402
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2403
      if not rpc.call_instance_start(node_current, instance, extra_args):
2404
        _ShutdownInstanceDisks(instance, self.cfg)
2405
        raise errors.OpExecError("Could not start instance for full reboot")
2406

    
2407
    self.cfg.MarkInstanceUp(instance.name)
2408

    
2409

    
2410
class LUShutdownInstance(LogicalUnit):
2411
  """Shutdown an instance.
2412

2413
  """
2414
  HPATH = "instance-stop"
2415
  HTYPE = constants.HTYPE_INSTANCE
2416
  _OP_REQP = ["instance_name"]
2417

    
2418
  def BuildHooksEnv(self):
2419
    """Build hooks env.
2420

2421
    This runs on master, primary and secondary nodes of the instance.
2422

2423
    """
2424
    env = _BuildInstanceHookEnvByObject(self.instance)
2425
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2426
          list(self.instance.secondary_nodes))
2427
    return env, nl, nl
2428

    
2429
  def CheckPrereq(self):
2430
    """Check prerequisites.
2431

2432
    This checks that the instance is in the cluster.
2433

2434
    """
2435
    instance = self.cfg.GetInstanceInfo(
2436
      self.cfg.ExpandInstanceName(self.op.instance_name))
2437
    if instance is None:
2438
      raise errors.OpPrereqError("Instance '%s' not known" %
2439
                                 self.op.instance_name)
2440
    self.instance = instance
2441

    
2442
  def Exec(self, feedback_fn):
2443
    """Shutdown the instance.
2444

2445
    """
2446
    instance = self.instance
2447
    node_current = instance.primary_node
2448
    self.cfg.MarkInstanceDown(instance.name)
2449
    if not rpc.call_instance_shutdown(node_current, instance):
2450
      logger.Error("could not shutdown instance")
2451

    
2452
    _ShutdownInstanceDisks(instance, self.cfg)
2453

    
2454

    
2455
class LUReinstallInstance(LogicalUnit):
2456
  """Reinstall an instance.
2457

2458
  """
2459
  HPATH = "instance-reinstall"
2460
  HTYPE = constants.HTYPE_INSTANCE
2461
  _OP_REQP = ["instance_name"]
2462

    
2463
  def BuildHooksEnv(self):
2464
    """Build hooks env.
2465

2466
    This runs on master, primary and secondary nodes of the instance.
2467

2468
    """
2469
    env = _BuildInstanceHookEnvByObject(self.instance)
2470
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2471
          list(self.instance.secondary_nodes))
2472
    return env, nl, nl
2473

    
2474
  def CheckPrereq(self):
2475
    """Check prerequisites.
2476

2477
    This checks that the instance is in the cluster and is not running.
2478

2479
    """
2480
    instance = self.cfg.GetInstanceInfo(
2481
      self.cfg.ExpandInstanceName(self.op.instance_name))
2482
    if instance is None:
2483
      raise errors.OpPrereqError("Instance '%s' not known" %
2484
                                 self.op.instance_name)
2485
    if instance.disk_template == constants.DT_DISKLESS:
2486
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2487
                                 self.op.instance_name)
2488
    if instance.status != "down":
2489
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2490
                                 self.op.instance_name)
2491
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2492
    if remote_info:
2493
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2494
                                 (self.op.instance_name,
2495
                                  instance.primary_node))
2496

    
2497
    self.op.os_type = getattr(self.op, "os_type", None)
2498
    if self.op.os_type is not None:
2499
      # OS verification
2500
      pnode = self.cfg.GetNodeInfo(
2501
        self.cfg.ExpandNodeName(instance.primary_node))
2502
      if pnode is None:
2503
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2504
                                   instance.primary_node)
2505
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2506
      if not os_obj:
2507
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2508
                                   " primary node"  % self.op.os_type)
2509

    
2510
    self.instance = instance
2511

    
2512
  def Exec(self, feedback_fn):
2513
    """Reinstall the instance.
2514

2515
    """
2516
    inst = self.instance
2517

    
2518
    if self.op.os_type is not None:
2519
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2520
      inst.os = self.op.os_type
2521
      self.cfg.AddInstance(inst)
2522

    
2523
    _StartInstanceDisks(self.cfg, inst, None)
2524
    try:
2525
      feedback_fn("Running the instance OS create scripts...")
2526
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2527
        raise errors.OpExecError("Could not install OS for instance %s"
2528
                                 " on node %s" %
2529
                                 (inst.name, inst.primary_node))
2530
    finally:
2531
      _ShutdownInstanceDisks(inst, self.cfg)
2532

    
2533

    
2534
class LURenameInstance(LogicalUnit):
2535
  """Rename an instance.
2536

2537
  """
2538
  HPATH = "instance-rename"
2539
  HTYPE = constants.HTYPE_INSTANCE
2540
  _OP_REQP = ["instance_name", "new_name"]
2541

    
2542
  def BuildHooksEnv(self):
2543
    """Build hooks env.
2544

2545
    This runs on master, primary and secondary nodes of the instance.
2546

2547
    """
2548
    env = _BuildInstanceHookEnvByObject(self.instance)
2549
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2550
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2551
          list(self.instance.secondary_nodes))
2552
    return env, nl, nl
2553

    
2554
  def CheckPrereq(self):
2555
    """Check prerequisites.
2556

2557
    This checks that the instance is in the cluster and is not running.
2558

2559
    """
2560
    instance = self.cfg.GetInstanceInfo(
2561
      self.cfg.ExpandInstanceName(self.op.instance_name))
2562
    if instance is None:
2563
      raise errors.OpPrereqError("Instance '%s' not known" %
2564
                                 self.op.instance_name)
2565
    if instance.status != "down":
2566
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2567
                                 self.op.instance_name)
2568
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2569
    if remote_info:
2570
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2571
                                 (self.op.instance_name,
2572
                                  instance.primary_node))
2573
    self.instance = instance
2574

    
2575
    # new name verification
2576
    name_info = utils.HostInfo(self.op.new_name)
2577

    
2578
    self.op.new_name = new_name = name_info.name
2579
    instance_list = self.cfg.GetInstanceList()
2580
    if new_name in instance_list:
2581
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2582
                                 new_name)
2583

    
2584
    if not getattr(self.op, "ignore_ip", False):
2585
      command = ["fping", "-q", name_info.ip]
2586
      result = utils.RunCmd(command)
2587
      if not result.failed:
2588
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2589
                                   (name_info.ip, new_name))
2590

    
2591

    
2592
  def Exec(self, feedback_fn):
2593
    """Reinstall the instance.
2594

2595
    """
2596
    inst = self.instance
2597
    old_name = inst.name
2598

    
2599
    if inst.disk_template == constants.DT_FILE:
2600
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2601

    
2602
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2603

    
2604
    # re-read the instance from the configuration after rename
2605
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2606

    
2607
    if inst.disk_template == constants.DT_FILE:
2608
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2609
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2610
                                                old_file_storage_dir,
2611
                                                new_file_storage_dir)
2612

    
2613
      if not result:
2614
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2615
                                 " directory '%s' to '%s' (but the instance"
2616
                                 " has been renamed in Ganeti)" % (
2617
                                 inst.primary_node, old_file_storage_dir,
2618
                                 new_file_storage_dir))
2619

    
2620
      if not result[0]:
2621
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2622
                                 " (but the instance has been renamed in"
2623
                                 " Ganeti)" % (old_file_storage_dir,
2624
                                               new_file_storage_dir))
2625

    
2626
    _StartInstanceDisks(self.cfg, inst, None)
2627
    try:
2628
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2629
                                          "sda", "sdb"):
2630
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2631
               " instance has been renamed in Ganeti)" %
2632
               (inst.name, inst.primary_node))
2633
        logger.Error(msg)
2634
    finally:
2635
      _ShutdownInstanceDisks(inst, self.cfg)
2636

    
2637

    
2638
class LURemoveInstance(LogicalUnit):
2639
  """Remove an instance.
2640

2641
  """
2642
  HPATH = "instance-remove"
2643
  HTYPE = constants.HTYPE_INSTANCE
2644
  _OP_REQP = ["instance_name"]
2645

    
2646
  def BuildHooksEnv(self):
2647
    """Build hooks env.
2648

2649
    This runs on master, primary and secondary nodes of the instance.
2650

2651
    """
2652
    env = _BuildInstanceHookEnvByObject(self.instance)
2653
    nl = [self.sstore.GetMasterNode()]
2654
    return env, nl, nl
2655

    
2656
  def CheckPrereq(self):
2657
    """Check prerequisites.
2658

2659
    This checks that the instance is in the cluster.
2660

2661
    """
2662
    instance = self.cfg.GetInstanceInfo(
2663
      self.cfg.ExpandInstanceName(self.op.instance_name))
2664
    if instance is None:
2665
      raise errors.OpPrereqError("Instance '%s' not known" %
2666
                                 self.op.instance_name)
2667
    self.instance = instance
2668

    
2669
  def Exec(self, feedback_fn):
2670
    """Remove the instance.
2671

2672
    """
2673
    instance = self.instance
2674
    logger.Info("shutting down instance %s on node %s" %
2675
                (instance.name, instance.primary_node))
2676

    
2677
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2678
      if self.op.ignore_failures:
2679
        feedback_fn("Warning: can't shutdown instance")
2680
      else:
2681
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2682
                                 (instance.name, instance.primary_node))
2683

    
2684
    logger.Info("removing block devices for instance %s" % instance.name)
2685

    
2686
    if not _RemoveDisks(instance, self.cfg):
2687
      if self.op.ignore_failures:
2688
        feedback_fn("Warning: can't remove instance's disks")
2689
      else:
2690
        raise errors.OpExecError("Can't remove instance's disks")
2691

    
2692
    logger.Info("removing instance %s out of cluster config" % instance.name)
2693

    
2694
    self.cfg.RemoveInstance(instance.name)
2695

    
2696

    
2697
class LUQueryInstances(NoHooksLU):
2698
  """Logical unit for querying instances.
2699

2700
  """
2701
  _OP_REQP = ["output_fields", "names"]
2702

    
2703
  def CheckPrereq(self):
2704
    """Check prerequisites.
2705

2706
    This checks that the fields required are valid output fields.
2707

2708
    """
2709
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2710
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2711
                               "admin_state", "admin_ram",
2712
                               "disk_template", "ip", "mac", "bridge",
2713
                               "sda_size", "sdb_size", "vcpus"],
2714
                       dynamic=self.dynamic_fields,
2715
                       selected=self.op.output_fields)
2716

    
2717
    self.wanted = _GetWantedInstances(self, self.op.names)
2718

    
2719
  def Exec(self, feedback_fn):
2720
    """Computes the list of nodes and their attributes.
2721

2722
    """
2723
    instance_names = self.wanted
2724
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2725
                     in instance_names]
2726

    
2727
    # begin data gathering
2728

    
2729
    nodes = frozenset([inst.primary_node for inst in instance_list])
2730

    
2731
    bad_nodes = []
2732
    if self.dynamic_fields.intersection(self.op.output_fields):
2733
      live_data = {}
2734
      node_data = rpc.call_all_instances_info(nodes)
2735
      for name in nodes:
2736
        result = node_data[name]
2737
        if result:
2738
          live_data.update(result)
2739
        elif result == False:
2740
          bad_nodes.append(name)
2741
        # else no instance is alive
2742
    else:
2743
      live_data = dict([(name, {}) for name in instance_names])
2744

    
2745
    # end data gathering
2746

    
2747
    output = []
2748
    for instance in instance_list:
2749
      iout = []
2750
      for field in self.op.output_fields:
2751
        if field == "name":
2752
          val = instance.name
2753
        elif field == "os":
2754
          val = instance.os
2755
        elif field == "pnode":
2756
          val = instance.primary_node
2757
        elif field == "snodes":
2758
          val = list(instance.secondary_nodes)
2759
        elif field == "admin_state":
2760
          val = (instance.status != "down")
2761
        elif field == "oper_state":
2762
          if instance.primary_node in bad_nodes:
2763
            val = None
2764
          else:
2765
            val = bool(live_data.get(instance.name))
2766
        elif field == "status":
2767
          if instance.primary_node in bad_nodes:
2768
            val = "ERROR_nodedown"
2769
          else:
2770
            running = bool(live_data.get(instance.name))
2771
            if running:
2772
              if instance.status != "down":
2773
                val = "running"
2774
              else:
2775
                val = "ERROR_up"
2776
            else:
2777
              if instance.status != "down":
2778
                val = "ERROR_down"
2779
              else:
2780
                val = "ADMIN_down"
2781
        elif field == "admin_ram":
2782
          val = instance.memory
2783
        elif field == "oper_ram":
2784
          if instance.primary_node in bad_nodes:
2785
            val = None
2786
          elif instance.name in live_data:
2787
            val = live_data[instance.name].get("memory", "?")
2788
          else:
2789
            val = "-"
2790
        elif field == "disk_template":
2791
          val = instance.disk_template
2792
        elif field == "ip":
2793
          val = instance.nics[0].ip
2794
        elif field == "bridge":
2795
          val = instance.nics[0].bridge
2796
        elif field == "mac":
2797
          val = instance.nics[0].mac
2798
        elif field == "sda_size" or field == "sdb_size":
2799
          disk = instance.FindDisk(field[:3])
2800
          if disk is None:
2801
            val = None
2802
          else:
2803
            val = disk.size
2804
        elif field == "vcpus":
2805
          val = instance.vcpus
2806
        else:
2807
          raise errors.ParameterError(field)
2808
        iout.append(val)
2809
      output.append(iout)
2810

    
2811
    return output
2812

    
2813

    
2814
class LUFailoverInstance(LogicalUnit):
2815
  """Failover an instance.
2816

2817
  """
2818
  HPATH = "instance-failover"
2819
  HTYPE = constants.HTYPE_INSTANCE
2820
  _OP_REQP = ["instance_name", "ignore_consistency"]
2821

    
2822
  def BuildHooksEnv(self):
2823
    """Build hooks env.
2824

2825
    This runs on master, primary and secondary nodes of the instance.
2826

2827
    """
2828
    env = {
2829
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2830
      }
2831
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2832
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2833
    return env, nl, nl
2834

    
2835
  def CheckPrereq(self):
2836
    """Check prerequisites.
2837

2838
    This checks that the instance is in the cluster.
2839

2840
    """
2841
    instance = self.cfg.GetInstanceInfo(
2842
      self.cfg.ExpandInstanceName(self.op.instance_name))
2843
    if instance is None:
2844
      raise errors.OpPrereqError("Instance '%s' not known" %
2845
                                 self.op.instance_name)
2846

    
2847
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2848
      raise errors.OpPrereqError("Instance's disk layout is not"
2849
                                 " network mirrored, cannot failover.")
2850

    
2851
    secondary_nodes = instance.secondary_nodes
2852
    if not secondary_nodes:
2853
      raise errors.ProgrammerError("no secondary node but using "
2854
                                   "DT_REMOTE_RAID1 template")
2855

    
2856
    target_node = secondary_nodes[0]
2857
    # check memory requirements on the secondary node
2858
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2859
                         instance.name, instance.memory)
2860

    
2861
    # check bridge existence
2862
    brlist = [nic.bridge for nic in instance.nics]
2863
    if not rpc.call_bridges_exist(target_node, brlist):
2864
      raise errors.OpPrereqError("One or more target bridges %s does not"
2865
                                 " exist on destination node '%s'" %
2866
                                 (brlist, target_node))
2867

    
2868
    self.instance = instance
2869

    
2870
  def Exec(self, feedback_fn):
2871
    """Failover an instance.
2872

2873
    The failover is done by shutting it down on its present node and
2874
    starting it on the secondary.
2875

2876
    """
2877
    instance = self.instance
2878

    
2879
    source_node = instance.primary_node
2880
    target_node = instance.secondary_nodes[0]
2881

    
2882
    feedback_fn("* checking disk consistency between source and target")
2883
    for dev in instance.disks:
2884
      # for remote_raid1, these are md over drbd
2885
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2886
        if instance.status == "up" and not self.op.ignore_consistency:
2887
          raise errors.OpExecError("Disk %s is degraded on target node,"
2888
                                   " aborting failover." % dev.iv_name)
2889

    
2890
    feedback_fn("* shutting down instance on source node")
2891
    logger.Info("Shutting down instance %s on node %s" %
2892
                (instance.name, source_node))
2893

    
2894
    if not rpc.call_instance_shutdown(source_node, instance):
2895
      if self.op.ignore_consistency:
2896
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2897
                     " anyway. Please make sure node %s is down"  %
2898
                     (instance.name, source_node, source_node))
2899
      else:
2900
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2901
                                 (instance.name, source_node))
2902

    
2903
    feedback_fn("* deactivating the instance's disks on source node")
2904
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2905
      raise errors.OpExecError("Can't shut down the instance's disks.")
2906

    
2907
    instance.primary_node = target_node
2908
    # distribute new instance config to the other nodes
2909
    self.cfg.AddInstance(instance)
2910

    
2911
    # Only start the instance if it's marked as up
2912
    if instance.status == "up":
2913
      feedback_fn("* activating the instance's disks on target node")
2914
      logger.Info("Starting instance %s on node %s" %
2915
                  (instance.name, target_node))
2916

    
2917
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2918
                                               ignore_secondaries=True)
2919
      if not disks_ok:
2920
        _ShutdownInstanceDisks(instance, self.cfg)
2921
        raise errors.OpExecError("Can't activate the instance's disks")
2922

    
2923
      feedback_fn("* starting the instance on the target node")
2924
      if not rpc.call_instance_start(target_node, instance, None):
2925
        _ShutdownInstanceDisks(instance, self.cfg)
2926
        raise errors.OpExecError("Could not start instance %s on node %s." %
2927
                                 (instance.name, target_node))
2928

    
2929

    
2930
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2931
  """Create a tree of block devices on the primary node.
2932

2933
  This always creates all devices.
2934

2935
  """
2936
  if device.children:
2937
    for child in device.children:
2938
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2939
        return False
2940

    
2941
  cfg.SetDiskID(device, node)
2942
  new_id = rpc.call_blockdev_create(node, device, device.size,
2943
                                    instance.name, True, info)
2944
  if not new_id:
2945
    return False
2946
  if device.physical_id is None:
2947
    device.physical_id = new_id
2948
  return True
2949

    
2950

    
2951
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2952
  """Create a tree of block devices on a secondary node.
2953

2954
  If this device type has to be created on secondaries, create it and
2955
  all its children.
2956

2957
  If not, just recurse to children keeping the same 'force' value.
2958

2959
  """
2960
  if device.CreateOnSecondary():
2961
    force = True
2962
  if device.children:
2963
    for child in device.children:
2964
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2965
                                        child, force, info):
2966
        return False
2967

    
2968
  if not force:
2969
    return True
2970
  cfg.SetDiskID(device, node)
2971
  new_id = rpc.call_blockdev_create(node, device, device.size,
2972
                                    instance.name, False, info)
2973
  if not new_id:
2974
    return False
2975
  if device.physical_id is None:
2976
    device.physical_id = new_id
2977
  return True
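# Note: a device is created on the secondary only if it, or one of its
# ancestors in the disk tree, reports CreateOnSecondary(), or if the caller
# passed force=True; otherwise only its children are considered.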
2978

    
2979

    
2980
def _GenerateUniqueNames(cfg, exts):
2981
  """Generate a suitable LV name.
2982

2983
  This will generate one unique logical volume name per requested suffix.
2984

2985
  """
2986
  results = []
2987
  for val in exts:
2988
    new_id = cfg.GenerateUniqueID()
2989
    results.append("%s%s" % (new_id, val))
2990
  return results
2991

    
2992

    
2993
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2994
  """Generate a drbd device complete with its children.
2995

2996
  """
2997
  port = cfg.AllocatePort()
2998
  vgname = cfg.GetVGName()
2999
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3000
                          logical_id=(vgname, names[0]))
3001
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3002
                          logical_id=(vgname, names[1]))
3003
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3004
                          logical_id = (primary, secondary, port),
3005
                          children = [dev_data, dev_meta])
3006
  return drbd_dev
3007

    
3008

    
3009
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3010
  """Generate a drbd8 device complete with its children.
3011

3012
  """
3013
  port = cfg.AllocatePort()
3014
  vgname = cfg.GetVGName()
3015
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3016
                          logical_id=(vgname, names[0]))
3017
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3018
                          logical_id=(vgname, names[1]))
3019
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3020
                          logical_id = (primary, secondary, port),
3021
                          children = [dev_data, dev_meta],
3022
                          iv_name=iv_name)
3023
  return drbd_dev
3024

    
3025

    
3026
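# Note: the branch generators above pair each data LV with a fixed 128 MB
# metadata LV; the extra space for the two meta volumes is what the DT_DRBD8
# entry in _ComputeDiskSize (further below) accounts for.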
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


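# Note: _CreateDisks builds each disk on the secondary node(s) first and on
# the primary node last; on any failure it returns False and leaves cleanup
# of already-created devices to the caller (see _RemoveDisks).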
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

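  # None (diskless and file-based templates) means no space is needed in the
  # volume group; callers skip the free-space check in that case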
  return req_size_dict[disk_template]


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
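    # for mirrored disk templates the allocator also returns the secondary node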
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

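    # if requested, wait until the disks are fully synced; otherwise, for
    # mirrored templates, only verify that no disk is degraded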
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.op.remote_node = self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
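    # for drbd8, map the requested mode to the node whose storage will be
    # recreated (tgt_node) and the node that must stay consistent (oth_node)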
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
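    # select the replacement strategy based on the disk template and, for
    # drbd8, on whether a new secondary node was requested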
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
4242
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4243
                               in self.cfg.GetInstanceList()]
4244
    return
4245

    
4246

    
4247
  def _ComputeDiskStatus(self, instance, snode, dev):
4248
    """Compute block device status.
4249

4250
    """
4251
    self.cfg.SetDiskID(dev, instance.primary_node)
4252
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4253
    if dev.dev_type in constants.LDS_DRBD:
4254
      # we change the snode then (otherwise we use the one passed in)
4255
      if dev.logical_id[0] == instance.primary_node:
4256
        snode = dev.logical_id[1]
4257
      else:
4258
        snode = dev.logical_id[0]
4259

    
4260
    if snode:
4261
      self.cfg.SetDiskID(dev, snode)
4262
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4263
    else:
4264
      dev_sstatus = None
4265

    
4266
    if dev.children:
4267
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4268
                      for child in dev.children]
4269
    else:
4270
      dev_children = []
4271

    
4272
    data = {
4273
      "iv_name": dev.iv_name,
4274
      "dev_type": dev.dev_type,
4275
      "logical_id": dev.logical_id,
4276
      "physical_id": dev.physical_id,
4277
      "pstatus": dev_pstatus,
4278
      "sstatus": dev_sstatus,
4279
      "children": dev_children,
4280
      }
4281

    
4282
    return data
4283

    
4284
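  # Illustrative shape of the value returned by Exec below (names and values
  # are hypothetical): a dict keyed by instance name, each entry holding the
  # configured parameters plus the live disk status, e.g.
  #   {"inst1.example.com": {"name": "inst1.example.com",
  #                          "config_state": "up", "run_state": "up",
  #                          "pnode": "node1", "snodes": [...],
  #                          "os": ..., "memory": ..., "nics": [...],
  #                          "disks": [...], ...}}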
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_params.count(None) == len(all_params):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

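  # Exec below returns a list of (parameter, new value) pairs describing the
  # changes that were applied, e.g. [("mem", 512), ("vcpus", 2)] (values
  # illustrative only), which the caller can use for reporting.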
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

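  # The export below proceeds in four steps: optionally shut the instance
  # down, snapshot its 'sda' LVM volume on the primary node, copy the
  # snapshot to the target node (removing the temporary snapshot afterwards)
  # and finalize the export there, and finally prune any older export of the
  # same instance from the other nodes.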
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

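  # Exec below returns a list of (path, tag) pairs, where the path identifies
  # the tagged object, e.g. [("/cluster", "staging"),
  # ("/instances/inst1.example.com", "staging")] (values illustrative only).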
  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


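# Illustrative usage of IAllocator (names and values are hypothetical); the
# real in-tree caller is LUTestAllocator.Exec further below:
#   ial = IAllocator(cfg, sstore, mode=constants.IALLOCATOR_MODE_ALLOC,
#                    name="inst1.example.com", mem_size=256,
#                    disks=[{"size": 1024, "mode": "w"},
#                           {"size": 4096, "mode": "w"}],
#                    disk_template=constants.DT_DRBD8, os="debian-etch",
#                    tags=[], nics=[], vcpus=1)
#   ial.Run("my-allocator-script")   # runs the external allocator script
#   if ial.success:
#     print ial.nodes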
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _ALLO_KEYS or _RELO_KEYS class
      attribute, depending on the mode, are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

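  # The input document built below is a dict that, once serialized, looks
  # roughly like this (sketch only, values hypothetical):
  #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
  #    "hypervisor_type": ...,
  #    "nodes": {<node name>: {...}, ...},
  #    "instances": {<instance name>: {...}, ...},
  #    "request": {...}}  # added by _AddNewInstance/_AddRelocateInstance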
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

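  # The allocator script is expected to print a serialized dict on stdout;
  # once parsed it should look roughly like this (sketch only, values
  # hypothetical):
  #   {"success": True, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}
  # _ValidateResult below only checks that these three keys are present and
  # that "nodes" is a list.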
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


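# Example of the opcode fields LUTestAllocator expects in allocation mode
# (values illustrative only): direction=constants.IALLOCATOR_DIR_IN or
# constants.IALLOCATOR_DIR_OUT, mode=constants.IALLOCATOR_MODE_ALLOC,
# name="inst1.example.com", mem_size=256,
# disks=[{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "w"}],
# disk_template=constants.DT_DRBD8, os="debian-etch", tags=[],
# nics=[{"mac": "00:11:22:33:44:55", "ip": None, "bridge": "xen-br0"}],
# vcpus=1; in relocation mode only "name" (plus direction/mode) is needed.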
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result