root / lib / cmdlib.py @ 4337cf1b


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement CheckPrereq which also fills in the opcode instance
53
      with all the fields (even if only as None)
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements (REQ_CLUSTER,
58
      REQ_MASTER); note that all commands require root permissions
59

60
  """
61
  HPATH = None
62
  HTYPE = None
63
  _OP_REQP = []
64
  REQ_CLUSTER = True
65
  REQ_MASTER = True
66

    
67
  def __init__(self, processor, op, cfg, sstore):
68
    """Constructor for LogicalUnit.
69

70
    This needs to be overridden in derived classes in order to check op
71
    validity.
72

73
    """
74
    self.proc = processor
75
    self.op = op
76
    self.cfg = cfg
77
    self.sstore = sstore
78
    self.__ssh = None
79

    
80
    for attr_name in self._OP_REQP:
81
      attr_val = getattr(op, attr_name, None)
82
      if attr_val is None:
83
        raise errors.OpPrereqError("Required parameter '%s' missing" %
84
                                   attr_name)
85
    if self.REQ_CLUSTER:
86
      if not cfg.IsCluster():
87
        raise errors.OpPrereqError("Cluster not initialized yet,"
88
                                   " use 'gnt-cluster init' first.")
89
      if self.REQ_MASTER:
90
        master = sstore.GetMasterNode()
91
        if master != utils.HostInfo().name:
92
          raise errors.OpPrereqError("Commands must be run on the master"
93
                                     " node %s" % master)
94

    
95
  def __GetSSH(self):
96
    """Returns the SshRunner object
97

98
    """
99
    if not self.__ssh:
100
      self.__ssh = ssh.SshRunner(self.sstore)
101
    return self.__ssh
102

    
103
  ssh = property(fget=__GetSSH)
104

    
105
  def CheckPrereq(self):
106
    """Check prerequisites for this LU.
107

108
    This method should check that the prerequisites for the execution
109
    of this LU are fulfilled. It can do internode communication, but
110
    it should be idempotent - no cluster or system changes are
111
    allowed.
112

113
    The method should raise errors.OpPrereqError in case something is
114
    not fulfilled. Its return value is ignored.
115

116
    This method should also update all the parameters of the opcode to
117
    their canonical form; e.g. a short node name must be fully
118
    expanded after this method has successfully completed (so that
119
    hooks, logging, etc. work correctly).
120

121
    """
122
    raise NotImplementedError
123

    
124
  def Exec(self, feedback_fn):
125
    """Execute the LU.
126

127
    This method should implement the actual work. It should raise
128
    errors.OpExecError for failures that are somewhat dealt with in
129
    code, or expected.
130

131
    """
132
    raise NotImplementedError
133

    
134
  def BuildHooksEnv(self):
135
    """Build hooks environment for this LU.
136

137
    This method should return a three-element tuple consisting of: a dict
138
    containing the environment that will be used for running the
139
    specific hook for this LU, a list of node names on which the hook
140
    should run before the execution, and a list of node names on which
141
    the hook should run after the execution.
142

143
    The keys of the dict must not be prefixed with 'GANETI_', as that
144
    prefix is added by the hooks runner. Note that additional keys will
145
    also be added by the hooks runner. If the LU doesn't define any
146
    environment, an empty dict (and not None) should be returned.
147

148
    No nodes should be returned as an empty list (and not None).
149

150
    Note that if the HPATH for a LU class is None, this function will
151
    not be called.
152

153
    """
154
    raise NotImplementedError
155

    
156
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
157
    """Notify the LU about the results of its hooks.
158

159
    This method is called every time a hooks phase is executed, and notifies
160
    the Logical Unit about the hooks' result. The LU can then use it to alter
161
    its result based on the hooks.  By default the method does nothing and the
162
    previous result is passed back unchanged, but any LU can override it if it
163
    wants to make use of the local cluster hook scripts in some way.
164

165
    Args:
166
      phase: the hooks phase that has just been run
167
      hook_results: the results of the multi-node hooks rpc call
168
      feedback_fn: function to send feedback back to the caller
169
      lu_result: the previous result this LU had, or None in the PRE phase.
170

171
    """
172
    return lu_result
173

    
174

    
175
class NoHooksLU(LogicalUnit):
176
  """Simple LU which runs no hooks.
177

178
  This LU is intended as a parent for other LogicalUnits which will
179
  run no hooks, in order to reduce duplicate code.
180

181
  """
182
  HPATH = None
183
  HTYPE = None
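

# Illustrative sketch, not part of the original module: a minimal LogicalUnit
# subclass following the contract described in the LogicalUnit docstring
# above.  The class name LUExampleEcho and the opcode field "message" are
# hypothetical and exist only to show the CheckPrereq/Exec split.
class LUExampleEcho(NoHooksLU):
  """Example LU: validates its single opcode field and echoes it back."""
  _OP_REQP = ["message"]

  def CheckPrereq(self):
    # prerequisite checks may inspect the cluster but must not change it
    if not isinstance(self.op.message, basestring):
      raise errors.OpPrereqError("Parameter 'message' must be a string")

  def Exec(self, feedback_fn):
    # Exec does the actual work; here it only reports back to the caller
    feedback_fn("echo: %s" % self.op.message)
    return self.op.message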
184

    
185

    
186
def _AddHostToEtcHosts(hostname):
187
  """Wrapper around utils.SetEtcHostsEntry.
188

189
  """
190
  hi = utils.HostInfo(name=hostname)
191
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
192

    
193

    
194
def _RemoveHostFromEtcHosts(hostname):
195
  """Wrapper around utils.RemoveEtcHostsEntry.
196

197
  """
198
  hi = utils.HostInfo(name=hostname)
199
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
200
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
201

    
202

    
203
def _GetWantedNodes(lu, nodes):
204
  """Returns list of checked and expanded node names.
205

206
  Args:
207
    nodes: List of node names (strings); an empty list means all nodes
208

209
  """
210
  if not isinstance(nodes, list):
211
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
212

    
213
  if nodes:
214
    wanted = []
215

    
216
    for name in nodes:
217
      node = lu.cfg.ExpandNodeName(name)
218
      if node is None:
219
        raise errors.OpPrereqError("No such node name '%s'" % name)
220
      wanted.append(node)
221

    
222
  else:
223
    wanted = lu.cfg.GetNodeList()
224
  return utils.NiceSort(wanted)
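
# Usage note (illustrative, hypothetical node name): _GetWantedNodes(lu, [])
# returns every node name known to the configuration, while
# _GetWantedNodes(lu, ["node1"]) expands and validates just that name; both
# results come back sorted via utils.NiceSort.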
225

    
226

    
227
def _GetWantedInstances(lu, instances):
228
  """Returns list of checked and expanded instance names.
229

230
  Args:
231
    instances: List of instance names (strings); an empty list means all
    instances
232

233
  """
234
  if not isinstance(instances, list):
235
    raise errors.OpPrereqError("Invalid argument type 'instances'")
236

    
237
  if instances:
238
    wanted = []
239

    
240
    for name in instances:
241
      instance = lu.cfg.ExpandInstanceName(name)
242
      if instance is None:
243
        raise errors.OpPrereqError("No such instance name '%s'" % name)
244
      wanted.append(instance)
245

    
246
  else:
247
    wanted = lu.cfg.GetInstanceList()
248
  return utils.NiceSort(wanted)
249

    
250

    
251
def _CheckOutputFields(static, dynamic, selected):
252
  """Checks whether all selected fields are valid.
253

254
  Args:
255
    static: Static fields
256
    dynamic: Dynamic fields
    selected: Fields selected by the caller for output
257

258
  """
259
  static_fields = frozenset(static)
260
  dynamic_fields = frozenset(dynamic)
261

    
262
  all_fields = static_fields | dynamic_fields
263

    
264
  if not all_fields.issuperset(selected):
265
    raise errors.OpPrereqError("Unknown output fields selected: %s"
266
                               % ",".join(frozenset(selected).
267
                                          difference(all_fields)))
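
# Example (hypothetical field names): _CheckOutputFields(static=["name"],
# dynamic=["mfree"], selected=["name", "bogus"]) raises OpPrereqError
# reporting "Unknown output fields selected: bogus", while
# selected=["name", "mfree"] passes silently.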
268

    
269

    
270
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
271
                          memory, vcpus, nics):
272
  """Builds instance related env variables for hooks from single variables.
273

274
  Args:
275
    secondary_nodes: List of secondary nodes as strings
276
  """
277
  env = {
278
    "OP_TARGET": name,
279
    "INSTANCE_NAME": name,
280
    "INSTANCE_PRIMARY": primary_node,
281
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
282
    "INSTANCE_OS_TYPE": os_type,
283
    "INSTANCE_STATUS": status,
284
    "INSTANCE_MEMORY": memory,
285
    "INSTANCE_VCPUS": vcpus,
286
  }
287

    
288
  if nics:
289
    nic_count = len(nics)
290
    for idx, (ip, bridge, mac) in enumerate(nics):
291
      if ip is None:
292
        ip = ""
293
      env["INSTANCE_NIC%d_IP" % idx] = ip
294
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
295
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
296
  else:
297
    nic_count = 0
298

    
299
  env["INSTANCE_NIC_COUNT"] = nic_count
300

    
301
  return env
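
# Example (hypothetical values): a call such as
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch", "up",
#                         512, 1, [("198.51.100.10", "xen-br0",
#                                   "aa:00:00:11:22:33")])
# yields, among other keys, OP_TARGET="inst1", INSTANCE_PRIMARY="node1",
# INSTANCE_SECONDARIES="node2", INSTANCE_NIC_COUNT=1 and
# INSTANCE_NIC0_IP="198.51.100.10".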
302

    
303

    
304
def _BuildInstanceHookEnvByObject(instance, override=None):
305
  """Builds instance related env variables for hooks from an object.
306

307
  Args:
308
    instance: objects.Instance object of instance
309
    override: dict of values to override
310
  """
311
  args = {
312
    'name': instance.name,
313
    'primary_node': instance.primary_node,
314
    'secondary_nodes': instance.secondary_nodes,
315
    'os_type': instance.os,
316
    'status': instance.status,
317
    'memory': instance.memory,
318
    'vcpus': instance.vcpus,
319
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
320
  }
321
  if override:
322
    args.update(override)
323
  return _BuildInstanceHookEnv(**args)
324

    
325

    
326
def _HasValidVG(vglist, vgname):
327
  """Checks if the volume group list is valid.
328

329
  A non-None return value means there's an error, and the return value
330
  is the error message.
331

332
  """
333
  vgsize = vglist.get(vgname, None)
334
  if vgsize is None:
335
    return "volume group '%s' missing" % vgname
336
  elif vgsize < 20480:
337
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
338
            (vgname, vgsize))
339
  return None
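
# Example (hypothetical sizes, in MiB): _HasValidVG({"xenvg": 102400}, "xenvg")
# returns None (valid), _HasValidVG({"xenvg": 10240}, "xenvg") returns a
# "too small" message, and _HasValidVG({}, "xenvg") reports the group missing.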
340

    
341

    
342
def _InitSSHSetup(node):
343
  """Setup the SSH configuration for the cluster.
344

345

346
  This generates a dsa keypair for root, adds the pub key to the
347
  permitted hosts and adds the hostkey to its own known hosts.
348

349
  Args:
350
    node: the name of this host as a fqdn
351

352
  """
353
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
354

    
355
  for name in priv_key, pub_key:
356
    if os.path.exists(name):
357
      utils.CreateBackup(name)
358
    utils.RemoveFile(name)
359

    
360
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
361
                         "-f", priv_key,
362
                         "-q", "-N", ""])
363
  if result.failed:
364
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
365
                             result.output)
366

    
367
  f = open(pub_key, 'r')
368
  try:
369
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
370
  finally:
371
    f.close()
372

    
373

    
374
def _InitGanetiServerSetup(ss):
375
  """Setup the necessary configuration for the initial node daemon.
376

377
  This creates the nodepass file containing the shared password for
378
  the cluster and also generates the SSL certificate.
379

380
  """
381
  # Create pseudo random password
382
  randpass = sha.new(os.urandom(64)).hexdigest()
383
  # and write it into sstore
384
  ss.SetKey(ss.SS_NODED_PASS, randpass)
385

    
386
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
387
                         "-days", str(365*5), "-nodes", "-x509",
388
                         "-keyout", constants.SSL_CERT_FILE,
389
                         "-out", constants.SSL_CERT_FILE, "-batch"])
390
  if result.failed:
391
    raise errors.OpExecError("could not generate server ssl cert, command"
392
                             " %s had exitcode %s and error message %s" %
393
                             (result.cmd, result.exit_code, result.output))
394

    
395
  os.chmod(constants.SSL_CERT_FILE, 0400)
396

    
397
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
398

    
399
  if result.failed:
400
    raise errors.OpExecError("Could not start the node daemon, command %s"
401
                             " had exitcode %s and error %s" %
402
                             (result.cmd, result.exit_code, result.output))
403

    
404

    
405
def _CheckInstanceBridgesExist(instance):
406
  """Check that the brigdes needed by an instance exist.
407

408
  """
409
  # check bridges existence
410
  brlist = [nic.bridge for nic in instance.nics]
411
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
412
    raise errors.OpPrereqError("one or more target bridges %s does not"
413
                               " exist on destination node '%s'" %
414
                               (brlist, instance.primary_node))
415

    
416

    
417
class LUInitCluster(LogicalUnit):
418
  """Initialise the cluster.
419

420
  """
421
  HPATH = "cluster-init"
422
  HTYPE = constants.HTYPE_CLUSTER
423
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
424
              "def_bridge", "master_netdev", "file_storage_dir"]
425
  REQ_CLUSTER = False
426

    
427
  def BuildHooksEnv(self):
428
    """Build hooks env.
429

430
    Notes: Since we don't require a cluster, we must manually add
431
    ourselves to the post-run node list.
432

433
    """
434
    env = {"OP_TARGET": self.op.cluster_name}
435
    return env, [], [self.hostname.name]
436

    
437
  def CheckPrereq(self):
438
    """Verify that the passed name is a valid one.
439

440
    """
441
    if config.ConfigWriter.IsCluster():
442
      raise errors.OpPrereqError("Cluster is already initialised")
443

    
444
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
445
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
446
        raise errors.OpPrereqError("Please prepare the cluster VNC"
447
                                   "password file %s" %
448
                                   constants.VNC_PASSWORD_FILE)
449

    
450
    self.hostname = hostname = utils.HostInfo()
451

    
452
    if hostname.ip.startswith("127."):
453
      raise errors.OpPrereqError("This host's IP resolves to the private"
454
                                 " range (%s). Please fix DNS or %s." %
455
                                 (hostname.ip, constants.ETC_HOSTS))
456

    
457
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
458
                         source=constants.LOCALHOST_IP_ADDRESS):
459
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
460
                                 " to %s,\nbut this ip address does not"
461
                                 " belong to this host."
462
                                 " Aborting." % hostname.ip)
463

    
464
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
465

    
466
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
467
                     timeout=5):
468
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
469

    
470
    secondary_ip = getattr(self.op, "secondary_ip", None)
471
    if secondary_ip and not utils.IsValidIP(secondary_ip):
472
      raise errors.OpPrereqError("Invalid secondary ip given")
473
    if (secondary_ip and
474
        secondary_ip != hostname.ip and
475
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
476
                           source=constants.LOCALHOST_IP_ADDRESS))):
477
      raise errors.OpPrereqError("You gave %s as secondary IP,"
478
                                 " but it does not belong to this host." %
479
                                 secondary_ip)
480
    self.secondary_ip = secondary_ip
481

    
482
    if not hasattr(self.op, "vg_name"):
483
      self.op.vg_name = None
484
    # if vg_name not None, checks if volume group is valid
485
    if self.op.vg_name:
486
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
487
      if vgstatus:
488
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
489
                                   " you are not using lvm" % vgstatus)
490

    
491
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
492

    
493
    if not os.path.isabs(self.op.file_storage_dir):
494
      raise errors.OpPrereqError("The file storage directory you have is"
495
                                 " not an absolute path.")
496

    
497
    if not os.path.exists(self.op.file_storage_dir):
498
      try:
499
        os.makedirs(self.op.file_storage_dir, 0750)
500
      except OSError, err:
501
        raise errors.OpPrereqError("Cannot create file storage directory"
502
                                   " '%s': %s" %
503
                                   (self.op.file_storage_dir, err))
504

    
505
    if not os.path.isdir(self.op.file_storage_dir):
506
      raise errors.OpPrereqError("The file storage directory '%s' is not"
507
                                 " a directory." % self.op.file_storage_dir)
508

    
509
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
510
                    self.op.mac_prefix):
511
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
512
                                 self.op.mac_prefix)
513

    
514
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
515
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
516
                                 self.op.hypervisor_type)
517

    
518
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
519
    if result.failed:
520
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
521
                                 (self.op.master_netdev,
522
                                  result.output.strip()))
523

    
524
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
525
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
526
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
527
                                 " executable." % constants.NODE_INITD_SCRIPT)
528

    
529
  def Exec(self, feedback_fn):
530
    """Initialize the cluster.
531

532
    """
533
    clustername = self.clustername
534
    hostname = self.hostname
535

    
536
    # set up the simple store
537
    self.sstore = ss = ssconf.SimpleStore()
538
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
539
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
540
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
541
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
542
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
543
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
544

    
545
    # set up the inter-node password and certificate
546
    _InitGanetiServerSetup(ss)
547

    
548
    # start the master ip
549
    rpc.call_node_start_master(hostname.name)
550

    
551
    # set up ssh config and /etc/hosts
552
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
553
    try:
554
      sshline = f.read()
555
    finally:
556
      f.close()
557
    sshkey = sshline.split(" ")[1]
558

    
559
    _AddHostToEtcHosts(hostname.name)
560
    _InitSSHSetup(hostname.name)
561

    
562
    # init of cluster config file
563
    self.cfg = cfgw = config.ConfigWriter()
564
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
565
                    sshkey, self.op.mac_prefix,
566
                    self.op.vg_name, self.op.def_bridge)
567

    
568
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
569

    
570

    
571
class LUDestroyCluster(NoHooksLU):
572
  """Logical unit for destroying the cluster.
573

574
  """
575
  _OP_REQP = []
576

    
577
  def CheckPrereq(self):
578
    """Check prerequisites.
579

580
    This checks whether the cluster is empty.
581

582
    Any errors are signalled by raising errors.OpPrereqError.
583

584
    """
585
    master = self.sstore.GetMasterNode()
586

    
587
    nodelist = self.cfg.GetNodeList()
588
    if len(nodelist) != 1 or nodelist[0] != master:
589
      raise errors.OpPrereqError("There are still %d node(s) in"
590
                                 " this cluster." % (len(nodelist) - 1))
591
    instancelist = self.cfg.GetInstanceList()
592
    if instancelist:
593
      raise errors.OpPrereqError("There are still %d instance(s) in"
594
                                 " this cluster." % len(instancelist))
595

    
596
  def Exec(self, feedback_fn):
597
    """Destroys the cluster.
598

599
    """
600
    master = self.sstore.GetMasterNode()
601
    if not rpc.call_node_stop_master(master):
602
      raise errors.OpExecError("Could not disable the master role")
603
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
604
    utils.CreateBackup(priv_key)
605
    utils.CreateBackup(pub_key)
606
    rpc.call_node_leave_cluster(master)
607

    
608

    
609
class LUVerifyCluster(LogicalUnit):
610
  """Verifies the cluster status.
611

612
  """
613
  HPATH = "cluster-verify"
614
  HTYPE = constants.HTYPE_CLUSTER
615
  _OP_REQP = ["skip_checks"]
616

    
617
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
618
                  remote_version, feedback_fn):
619
    """Run multiple tests against a node.
620

621
    Test list:
622
      - compares ganeti version
623
      - checks vg existence and size > 20G
624
      - checks config file checksum
625
      - checks ssh to other nodes
626

627
    Args:
628
      node: name of the node to check
629
      file_list: required list of files
630
      local_cksum: dictionary of local files and their checksums
631

632
    """
633
    # compares ganeti version
634
    local_version = constants.PROTOCOL_VERSION
635
    if not remote_version:
636
      feedback_fn("  - ERROR: connection to %s failed" % (node))
637
      return True
638

    
639
    if local_version != remote_version:
640
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
641
                      (local_version, node, remote_version))
642
      return True
643

    
644
    # checks vg existence and size > 20G
645

    
646
    bad = False
647
    if not vglist:
648
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
649
                      (node,))
650
      bad = True
651
    else:
652
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
653
      if vgstatus:
654
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
655
        bad = True
656

    
657
    # checks config file checksum
658
    # checks ssh to other nodes
659

    
660
    if 'filelist' not in node_result:
661
      bad = True
662
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
663
    else:
664
      remote_cksum = node_result['filelist']
665
      for file_name in file_list:
666
        if file_name not in remote_cksum:
667
          bad = True
668
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
669
        elif remote_cksum[file_name] != local_cksum[file_name]:
670
          bad = True
671
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
672

    
673
    if 'nodelist' not in node_result:
674
      bad = True
675
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
676
    else:
677
      if node_result['nodelist']:
678
        bad = True
679
        for node in node_result['nodelist']:
680
          feedback_fn("  - ERROR: communication with node '%s': %s" %
681
                          (node, node_result['nodelist'][node]))
682
    hyp_result = node_result.get('hypervisor', None)
683
    if hyp_result is not None:
684
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
685
    return bad
686

    
687
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688
                      node_instance, feedback_fn):
689
    """Verify an instance.
690

691
    This function checks to see if the required block devices are
692
    available on the instance's node.
693

694
    """
695
    bad = False
696

    
697
    node_current = instanceconfig.primary_node
698

    
699
    node_vol_should = {}
700
    instanceconfig.MapLVsByNode(node_vol_should)
701

    
702
    for node in node_vol_should:
703
      for volume in node_vol_should[node]:
704
        if node not in node_vol_is or volume not in node_vol_is[node]:
705
          feedback_fn("  - ERROR: volume %s missing on node %s" %
706
                          (volume, node))
707
          bad = True
708

    
709
    if instanceconfig.status != 'down':
710
      if (node_current not in node_instance or
711
          not instance in node_instance[node_current]):
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if node != node_current:
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758
    """Verify N+1 Memory Resilience.
759

760
    Check that if one single node dies we can still start all the instances it
761
    was primary for.
762

763
    """
764
    bad = False
765

    
766
    for node, nodeinfo in node_info.iteritems():
767
      # This code checks that every node which is now listed as secondary has
768
      # enough memory to host all the instances it is secondary for, should
769
      # a single other node in the cluster fail.
770
      # FIXME: not ready for failover to an arbitrary node
771
      # FIXME: does not support file-backed instances
772
      # WARNING: we currently take into account down instances as well as up
773
      # ones, considering that even if they're down someone might want to start
774
      # them even in the event of a node failure.
775
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776
        needed_mem = 0
777
        for instance in instances:
778
          needed_mem += instance_cfg[instance].memory
779
        if nodeinfo['mfree'] < needed_mem:
780
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
781
                      " failovers should node %s fail" % (node, prinode))
782
          bad = True
783
    return bad
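
  # Worked example (hypothetical numbers): if node3 is secondary for two
  # instances whose primary is node1 (512 + 1024 MiB) and for one instance
  # whose primary is node2 (2048 MiB), _VerifyNPlusOneMemory checks each
  # primary group separately, so node3 passes only if its 'mfree' is at
  # least 1536 MiB (node1 failing) and at least 2048 MiB (node2 failing).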
784

    
785
  def CheckPrereq(self):
786
    """Check prerequisites.
787

788
    Transform the list of checks we're going to skip into a set and check that
789
    all its members are valid.
790

791
    """
792
    self.skip_set = frozenset(self.op.skip_checks)
793
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
794
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
795

    
796
  def BuildHooksEnv(self):
797
    """Build hooks env.
798

799
    Cluster-Verify hooks are run only in the post phase; a hook failure makes
800
    its output appear in the verify output and causes the verification to fail.
801

802
    """
803
    all_nodes = self.cfg.GetNodeList()
804
    # TODO: populate the environment with useful information for verify hooks
805
    env = {}
806
    return env, [], all_nodes
807

    
808
  def Exec(self, feedback_fn):
809
    """Verify integrity of cluster, performing various test on nodes.
810

811
    """
812
    bad = False
813
    feedback_fn("* Verifying global settings")
814
    for msg in self.cfg.VerifyConfig():
815
      feedback_fn("  - ERROR: %s" % msg)
816

    
817
    vg_name = self.cfg.GetVGName()
818
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
819
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
820
    i_non_redundant = [] # Non redundant instances
821
    node_volume = {}
822
    node_instance = {}
823
    node_info = {}
824
    instance_cfg = {}
825

    
826
    # FIXME: verify OS list
827
    # do local checksums
828
    file_names = list(self.sstore.GetFileList())
829
    file_names.append(constants.SSL_CERT_FILE)
830
    file_names.append(constants.CLUSTER_CONF_FILE)
831
    local_checksums = utils.FingerprintFiles(file_names)
832

    
833
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
834
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
835
    all_instanceinfo = rpc.call_instance_list(nodelist)
836
    all_vglist = rpc.call_vg_list(nodelist)
837
    node_verify_param = {
838
      'filelist': file_names,
839
      'nodelist': nodelist,
840
      'hypervisor': None,
841
      }
842
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
843
    all_rversion = rpc.call_version(nodelist)
844
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
845

    
846
    for node in nodelist:
847
      feedback_fn("* Verifying node %s" % node)
848
      result = self._VerifyNode(node, file_names, local_checksums,
849
                                all_vglist[node], all_nvinfo[node],
850
                                all_rversion[node], feedback_fn)
851
      bad = bad or result
852

    
853
      # node_volume
854
      volumeinfo = all_volumeinfo[node]
855

    
856
      if isinstance(volumeinfo, basestring):
857
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
858
                    (node, volumeinfo[-400:].encode('string_escape')))
859
        bad = True
860
        node_volume[node] = {}
861
      elif not isinstance(volumeinfo, dict):
862
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
863
        bad = True
864
        continue
865
      else:
866
        node_volume[node] = volumeinfo
867

    
868
      # node_instance
869
      nodeinstance = all_instanceinfo[node]
870
      if type(nodeinstance) != list:
871
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
872
        bad = True
873
        continue
874

    
875
      node_instance[node] = nodeinstance
876

    
877
      # node_info
878
      nodeinfo = all_ninfo[node]
879
      if not isinstance(nodeinfo, dict):
880
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
881
        bad = True
882
        continue
883

    
884
      try:
885
        node_info[node] = {
886
          "mfree": int(nodeinfo['memory_free']),
887
          "dfree": int(nodeinfo['vg_free']),
888
          "pinst": [],
889
          "sinst": [],
890
          # dictionary holding all instances this node is secondary for,
891
          # grouped by their primary node. Each key is a cluster node, and each
892
          # value is a list of instances which have the key as primary and the
893
          # current node as secondary.  This is handy to calculate N+1 memory
894
          # availability if you can only failover from a primary to its
895
          # secondary.
896
          "sinst-by-pnode": {},
897
        }
898
      except ValueError:
899
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
900
        bad = True
901
        continue
902

    
903
    node_vol_should = {}
904

    
905
    for instance in instancelist:
906
      feedback_fn("* Verifying instance %s" % instance)
907
      inst_config = self.cfg.GetInstanceInfo(instance)
908
      result =  self._VerifyInstance(instance, inst_config, node_volume,
909
                                     node_instance, feedback_fn)
910
      bad = bad or result
911

    
912
      inst_config.MapLVsByNode(node_vol_should)
913

    
914
      instance_cfg[instance] = inst_config
915

    
916
      pnode = inst_config.primary_node
917
      if pnode in node_info:
918
        node_info[pnode]['pinst'].append(instance)
919
      else:
920
        feedback_fn("  - ERROR: instance %s, connection to primary node"
921
                    " %s failed" % (instance, pnode))
922
        bad = True
923

    
924
      # If the instance is non-redundant we cannot survive losing its primary
925
      # node, so we are not N+1 compliant. On the other hand we have no disk
926
      # templates with more than one secondary so that situation is not well
927
      # supported either.
928
      # FIXME: does not support file-backed instances
929
      if len(inst_config.secondary_nodes) == 0:
930
        i_non_redundant.append(instance)
931
      elif len(inst_config.secondary_nodes) > 1:
932
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
933
                    % instance)
934

    
935
      for snode in inst_config.secondary_nodes:
936
        if snode in node_info:
937
          node_info[snode]['sinst'].append(instance)
938
          if pnode not in node_info[snode]['sinst-by-pnode']:
939
            node_info[snode]['sinst-by-pnode'][pnode] = []
940
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
941
        else:
942
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
943
                      " %s failed" % (instance, snode))
944

    
945
    feedback_fn("* Verifying orphan volumes")
946
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
947
                                       feedback_fn)
948
    bad = bad or result
949

    
950
    feedback_fn("* Verifying remaining instances")
951
    result = self._VerifyOrphanInstances(instancelist, node_instance,
952
                                         feedback_fn)
953
    bad = bad or result
954

    
955
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
956
      feedback_fn("* Verifying N+1 Memory redundancy")
957
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
958
      bad = bad or result
959

    
960
    feedback_fn("* Other Notes")
961
    if i_non_redundant:
962
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
963
                  % len(i_non_redundant))
964

    
965
    return int(bad)
966

    
967
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
968
    """Analize the post-hooks' result, handle it, and send some
969
    nicely-formatted feedback back to the user.
970

971
    Args:
972
      phase: the hooks phase that has just been run
973
      hooks_results: the results of the multi-node hooks rpc call
974
      feedback_fn: function to send feedback back to the caller
975
      lu_result: previous Exec result
976

977
    """
978
    # We only really run POST phase hooks, and are only interested in their results
979
    if phase == constants.HOOKS_PHASE_POST:
980
      # Used to change hooks' output to proper indentation
981
      indent_re = re.compile('^', re.M)
982
      feedback_fn("* Hooks Results")
983
      if not hooks_results:
984
        feedback_fn("  - ERROR: general communication failure")
985
        lu_result = 1
986
      else:
987
        for node_name in hooks_results:
988
          show_node_header = True
989
          res = hooks_results[node_name]
990
          if res is False or not isinstance(res, list):
991
            feedback_fn("    Communication failure")
992
            lu_result = 1
993
            continue
994
          for script, hkr, output in res:
995
            if hkr == constants.HKR_FAIL:
996
              # The node header is only shown once, if there are
997
              # failing hooks on that node
998
              if show_node_header:
999
                feedback_fn("  Node %s:" % node_name)
1000
                show_node_header = False
1001
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1002
              output = indent_re.sub('      ', output)
1003
              feedback_fn("%s" % output)
1004
              lu_result = 1
1005

    
1006
      return lu_result
1007

    
1008

    
1009
class LUVerifyDisks(NoHooksLU):
1010
  """Verifies the cluster disks status.
1011

1012
  """
1013
  _OP_REQP = []
1014

    
1015
  def CheckPrereq(self):
1016
    """Check prerequisites.
1017

1018
    This has no prerequisites.
1019

1020
    """
1021
    pass
1022

    
1023
  def Exec(self, feedback_fn):
1024
    """Verify integrity of cluster disks.
1025

1026
    """
1027
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1028

    
1029
    vg_name = self.cfg.GetVGName()
1030
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1031
    instances = [self.cfg.GetInstanceInfo(name)
1032
                 for name in self.cfg.GetInstanceList()]
1033

    
1034
    nv_dict = {}
1035
    for inst in instances:
1036
      inst_lvs = {}
1037
      if (inst.status != "up" or
1038
          inst.disk_template not in constants.DTS_NET_MIRROR):
1039
        continue
1040
      inst.MapLVsByNode(inst_lvs)
1041
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1042
      for node, vol_list in inst_lvs.iteritems():
1043
        for vol in vol_list:
1044
          nv_dict[(node, vol)] = inst
1045

    
1046
    if not nv_dict:
1047
      return result
1048

    
1049
    node_lvs = rpc.call_volume_list(nodes, vg_name)
1050

    
1051
    to_act = set()
1052
    for node in nodes:
1053
      # node_volume
1054
      lvs = node_lvs[node]
1055

    
1056
      if isinstance(lvs, basestring):
1057
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1058
        res_nlvm[node] = lvs
1059
      elif not isinstance(lvs, dict):
1060
        logger.Info("connection to node %s failed or invalid data returned" %
1061
                    (node,))
1062
        res_nodes.append(node)
1063
        continue
1064

    
1065
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1066
        inst = nv_dict.pop((node, lv_name), None)
1067
        if (not lv_online and inst is not None
1068
            and inst.name not in res_instances):
1069
          res_instances.append(inst.name)
1070

    
1071
    # any leftover items in nv_dict are missing LVs, let's arrange the
1072
    # data better
1073
    for key, inst in nv_dict.iteritems():
1074
      if inst.name not in res_missing:
1075
        res_missing[inst.name] = []
1076
      res_missing[inst.name].append(key)
1077

    
1078
    return result
1079

    
1080

    
1081
class LURenameCluster(LogicalUnit):
1082
  """Rename the cluster.
1083

1084
  """
1085
  HPATH = "cluster-rename"
1086
  HTYPE = constants.HTYPE_CLUSTER
1087
  _OP_REQP = ["name"]
1088

    
1089
  def BuildHooksEnv(self):
1090
    """Build hooks env.
1091

1092
    """
1093
    env = {
1094
      "OP_TARGET": self.sstore.GetClusterName(),
1095
      "NEW_NAME": self.op.name,
1096
      }
1097
    mn = self.sstore.GetMasterNode()
1098
    return env, [mn], [mn]
1099

    
1100
  def CheckPrereq(self):
1101
    """Verify that the passed name is a valid one.
1102

1103
    """
1104
    hostname = utils.HostInfo(self.op.name)
1105

    
1106
    new_name = hostname.name
1107
    self.ip = new_ip = hostname.ip
1108
    old_name = self.sstore.GetClusterName()
1109
    old_ip = self.sstore.GetMasterIP()
1110
    if new_name == old_name and new_ip == old_ip:
1111
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1112
                                 " cluster has changed")
1113
    if new_ip != old_ip:
1114
      result = utils.RunCmd(["fping", "-q", new_ip])
1115
      if not result.failed:
1116
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1117
                                   " reachable on the network. Aborting." %
1118
                                   new_ip)
1119

    
1120
    self.op.name = new_name
1121

    
1122
  def Exec(self, feedback_fn):
1123
    """Rename the cluster.
1124

1125
    """
1126
    clustername = self.op.name
1127
    ip = self.ip
1128
    ss = self.sstore
1129

    
1130
    # shutdown the master IP
1131
    master = ss.GetMasterNode()
1132
    if not rpc.call_node_stop_master(master):
1133
      raise errors.OpExecError("Could not disable the master role")
1134

    
1135
    try:
1136
      # modify the sstore
1137
      ss.SetKey(ss.SS_MASTER_IP, ip)
1138
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1139

    
1140
      # Distribute updated ss config to all nodes
1141
      myself = self.cfg.GetNodeInfo(master)
1142
      dist_nodes = self.cfg.GetNodeList()
1143
      if myself.name in dist_nodes:
1144
        dist_nodes.remove(myself.name)
1145

    
1146
      logger.Debug("Copying updated ssconf data to all nodes")
1147
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1148
        fname = ss.KeyToFilename(keyname)
1149
        result = rpc.call_upload_file(dist_nodes, fname)
1150
        for to_node in dist_nodes:
1151
          if not result[to_node]:
1152
            logger.Error("copy of file %s to node %s failed" %
1153
                         (fname, to_node))
1154
    finally:
1155
      if not rpc.call_node_start_master(master):
1156
        logger.Error("Could not re-enable the master role on the master,"
1157
                     " please restart manually.")
1158

    
1159

    
1160
def _RecursiveCheckIfLVMBased(disk):
1161
  """Check if the given disk or its children are lvm-based.
1162

1163
  Args:
1164
    disk: ganeti.objects.Disk object
1165

1166
  Returns:
1167
    boolean indicating whether a LD_LV dev_type was found or not
1168

1169
  """
1170
  if disk.children:
1171
    for chdisk in disk.children:
1172
      if _RecursiveCheckIfLVMBased(chdisk):
1173
        return True
1174
  return disk.dev_type == constants.LD_LV
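
# Example: a DRBD disk whose children are two LV devices is reported as
# lvm-based (the recursion finds an LD_LV child), while a disk tree that
# contains no LD_LV device anywhere is not.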
1175

    
1176

    
1177
class LUSetClusterParams(LogicalUnit):
1178
  """Change the parameters of the cluster.
1179

1180
  """
1181
  HPATH = "cluster-modify"
1182
  HTYPE = constants.HTYPE_CLUSTER
1183
  _OP_REQP = []
1184

    
1185
  def BuildHooksEnv(self):
1186
    """Build hooks env.
1187

1188
    """
1189
    env = {
1190
      "OP_TARGET": self.sstore.GetClusterName(),
1191
      "NEW_VG_NAME": self.op.vg_name,
1192
      }
1193
    mn = self.sstore.GetMasterNode()
1194
    return env, [mn], [mn]
1195

    
1196
  def CheckPrereq(self):
1197
    """Check prerequisites.
1198

1199
    This checks whether the given params don't conflict and
1200
    if the given volume group is valid.
1201

1202
    """
1203
    if not self.op.vg_name:
1204
      instances = [self.cfg.GetInstanceInfo(name)
1205
                   for name in self.cfg.GetInstanceList()]
1206
      for inst in instances:
1207
        for disk in inst.disks:
1208
          if _RecursiveCheckIfLVMBased(disk):
1209
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1210
                                       " lvm-based instances exist")
1211

    
1212
    # if vg_name not None, checks given volume group on all nodes
1213
    if self.op.vg_name:
1214
      node_list = self.cfg.GetNodeList()
1215
      vglist = rpc.call_vg_list(node_list)
1216
      for node in node_list:
1217
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1218
        if vgstatus:
1219
          raise errors.OpPrereqError("Error on node '%s': %s" %
1220
                                     (node, vgstatus))
1221

    
1222
  def Exec(self, feedback_fn):
1223
    """Change the parameters of the cluster.
1224

1225
    """
1226
    if self.op.vg_name != self.cfg.GetVGName():
1227
      self.cfg.SetVGName(self.op.vg_name)
1228
    else:
1229
      feedback_fn("Cluster LVM configuration already in desired"
1230
                  " state, not changing")
1231

    
1232

    
1233
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1234
  """Sleep and poll for an instance's disk to sync.
1235

1236
  """
1237
  if not instance.disks:
1238
    return True
1239

    
1240
  if not oneshot:
1241
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1242

    
1243
  node = instance.primary_node
1244

    
1245
  for dev in instance.disks:
1246
    cfgw.SetDiskID(dev, node)
1247

    
1248
  retries = 0
1249
  while True:
1250
    max_time = 0
1251
    done = True
1252
    cumul_degraded = False
1253
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1254
    if not rstats:
1255
      proc.LogWarning("Can't get any data from node %s" % node)
1256
      retries += 1
1257
      if retries >= 10:
1258
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1259
                                 " aborting." % node)
1260
      time.sleep(6)
1261
      continue
1262
    retries = 0
1263
    for i in range(len(rstats)):
1264
      mstat = rstats[i]
1265
      if mstat is None:
1266
        proc.LogWarning("Can't compute data for node %s/%s" %
1267
                        (node, instance.disks[i].iv_name))
1268
        continue
1269
      # we ignore the ldisk parameter
1270
      perc_done, est_time, is_degraded, _ = mstat
1271
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1272
      if perc_done is not None:
1273
        done = False
1274
        if est_time is not None:
1275
          rem_time = "%d estimated seconds remaining" % est_time
1276
          max_time = est_time
1277
        else:
1278
          rem_time = "no time estimate"
1279
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1280
                     (instance.disks[i].iv_name, perc_done, rem_time))
1281
    if done or oneshot:
1282
      break
1283

    
1284
    if unlock:
1285
      #utils.Unlock('cmd')
1286
      pass
1287
    try:
1288
      time.sleep(min(60, max_time))
1289
    finally:
1290
      if unlock:
1291
        #utils.Lock('cmd')
1292
        pass
1293

    
1294
  if done:
1295
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1296
  return not cumul_degraded
1297

    
1298

    
1299
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1300
  """Check that mirrors are not degraded.
1301

1302
  The ldisk parameter, if True, will change the test from the
1303
  is_degraded attribute (which represents overall non-ok status for
1304
  the device(s)) to the ldisk (representing the local storage status).
1305

1306
  """
1307
  cfgw.SetDiskID(dev, node)
1308
  if ldisk:
1309
    idx = 6
1310
  else:
1311
    idx = 5
1312

    
1313
  result = True
1314
  if on_primary or dev.AssembleOnSecondary():
1315
    rstats = rpc.call_blockdev_find(node, dev)
1316
    if not rstats:
1317
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1318
      result = False
1319
    else:
1320
      result = result and (not rstats[idx])
1321
  if dev.children:
1322
    for child in dev.children:
1323
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1324

    
1325
  return result
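
# Note (derived from the code above): the status tuple returned by
# rpc.call_blockdev_find() carries the overall is_degraded flag at index 5
# and the local-storage (ldisk) flag at index 6, hence the idx = 5/6
# selection.  The helper returns True only when the chosen flag is false for
# the device and the (is_degraded based) check also passes recursively for
# all of its children.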
1326

    
1327

    
1328
class LUDiagnoseOS(NoHooksLU):
1329
  """Logical unit for OS diagnose/query.
1330

1331
  """
1332
  _OP_REQP = ["output_fields", "names"]
1333

    
1334
  def CheckPrereq(self):
1335
    """Check prerequisites.
1336

1337
    This always succeeds, since this is a pure query LU.
1338

1339
    """
1340
    if self.op.names:
1341
      raise errors.OpPrereqError("Selective OS query not supported")
1342

    
1343
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1344
    _CheckOutputFields(static=[],
1345
                       dynamic=self.dynamic_fields,
1346
                       selected=self.op.output_fields)
1347

    
1348
  @staticmethod
1349
  def _DiagnoseByOS(node_list, rlist):
1350
    """Remaps a per-node return list into an a per-os per-node dictionary
1351

1352
      Args:
1353
        node_list: a list with the names of all nodes
1354
        rlist: a map with node names as keys and OS objects as values
1355

1356
      Returns:
1357
        map: a map with osnames as keys and as value another map, with
1358
             nodes as keys and lists of OS objects as values, e.g.
1359
             {"debian-etch": {"node1": [<object>,...],
1360
                              "node2": [<object>,]}
1361
             }
1363

1364
    """
1365
    all_os = {}
1366
    for node_name, nr in rlist.iteritems():
1367
      if not nr:
1368
        continue
1369
      for os_obj in nr:
1370
        if os_obj.name not in all_os:
1371
          # build a list of nodes for this os containing empty lists
1372
          # for each node in node_list
1373
          all_os[os_obj.name] = {}
1374
          for nname in node_list:
1375
            all_os[os_obj.name][nname] = []
1376
        all_os[os_obj.name][node_name].append(os_obj)
1377
    return all_os
1378

    
1379
  def Exec(self, feedback_fn):
1380
    """Compute the list of OSes.
1381

1382
    """
1383
    node_list = self.cfg.GetNodeList()
1384
    node_data = rpc.call_os_diagnose(node_list)
1385
    if node_data == False:
1386
      raise errors.OpExecError("Can't gather the list of OSes")
1387
    pol = self._DiagnoseByOS(node_list, node_data)
1388
    output = []
1389
    for os_name, os_data in pol.iteritems():
1390
      row = []
1391
      for field in self.op.output_fields:
1392
        if field == "name":
1393
          val = os_name
1394
        elif field == "valid":
1395
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1396
        elif field == "node_status":
1397
          val = {}
1398
          for node_name, nos_list in os_data.iteritems():
1399
            val[node_name] = [(v.status, v.path) for v in nos_list]
1400
        else:
1401
          raise errors.ParameterError(field)
1402
        row.append(val)
1403
      output.append(row)
1404

    
1405
    return output
1406

    
1407

    
1408
class LURemoveNode(LogicalUnit):
1409
  """Logical unit for removing a node.
1410

1411
  """
1412
  HPATH = "node-remove"
1413
  HTYPE = constants.HTYPE_NODE
1414
  _OP_REQP = ["node_name"]
1415

    
1416
  def BuildHooksEnv(self):
1417
    """Build hooks env.
1418

1419
    This doesn't run on the target node in the pre phase as a failed
1420
    node would not allows itself to run.
1421

1422
    """
1423
    env = {
1424
      "OP_TARGET": self.op.node_name,
1425
      "NODE_NAME": self.op.node_name,
1426
      }
1427
    all_nodes = self.cfg.GetNodeList()
1428
    all_nodes.remove(self.op.node_name)
1429
    return env, all_nodes, all_nodes
1430

    
1431
  def CheckPrereq(self):
1432
    """Check prerequisites.
1433

1434
    This checks:
1435
     - the node exists in the configuration
1436
     - it does not have primary or secondary instances
1437
     - it's not the master
1438

1439
    Any errors are signalled by raising errors.OpPrereqError.
1440

1441
    """
1442
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1443
    if node is None:
1444
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1445

    
1446
    instance_list = self.cfg.GetInstanceList()
1447

    
1448
    masternode = self.sstore.GetMasterNode()
1449
    if node.name == masternode:
1450
      raise errors.OpPrereqError("Node is the master node,"
1451
                                 " you need to failover first.")
1452

    
1453
    for instance_name in instance_list:
1454
      instance = self.cfg.GetInstanceInfo(instance_name)
1455
      if node.name == instance.primary_node:
1456
        raise errors.OpPrereqError("Instance %s still running on the node,"
1457
                                   " please remove first." % instance_name)
1458
      if node.name in instance.secondary_nodes:
1459
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1460
                                   " please remove first." % instance_name)
1461
    self.op.node_name = node.name
1462
    self.node = node
1463

    
1464
  def Exec(self, feedback_fn):
1465
    """Removes the node from the cluster.
1466

1467
    """
1468
    node = self.node
1469
    logger.Info("stopping the node daemon and removing configs from node %s" %
1470
                node.name)
1471

    
1472
    rpc.call_node_leave_cluster(node.name)
1473

    
1474
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1475

    
1476
    logger.Info("Removing node %s from config" % node.name)
1477

    
1478
    self.cfg.RemoveNode(node.name)
1479

    
1480
    _RemoveHostFromEtcHosts(node.name)
1481

    
1482

    
1483
class LUQueryNodes(NoHooksLU):
1484
  """Logical unit for querying nodes.
1485

1486
  """
1487
  _OP_REQP = ["output_fields", "names"]
1488

    
1489
  def CheckPrereq(self):
1490
    """Check prerequisites.
1491

1492
    This checks that the fields required are valid output fields.
1493

1494
    """
1495
    self.dynamic_fields = frozenset([
1496
      "dtotal", "dfree",
1497
      "mtotal", "mnode", "mfree",
1498
      "bootid",
1499
      "ctotal",
1500
      ])
1501

    
1502
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1503
                               "pinst_list", "sinst_list",
1504
                               "pip", "sip"],
1505
                       dynamic=self.dynamic_fields,
1506
                       selected=self.op.output_fields)
1507

    
1508
    self.wanted = _GetWantedNodes(self, self.op.names)
1509

    
1510
  def Exec(self, feedback_fn):
1511
    """Computes the list of nodes and their attributes.
1512

1513
    """
1514
    nodenames = self.wanted
1515
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1516

    
1517
    # begin data gathering
1518

    
1519
    if self.dynamic_fields.intersection(self.op.output_fields):
1520
      live_data = {}
1521
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1522
      for name in nodenames:
1523
        nodeinfo = node_data.get(name, None)
1524
        if nodeinfo:
1525
          live_data[name] = {
1526
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1527
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1528
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1529
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1530
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1531
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1532
            "bootid": nodeinfo['bootid'],
1533
            }
1534
        else:
1535
          live_data[name] = {}
1536
    else:
1537
      live_data = dict.fromkeys(nodenames, {})
1538

    
1539
    node_to_primary = dict([(name, set()) for name in nodenames])
1540
    node_to_secondary = dict([(name, set()) for name in nodenames])
1541

    
1542
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1543
                             "sinst_cnt", "sinst_list"))
1544
    if inst_fields & frozenset(self.op.output_fields):
1545
      instancelist = self.cfg.GetInstanceList()
1546

    
1547
      for instance_name in instancelist:
1548
        inst = self.cfg.GetInstanceInfo(instance_name)
1549
        if inst.primary_node in node_to_primary:
1550
          node_to_primary[inst.primary_node].add(inst.name)
1551
        for secnode in inst.secondary_nodes:
1552
          if secnode in node_to_secondary:
1553
            node_to_secondary[secnode].add(inst.name)
1554

    
1555
    # end data gathering
1556

    
1557
    output = []
1558
    for node in nodelist:
1559
      node_output = []
1560
      for field in self.op.output_fields:
1561
        if field == "name":
1562
          val = node.name
1563
        elif field == "pinst_list":
1564
          val = list(node_to_primary[node.name])
1565
        elif field == "sinst_list":
1566
          val = list(node_to_secondary[node.name])
1567
        elif field == "pinst_cnt":
1568
          val = len(node_to_primary[node.name])
1569
        elif field == "sinst_cnt":
1570
          val = len(node_to_secondary[node.name])
1571
        elif field == "pip":
1572
          val = node.primary_ip
1573
        elif field == "sip":
1574
          val = node.secondary_ip
1575
        elif field in self.dynamic_fields:
1576
          val = live_data[node.name].get(field, None)
1577
        else:
1578
          raise errors.ParameterError(field)
1579
        node_output.append(val)
1580
      output.append(node_output)
1581

    
1582
    return output
1583

    
1584

    
1585
class LUQueryNodeVolumes(NoHooksLU):
1586
  """Logical unit for getting volumes on node(s).
1587

1588
  """
1589
  _OP_REQP = ["nodes", "output_fields"]
1590

    
1591
  def CheckPrereq(self):
1592
    """Check prerequisites.
1593

1594
    This checks that the fields required are valid output fields.
1595

1596
    """
1597
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1598

    
1599
    _CheckOutputFields(static=["node"],
1600
                       dynamic=["phys", "vg", "name", "size", "instance"],
1601
                       selected=self.op.output_fields)
1602

    
1603

    
1604
  def Exec(self, feedback_fn):
1605
    """Computes the list of nodes and their attributes.
1606

1607
    """
1608
    nodenames = self.nodes
1609
    volumes = rpc.call_node_volumes(nodenames)
1610

    
1611
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1612
             in self.cfg.GetInstanceList()]
1613

    
1614
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1615

    
1616
    output = []
1617
    for node in nodenames:
1618
      if node not in volumes or not volumes[node]:
1619
        continue
1620

    
1621
      node_vols = volumes[node][:]
1622
      node_vols.sort(key=lambda vol: vol['dev'])
1623

    
1624
      for vol in node_vols:
1625
        node_output = []
1626
        for field in self.op.output_fields:
1627
          if field == "node":
1628
            val = node
1629
          elif field == "phys":
1630
            val = vol['dev']
1631
          elif field == "vg":
1632
            val = vol['vg']
1633
          elif field == "name":
1634
            val = vol['name']
1635
          elif field == "size":
1636
            val = int(float(vol['size']))
1637
          elif field == "instance":
1638
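            # the for/else below falls back to '-' when no instance
            # owns this LV on the current node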
            for inst in ilist:
1639
              if node not in lv_by_node[inst]:
1640
                continue
1641
              if vol['name'] in lv_by_node[inst][node]:
1642
                val = inst.name
1643
                break
1644
            else:
1645
              val = '-'
1646
          else:
1647
            raise errors.ParameterError(field)
1648
          node_output.append(str(val))
1649

    
1650
        output.append(node_output)
1651

    
1652
    return output
1653

    
1654

    
1655
class LUAddNode(LogicalUnit):
1656
  """Logical unit for adding node to the cluster.
1657

1658
  """
1659
  HPATH = "node-add"
1660
  HTYPE = constants.HTYPE_NODE
1661
  _OP_REQP = ["node_name"]
1662

    
1663
  def BuildHooksEnv(self):
1664
    """Build hooks env.
1665

1666
    This will run on all nodes before, and on all nodes + the new node after.
1667

1668
    """
1669
    env = {
1670
      "OP_TARGET": self.op.node_name,
1671
      "NODE_NAME": self.op.node_name,
1672
      "NODE_PIP": self.op.primary_ip,
1673
      "NODE_SIP": self.op.secondary_ip,
1674
      }
1675
    nodes_0 = self.cfg.GetNodeList()
1676
    nodes_1 = nodes_0 + [self.op.node_name, ]
1677
    return env, nodes_0, nodes_1
1678

    
1679
  def CheckPrereq(self):
1680
    """Check prerequisites.
1681

1682
    This checks:
1683
     - the new node is not already in the config
1684
     - it is resolvable
1685
     - its parameters (single/dual homed) match the cluster
1686

1687
    Any errors are signalled by raising errors.OpPrereqError.
1688

1689
    """
1690
    node_name = self.op.node_name
1691
    cfg = self.cfg
1692

    
1693
    dns_data = utils.HostInfo(node_name)
1694

    
1695
    node = dns_data.name
1696
    primary_ip = self.op.primary_ip = dns_data.ip
1697
    secondary_ip = getattr(self.op, "secondary_ip", None)
1698
    if secondary_ip is None:
1699
      secondary_ip = primary_ip
1700
    if not utils.IsValidIP(secondary_ip):
1701
      raise errors.OpPrereqError("Invalid secondary IP given")
1702
    self.op.secondary_ip = secondary_ip
1703

    
1704
    node_list = cfg.GetNodeList()
1705
    if not self.op.readd and node in node_list:
1706
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1707
                                 node)
1708
    elif self.op.readd and node not in node_list:
1709
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1710

    
1711
    for existing_node_name in node_list:
1712
      existing_node = cfg.GetNodeInfo(existing_node_name)
1713

    
1714
      if self.op.readd and node == existing_node_name:
1715
        if (existing_node.primary_ip != primary_ip or
1716
            existing_node.secondary_ip != secondary_ip):
1717
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1718
                                     " address configuration as before")
1719
        continue
1720

    
1721
      if (existing_node.primary_ip == primary_ip or
1722
          existing_node.secondary_ip == primary_ip or
1723
          existing_node.primary_ip == secondary_ip or
1724
          existing_node.secondary_ip == secondary_ip):
1725
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1726
                                   " existing node %s" % existing_node.name)
1727

    
1728
    # check that the type of the node (single versus dual homed) is the
1729
    # same as for the master
1730
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1731
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1732
    newbie_singlehomed = secondary_ip == primary_ip
1733
    if master_singlehomed != newbie_singlehomed:
1734
      if master_singlehomed:
1735
        raise errors.OpPrereqError("The master has no private ip but the"
1736
                                   " new node has one")
1737
      else:
1738
        raise errors.OpPrereqError("The master has a private ip but the"
1739
                                   " new node doesn't have one")
1740

    
1741
    # check reachability
1742
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1743
      raise errors.OpPrereqError("Node not reachable by ping")
1744

    
1745
    if not newbie_singlehomed:
1746
      # check reachability from my secondary ip to newbie's secondary ip
1747
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1748
                           source=myself.secondary_ip):
1749
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1750
                                   " based ping to noded port")
1751

    
1752
    self.new_node = objects.Node(name=node,
1753
                                 primary_ip=primary_ip,
1754
                                 secondary_ip=secondary_ip)
1755

    
1756
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1757
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1758
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1759
                                   constants.VNC_PASSWORD_FILE)
1760

    
1761
  def Exec(self, feedback_fn):
1762
    """Adds the new node to the cluster.
1763

1764
    """
1765
    new_node = self.new_node
1766
    node = new_node.name
1767

    
1768
    # set up inter-node password and certificate and restart the node daemon
1769
    gntpass = self.sstore.GetNodeDaemonPassword()
1770
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1771
      raise errors.OpExecError("ganeti password corruption detected")
1772
    f = open(constants.SSL_CERT_FILE)
1773
    try:
1774
      gntpem = f.read(8192)
1775
    finally:
1776
      f.close()
1777
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1778
    # so we use this to detect an invalid certificate; as long as the
1779
    # cert doesn't contain this, the here-document will be correctly
1780
    # parsed by the shell sequence below
1781
    if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
1782
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1783
    if not gntpem.endswith("\n"):
1784
      raise errors.OpExecError("PEM must end with newline")
1785
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1786

    
1787
    # and then connect with ssh to set password and start ganeti-noded
1788
    # note that all the below variables are sanitized at this point,
1789
    # either by being constants or by the checks above
1790
    ss = self.sstore
1791
    mycommand = ("umask 077 && "
1792
                 "echo '%s' > '%s' && "
1793
                 "cat > '%s' << '!EOF.' && \n"
1794
                 "%s!EOF.\n%s restart" %
1795
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1796
                  constants.SSL_CERT_FILE, gntpem,
1797
                  constants.NODE_INITD_SCRIPT))
1798

    
1799
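    # run with batch=False/ask_key=True so that ssh can interactively
    # accept the new node's (not yet known) host key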
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1800
    if result.failed:
1801
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1802
                               " output: %s" %
1803
                               (node, result.fail_reason, result.output))
1804

    
1805
    # check connectivity
1806
    time.sleep(4)
1807

    
1808
    result = rpc.call_version([node])[node]
1809
    if result:
1810
      if constants.PROTOCOL_VERSION == result:
1811
        logger.Info("communication to node %s fine, sw version %s match" %
1812
                    (node, result))
1813
      else:
1814
        raise errors.OpExecError("Version mismatch master version %s,"
1815
                                 " node version %s" %
1816
                                 (constants.PROTOCOL_VERSION, result))
1817
    else:
1818
      raise errors.OpExecError("Cannot get version from the new node")
1819

    
1820
    # setup ssh on node
1821
    logger.Info("copy ssh key to node %s" % node)
1822
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1823
    keyarray = []
1824
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1825
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1826
                priv_key, pub_key]
1827

    
1828
    for i in keyfiles:
1829
      f = open(i, 'r')
1830
      try:
1831
        keyarray.append(f.read())
1832
      finally:
1833
        f.close()
1834

    
1835
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1836
                               keyarray[3], keyarray[4], keyarray[5])
1837

    
1838
    if not result:
1839
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1840

    
1841
    # Add node to our /etc/hosts, and add key to known_hosts
1842
    _AddHostToEtcHosts(new_node.name)
1843

    
1844
    if new_node.secondary_ip != new_node.primary_ip:
1845
      if not rpc.call_node_tcp_ping(new_node.name,
1846
                                    constants.LOCALHOST_IP_ADDRESS,
1847
                                    new_node.secondary_ip,
1848
                                    constants.DEFAULT_NODED_PORT,
1849
                                    10, False):
1850
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1851
                                 " you gave (%s). Please fix and re-run this"
1852
                                 " command." % new_node.secondary_ip)
1853

    
1854
    success, msg = self.ssh.VerifyNodeHostname(node)
1855
    if not success:
1856
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1857
                               " than the one the resolver gives: %s."
1858
                               " Please fix and re-run this command." %
1859
                               (node, msg))
1860

    
1861
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1862
    # including the node just added
1863
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1864
    dist_nodes = self.cfg.GetNodeList() + [node]
1865
    if myself.name in dist_nodes:
1866
      dist_nodes.remove(myself.name)
1867

    
1868
    logger.Debug("Copying hosts and known_hosts to all nodes")
1869
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1870
      result = rpc.call_upload_file(dist_nodes, fname)
1871
      for to_node in dist_nodes:
1872
        if not result[to_node]:
1873
          logger.Error("copy of file %s to node %s failed" %
1874
                       (fname, to_node))
1875

    
1876
    to_copy = ss.GetFileList()
1877
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1878
      to_copy.append(constants.VNC_PASSWORD_FILE)
1879
    for fname in to_copy:
1880
      if not self.ssh.CopyFileToNode(node, fname):
1881
        logger.Error("could not copy file %s to node %s" % (fname, node))
1882

    
1883
    if not self.op.readd:
1884
      logger.Info("adding node %s to cluster.conf" % node)
1885
      self.cfg.AddNode(new_node)
1886

    
1887

    
1888
class LUMasterFailover(LogicalUnit):
1889
  """Failover the master node to the current node.
1890

1891
  This is a special LU in that it must run on a non-master node.
1892

1893
  """
1894
  HPATH = "master-failover"
1895
  HTYPE = constants.HTYPE_CLUSTER
1896
  REQ_MASTER = False
1897
  _OP_REQP = []
1898

    
1899
  def BuildHooksEnv(self):
1900
    """Build hooks env.
1901

1902
    This will run on the new master only in the pre phase, and on all
1903
    the nodes in the post phase.
1904

1905
    """
1906
    env = {
1907
      "OP_TARGET": self.new_master,
1908
      "NEW_MASTER": self.new_master,
1909
      "OLD_MASTER": self.old_master,
1910
      }
1911
    return env, [self.new_master], self.cfg.GetNodeList()
1912

    
1913
  def CheckPrereq(self):
1914
    """Check prerequisites.
1915

1916
    This checks that we are not already the master.
1917

1918
    """
1919
    self.new_master = utils.HostInfo().name
1920
    self.old_master = self.sstore.GetMasterNode()
1921

    
1922
    if self.old_master == self.new_master:
1923
      raise errors.OpPrereqError("This command must be run on the node"
1924
                                 " where you want the new master to be."
1925
                                 " %s is already the master" %
1926
                                 self.old_master)
1927

    
1928
  def Exec(self, feedback_fn):
1929
    """Failover the master node.
1930

1931
    This command, when run on a non-master node, will cause the current
1932
    master to cease being master, and the non-master to become new
1933
    master.
1934

1935
    """
1936
    #TODO: do not rely on gethostname returning the FQDN
1937
    logger.Info("setting master to %s, old master: %s" %
1938
                (self.new_master, self.old_master))
1939

    
1940
    if not rpc.call_node_stop_master(self.old_master):
1941
      logger.Error("could not disable the master role on the old master"
1942
                   " %s, please disable manually" % self.old_master)
1943

    
1944
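    # record the new master in the simple store and push the updated
    # file to all nodes before starting the master role on this node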
    ss = self.sstore
1945
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1946
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1947
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1948
      logger.Error("could not distribute the new simple store master file"
1949
                   " to the other nodes, please check.")
1950

    
1951
    if not rpc.call_node_start_master(self.new_master):
1952
      logger.Error("could not start the master role on the new master"
1953
                   " %s, please check" % self.new_master)
1954
      feedback_fn("Error in activating the master IP on the new master,"
1955
                  " please fix manually.")
1956

    
1957

    
1958

    
1959
class LUQueryClusterInfo(NoHooksLU):
1960
  """Query cluster configuration.
1961

1962
  """
1963
  _OP_REQP = []
1964
  REQ_MASTER = False
1965

    
1966
  def CheckPrereq(self):
1967
    """No prerequisites needed for this LU.
1968

1969
    """
1970
    pass
1971

    
1972
  def Exec(self, feedback_fn):
1973
    """Return cluster config.
1974

1975
    """
1976
    result = {
1977
      "name": self.sstore.GetClusterName(),
1978
      "software_version": constants.RELEASE_VERSION,
1979
      "protocol_version": constants.PROTOCOL_VERSION,
1980
      "config_version": constants.CONFIG_VERSION,
1981
      "os_api_version": constants.OS_API_VERSION,
1982
      "export_version": constants.EXPORT_VERSION,
1983
      "master": self.sstore.GetMasterNode(),
1984
      "architecture": (platform.architecture()[0], platform.machine()),
1985
      }
1986

    
1987
    return result
1988

    
1989

    
1990
class LUClusterCopyFile(NoHooksLU):
1991
  """Copy file to cluster.
1992

1993
  """
1994
  _OP_REQP = ["nodes", "filename"]
1995

    
1996
  def CheckPrereq(self):
1997
    """Check prerequisites.
1998

1999
    It should check that the named file exists and that the given list
2000
    of nodes is valid.
2001

2002
    """
2003
    if not os.path.exists(self.op.filename):
2004
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2005

    
2006
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2007

    
2008
  def Exec(self, feedback_fn):
2009
    """Copy a file from master to some nodes.
2010

2011
    The file name is taken from self.op.filename and the target nodes
2012
    from self.op.nodes (if that list is empty, all cluster nodes are
2013
    used). The copy is done with the cluster SSH helper; the master
2014
    itself is skipped, and per-node failures are logged but do not
2015
    abort the operation.
2016

2017
    """
2018
    filename = self.op.filename
2019

    
2020
    myname = utils.HostInfo().name
2021

    
2022
    for node in self.nodes:
2023
      if node == myname:
2024
        continue
2025
      if not self.ssh.CopyFileToNode(node, filename):
2026
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
2027

    
2028

    
2029
class LUDumpClusterConfig(NoHooksLU):
2030
  """Return a text-representation of the cluster-config.
2031

2032
  """
2033
  _OP_REQP = []
2034

    
2035
  def CheckPrereq(self):
2036
    """No prerequisites.
2037

2038
    """
2039
    pass
2040

    
2041
  def Exec(self, feedback_fn):
2042
    """Return a representation of the cluster config.
2043

2044
    """
2045
    return self.cfg.DumpConfig()
2046

    
2047

    
2048
class LURunClusterCommand(NoHooksLU):
2049
  """Run a command on some nodes.
2050

2051
  """
2052
  _OP_REQP = ["command", "nodes"]
2053

    
2054
  def CheckPrereq(self):
2055
    """Check prerequisites.
2056

2057
    It checks that the given list of nodes is valid.
2058

2059
    """
2060
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2061

    
2062
  def Exec(self, feedback_fn):
2063
    """Run a command on some nodes.
2064

2065
    """
2066
    # put the master at the end of the nodes list
2067
    master_node = self.sstore.GetMasterNode()
2068
    if master_node in self.nodes:
2069
      self.nodes.remove(master_node)
2070
      self.nodes.append(master_node)
2071

    
2072
    data = []
2073
    for node in self.nodes:
2074
      result = self.ssh.Run(node, "root", self.op.command)
2075
      data.append((node, result.output, result.exit_code))
2076

    
2077
    return data
2078

    
2079

    
2080
class LUActivateInstanceDisks(NoHooksLU):
2081
  """Bring up an instance's disks.
2082

2083
  """
2084
  _OP_REQP = ["instance_name"]
2085

    
2086
  def CheckPrereq(self):
2087
    """Check prerequisites.
2088

2089
    This checks that the instance is in the cluster.
2090

2091
    """
2092
    instance = self.cfg.GetInstanceInfo(
2093
      self.cfg.ExpandInstanceName(self.op.instance_name))
2094
    if instance is None:
2095
      raise errors.OpPrereqError("Instance '%s' not known" %
2096
                                 self.op.instance_name)
2097
    self.instance = instance
2098

    
2099

    
2100
  def Exec(self, feedback_fn):
2101
    """Activate the disks.
2102

2103
    """
2104
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2105
    if not disks_ok:
2106
      raise errors.OpExecError("Cannot activate block devices")
2107

    
2108
    return disks_info
2109

    
2110

    
2111
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2112
  """Prepare the block devices for an instance.
2113

2114
  This sets up the block devices on all nodes.
2115

2116
  Args:
2117
    instance: a ganeti.objects.Instance object
2118
    cfg: a ConfigWriter instance
2119
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function
2120

2121
  Returns:
2122
    a tuple of (disks_ok, device_info); disks_ok is a boolean, and
2123
    device_info is a list of (host, instance_visible_name, node_visible_name)
2124
    tuples mapping node devices to instance devices
2125
  """
2126
  device_info = []
2127
  disks_ok = True
2128
  iname = instance.name
2129
  # With the two passes mechanism we try to reduce the window of
2130
  # opportunity for the race condition of switching DRBD to primary
2131
  # before handshaking occurred, but we do not eliminate it
2132

    
2133
  # The proper fix would be to wait (with some limits) until the
2134
  # connection has been made and drbd transitions from WFConnection
2135
  # into any other network-connected state (Connected, SyncTarget,
2136
  # SyncSource, etc.)
2137

    
2138
  # 1st pass, assemble on all nodes in secondary mode
2139
  for inst_disk in instance.disks:
2140
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2141
      cfg.SetDiskID(node_disk, node)
2142
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2143
      if not result:
2144
        logger.Error("could not prepare block device %s on node %s"
2145
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2146
        if not ignore_secondaries:
2147
          disks_ok = False
2148

    
2149
  # FIXME: race condition on drbd migration to primary
2150

    
2151
  # 2nd pass, do only the primary node
2152
  for inst_disk in instance.disks:
2153
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2154
      if node != instance.primary_node:
2155
        continue
2156
      cfg.SetDiskID(node_disk, node)
2157
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2158
      if not result:
2159
        logger.Error("could not prepare block device %s on node %s"
2160
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2161
        disks_ok = False
2162
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2163

    
2164
  # leave the disks configured for the primary node
2165
  # this is a workaround that would be fixed better by
2166
  # improving the logical/physical id handling
2167
  for disk in instance.disks:
2168
    cfg.SetDiskID(disk, instance.primary_node)
2169

    
2170
  return disks_ok, device_info
2171

    
2172

    
2173
def _StartInstanceDisks(cfg, instance, force):
2174
  """Start the disks of an instance.
2175

2176
  """
2177
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2178
                                           ignore_secondaries=force)
2179
  if not disks_ok:
2180
    _ShutdownInstanceDisks(instance, cfg)
2181
    if force is not None and not force:
2182
      logger.Error("If the message above refers to a secondary node,"
2183
                   " you can retry the operation using '--force'.")
2184
    raise errors.OpExecError("Disk consistency error")
2185

    
2186

    
2187
class LUDeactivateInstanceDisks(NoHooksLU):
2188
  """Shutdown an instance's disks.
2189

2190
  """
2191
  _OP_REQP = ["instance_name"]
2192

    
2193
  def CheckPrereq(self):
2194
    """Check prerequisites.
2195

2196
    This checks that the instance is in the cluster.
2197

2198
    """
2199
    instance = self.cfg.GetInstanceInfo(
2200
      self.cfg.ExpandInstanceName(self.op.instance_name))
2201
    if instance is None:
2202
      raise errors.OpPrereqError("Instance '%s' not known" %
2203
                                 self.op.instance_name)
2204
    self.instance = instance
2205

    
2206
  def Exec(self, feedback_fn):
2207
    """Deactivate the disks
2208

2209
    """
2210
    instance = self.instance
2211
    ins_l = rpc.call_instance_list([instance.primary_node])
2212
    ins_l = ins_l[instance.primary_node]
2213
    if not type(ins_l) is list:
2214
      raise errors.OpExecError("Can't contact node '%s'" %
2215
                               instance.primary_node)
2216

    
2217
    if self.instance.name in ins_l:
2218
      raise errors.OpExecError("Instance is running, can't shutdown"
2219
                               " block devices.")
2220

    
2221
    _ShutdownInstanceDisks(instance, self.cfg)
2222

    
2223

    
2224
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2225
  """Shutdown block devices of an instance.
2226

2227
  This does the shutdown on all nodes of the instance.
2228

2229
  If ignore_primary is true, errors on the primary node are only
2230
  logged and do not cause the function to return False.
2231

2232
  """
2233
  result = True
2234
  for disk in instance.disks:
2235
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2236
      cfg.SetDiskID(top_disk, node)
2237
      if not rpc.call_blockdev_shutdown(node, top_disk):
2238
        logger.Error("could not shutdown block device %s on node %s" %
2239
                     (disk.iv_name, node))
2240
        if not ignore_primary or node != instance.primary_node:
2241
          result = False
2242
  return result
2243

    
2244

    
2245
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2246
  """Checks if a node has enough free memory.
2247

2248
  This function check if a given node has the needed amount of free
2249
  memory. In case the node has less memory or we cannot get the
2250
  information from the node, this function raises an OpPrereqError
2251
  exception.
2252

2253
  Args:
2254
    - cfg: a ConfigWriter instance
2255
    - node: the node name
2256
    - reason: string to use in the error message
2257
    - requested: the amount of memory in MiB
2258

2259
  """
2260
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2261
  if not nodeinfo or not isinstance(nodeinfo, dict):
2262
    raise errors.OpPrereqError("Could not contact node %s for resource"
2263
                             " information" % (node,))
2264

    
2265
  free_mem = nodeinfo[node].get('memory_free')
2266
  if not isinstance(free_mem, int):
2267
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2268
                             " was '%s'" % (node, free_mem))
2269
  if requested > free_mem:
2270
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2271
                             " needed %s MiB, available %s MiB" %
2272
                             (node, reason, requested, free_mem))
2273

    
2274

    
2275
class LUStartupInstance(LogicalUnit):
2276
  """Starts an instance.
2277

2278
  """
2279
  HPATH = "instance-start"
2280
  HTYPE = constants.HTYPE_INSTANCE
2281
  _OP_REQP = ["instance_name", "force"]
2282

    
2283
  def BuildHooksEnv(self):
2284
    """Build hooks env.
2285

2286
    This runs on master, primary and secondary nodes of the instance.
2287

2288
    """
2289
    env = {
2290
      "FORCE": self.op.force,
2291
      }
2292
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2293
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2294
          list(self.instance.secondary_nodes))
2295
    return env, nl, nl
2296

    
2297
  def CheckPrereq(self):
2298
    """Check prerequisites.
2299

2300
    This checks that the instance is in the cluster.
2301

2302
    """
2303
    instance = self.cfg.GetInstanceInfo(
2304
      self.cfg.ExpandInstanceName(self.op.instance_name))
2305
    if instance is None:
2306
      raise errors.OpPrereqError("Instance '%s' not known" %
2307
                                 self.op.instance_name)
2308

    
2309
    # check bridges existence
2310
    _CheckInstanceBridgesExist(instance)
2311

    
2312
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2313
                         "starting instance %s" % instance.name,
2314
                         instance.memory)
2315

    
2316
    self.instance = instance
2317
    self.op.instance_name = instance.name
2318

    
2319
  def Exec(self, feedback_fn):
2320
    """Start the instance.
2321

2322
    """
2323
    instance = self.instance
2324
    force = self.op.force
2325
    extra_args = getattr(self.op, "extra_args", "")
2326

    
2327
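    # record the administrative "up" state in the configuration before
    # actually starting the instance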
    self.cfg.MarkInstanceUp(instance.name)
2328

    
2329
    node_current = instance.primary_node
2330

    
2331
    _StartInstanceDisks(self.cfg, instance, force)
2332

    
2333
    if not rpc.call_instance_start(node_current, instance, extra_args):
2334
      _ShutdownInstanceDisks(instance, self.cfg)
2335
      raise errors.OpExecError("Could not start instance")
2336

    
2337

    
2338
class LURebootInstance(LogicalUnit):
2339
  """Reboot an instance.
2340

2341
  """
2342
  HPATH = "instance-reboot"
2343
  HTYPE = constants.HTYPE_INSTANCE
2344
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2345

    
2346
  def BuildHooksEnv(self):
2347
    """Build hooks env.
2348

2349
    This runs on master, primary and secondary nodes of the instance.
2350

2351
    """
2352
    env = {
2353
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2354
      }
2355
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2356
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2357
          list(self.instance.secondary_nodes))
2358
    return env, nl, nl
2359

    
2360
  def CheckPrereq(self):
2361
    """Check prerequisites.
2362

2363
    This checks that the instance is in the cluster.
2364

2365
    """
2366
    instance = self.cfg.GetInstanceInfo(
2367
      self.cfg.ExpandInstanceName(self.op.instance_name))
2368
    if instance is None:
2369
      raise errors.OpPrereqError("Instance '%s' not known" %
2370
                                 self.op.instance_name)
2371

    
2372
    # check bridges existence
2373
    _CheckInstanceBridgesExist(instance)
2374

    
2375
    self.instance = instance
2376
    self.op.instance_name = instance.name
2377

    
2378
  def Exec(self, feedback_fn):
2379
    """Reboot the instance.
2380

2381
    """
2382
    instance = self.instance
2383
    ignore_secondaries = self.op.ignore_secondaries
2384
    reboot_type = self.op.reboot_type
2385
    extra_args = getattr(self.op, "extra_args", "")
2386

    
2387
    node_current = instance.primary_node
2388

    
2389
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2390
                           constants.INSTANCE_REBOOT_HARD,
2391
                           constants.INSTANCE_REBOOT_FULL]:
2392
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2393
                                  (constants.INSTANCE_REBOOT_SOFT,
2394
                                   constants.INSTANCE_REBOOT_HARD,
2395
                                   constants.INSTANCE_REBOOT_FULL))
2396

    
2397
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2398
                       constants.INSTANCE_REBOOT_HARD]:
2399
      if not rpc.call_instance_reboot(node_current, instance,
2400
                                      reboot_type, extra_args):
2401
        raise errors.OpExecError("Could not reboot instance")
2402
    else:
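      # full reboot: shut the instance and its block devices down
      # completely, then bring everything back up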
2403
      if not rpc.call_instance_shutdown(node_current, instance):
2404
        raise errors.OpExecError("could not shutdown instance for full reboot")
2405
      _ShutdownInstanceDisks(instance, self.cfg)
2406
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2407
      if not rpc.call_instance_start(node_current, instance, extra_args):
2408
        _ShutdownInstanceDisks(instance, self.cfg)
2409
        raise errors.OpExecError("Could not start instance for full reboot")
2410

    
2411
    self.cfg.MarkInstanceUp(instance.name)
2412

    
2413

    
2414
class LUShutdownInstance(LogicalUnit):
2415
  """Shutdown an instance.
2416

2417
  """
2418
  HPATH = "instance-stop"
2419
  HTYPE = constants.HTYPE_INSTANCE
2420
  _OP_REQP = ["instance_name"]
2421

    
2422
  def BuildHooksEnv(self):
2423
    """Build hooks env.
2424

2425
    This runs on master, primary and secondary nodes of the instance.
2426

2427
    """
2428
    env = _BuildInstanceHookEnvByObject(self.instance)
2429
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2430
          list(self.instance.secondary_nodes))
2431
    return env, nl, nl
2432

    
2433
  def CheckPrereq(self):
2434
    """Check prerequisites.
2435

2436
    This checks that the instance is in the cluster.
2437

2438
    """
2439
    instance = self.cfg.GetInstanceInfo(
2440
      self.cfg.ExpandInstanceName(self.op.instance_name))
2441
    if instance is None:
2442
      raise errors.OpPrereqError("Instance '%s' not known" %
2443
                                 self.op.instance_name)
2444
    self.instance = instance
2445

    
2446
  def Exec(self, feedback_fn):
2447
    """Shutdown the instance.
2448

2449
    """
2450
    instance = self.instance
2451
    node_current = instance.primary_node
2452
    self.cfg.MarkInstanceDown(instance.name)
2453
    if not rpc.call_instance_shutdown(node_current, instance):
2454
      logger.Error("could not shutdown instance")
2455

    
2456
    _ShutdownInstanceDisks(instance, self.cfg)
2457

    
2458

    
2459
class LUReinstallInstance(LogicalUnit):
2460
  """Reinstall an instance.
2461

2462
  """
2463
  HPATH = "instance-reinstall"
2464
  HTYPE = constants.HTYPE_INSTANCE
2465
  _OP_REQP = ["instance_name"]
2466

    
2467
  def BuildHooksEnv(self):
2468
    """Build hooks env.
2469

2470
    This runs on master, primary and secondary nodes of the instance.
2471

2472
    """
2473
    env = _BuildInstanceHookEnvByObject(self.instance)
2474
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2475
          list(self.instance.secondary_nodes))
2476
    return env, nl, nl
2477

    
2478
  def CheckPrereq(self):
2479
    """Check prerequisites.
2480

2481
    This checks that the instance is in the cluster and is not running.
2482

2483
    """
2484
    instance = self.cfg.GetInstanceInfo(
2485
      self.cfg.ExpandInstanceName(self.op.instance_name))
2486
    if instance is None:
2487
      raise errors.OpPrereqError("Instance '%s' not known" %
2488
                                 self.op.instance_name)
2489
    if instance.disk_template == constants.DT_DISKLESS:
2490
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2491
                                 self.op.instance_name)
2492
    if instance.status != "down":
2493
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2494
                                 self.op.instance_name)
2495
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2496
    if remote_info:
2497
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2498
                                 (self.op.instance_name,
2499
                                  instance.primary_node))
2500

    
2501
    self.op.os_type = getattr(self.op, "os_type", None)
2502
    if self.op.os_type is not None:
2503
      # OS verification
2504
      pnode = self.cfg.GetNodeInfo(
2505
        self.cfg.ExpandNodeName(instance.primary_node))
2506
      if pnode is None:
2507
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2508
                                   self.op.pnode)
2509
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2510
      if not os_obj:
2511
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2512
                                   " primary node"  % self.op.os_type)
2513

    
2514
    self.instance = instance
2515

    
2516
  def Exec(self, feedback_fn):
2517
    """Reinstall the instance.
2518

2519
    """
2520
    inst = self.instance
2521

    
2522
    if self.op.os_type is not None:
2523
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2524
      inst.os = self.op.os_type
2525
      self.cfg.AddInstance(inst)
2526

    
2527
    _StartInstanceDisks(self.cfg, inst, None)
2528
    try:
2529
      feedback_fn("Running the instance OS create scripts...")
2530
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2531
        raise errors.OpExecError("Could not install OS for instance %s"
2532
                                 " on node %s" %
2533
                                 (inst.name, inst.primary_node))
2534
    finally:
2535
      _ShutdownInstanceDisks(inst, self.cfg)
2536

    
2537

    
2538
class LURenameInstance(LogicalUnit):
2539
  """Rename an instance.
2540

2541
  """
2542
  HPATH = "instance-rename"
2543
  HTYPE = constants.HTYPE_INSTANCE
2544
  _OP_REQP = ["instance_name", "new_name"]
2545

    
2546
  def BuildHooksEnv(self):
2547
    """Build hooks env.
2548

2549
    This runs on master, primary and secondary nodes of the instance.
2550

2551
    """
2552
    env = _BuildInstanceHookEnvByObject(self.instance)
2553
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2554
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2555
          list(self.instance.secondary_nodes))
2556
    return env, nl, nl
2557

    
2558
  def CheckPrereq(self):
2559
    """Check prerequisites.
2560

2561
    This checks that the instance is in the cluster and is not running.
2562

2563
    """
2564
    instance = self.cfg.GetInstanceInfo(
2565
      self.cfg.ExpandInstanceName(self.op.instance_name))
2566
    if instance is None:
2567
      raise errors.OpPrereqError("Instance '%s' not known" %
2568
                                 self.op.instance_name)
2569
    if instance.status != "down":
2570
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2571
                                 self.op.instance_name)
2572
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2573
    if remote_info:
2574
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2575
                                 (self.op.instance_name,
2576
                                  instance.primary_node))
2577
    self.instance = instance
2578

    
2579
    # new name verification
2580
    name_info = utils.HostInfo(self.op.new_name)
2581

    
2582
    self.op.new_name = new_name = name_info.name
2583
    instance_list = self.cfg.GetInstanceList()
2584
    if new_name in instance_list:
2585
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2586
                                 new_name)
2587

    
2588
    if not getattr(self.op, "ignore_ip", False):
2589
      command = ["fping", "-q", name_info.ip]
2590
      result = utils.RunCmd(command)
2591
      if not result.failed:
2592
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2593
                                   (name_info.ip, new_name))
2594

    
2595

    
2596
  def Exec(self, feedback_fn):
2597
    """Rename the instance.
2598

2599
    """
2600
    inst = self.instance
2601
    old_name = inst.name
2602

    
2603
    if inst.disk_template == constants.DT_FILE:
2604
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2605

    
2606
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2607

    
2608
    # re-read the instance from the configuration after rename
2609
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2610

    
2611
    if inst.disk_template == constants.DT_FILE:
2612
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2613
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2614
                                                old_file_storage_dir,
2615
                                                new_file_storage_dir)
2616

    
2617
      if not result:
2618
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2619
                                 " directory '%s' to '%s' (but the instance"
2620
                                 " has been renamed in Ganeti)" % (
2621
                                 inst.primary_node, old_file_storage_dir,
2622
                                 new_file_storage_dir))
2623

    
2624
      if not result[0]:
2625
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2626
                                 " (but the instance has been renamed in"
2627
                                 " Ganeti)" % (old_file_storage_dir,
2628
                                               new_file_storage_dir))
2629

    
2630
    _StartInstanceDisks(self.cfg, inst, None)
2631
    try:
2632
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2633
                                          "sda", "sdb"):
2634
        msg = ("Could not run OS rename script for instance %s on node %s"
2635
               " (but the instance has been renamed in Ganeti)" %
2636
               (inst.name, inst.primary_node))
2637
        logger.Error(msg)
2638
    finally:
2639
      _ShutdownInstanceDisks(inst, self.cfg)
2640

    
2641

    
2642
class LURemoveInstance(LogicalUnit):
2643
  """Remove an instance.
2644

2645
  """
2646
  HPATH = "instance-remove"
2647
  HTYPE = constants.HTYPE_INSTANCE
2648
  _OP_REQP = ["instance_name"]
2649

    
2650
  def BuildHooksEnv(self):
2651
    """Build hooks env.
2652

2653
    This runs on master, primary and secondary nodes of the instance.
2654

2655
    """
2656
    env = _BuildInstanceHookEnvByObject(self.instance)
2657
    nl = [self.sstore.GetMasterNode()]
2658
    return env, nl, nl
2659

    
2660
  def CheckPrereq(self):
2661
    """Check prerequisites.
2662

2663
    This checks that the instance is in the cluster.
2664

2665
    """
2666
    instance = self.cfg.GetInstanceInfo(
2667
      self.cfg.ExpandInstanceName(self.op.instance_name))
2668
    if instance is None:
2669
      raise errors.OpPrereqError("Instance '%s' not known" %
2670
                                 self.op.instance_name)
2671
    self.instance = instance
2672

    
2673
  def Exec(self, feedback_fn):
2674
    """Remove the instance.
2675

2676
    """
2677
    instance = self.instance
2678
    logger.Info("shutting down instance %s on node %s" %
2679
                (instance.name, instance.primary_node))
2680

    
2681
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2682
      if self.op.ignore_failures:
2683
        feedback_fn("Warning: can't shutdown instance")
2684
      else:
2685
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2686
                                 (instance.name, instance.primary_node))
2687

    
2688
    logger.Info("removing block devices for instance %s" % instance.name)
2689

    
2690
    if not _RemoveDisks(instance, self.cfg):
2691
      if self.op.ignore_failures:
2692
        feedback_fn("Warning: can't remove instance's disks")
2693
      else:
2694
        raise errors.OpExecError("Can't remove instance's disks")
2695

    
2696
    logger.Info("removing instance %s out of cluster config" % instance.name)
2697

    
2698
    self.cfg.RemoveInstance(instance.name)
2699

    
2700

    
2701
class LUQueryInstances(NoHooksLU):
2702
  """Logical unit for querying instances.
2703

2704
  """
2705
  _OP_REQP = ["output_fields", "names"]
2706

    
2707
  def CheckPrereq(self):
2708
    """Check prerequisites.
2709

2710
    This checks that the fields required are valid output fields.
2711

2712
    """
2713
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2714
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2715
                               "admin_state", "admin_ram",
2716
                               "disk_template", "ip", "mac", "bridge",
2717
                               "sda_size", "sdb_size", "vcpus"],
2718
                       dynamic=self.dynamic_fields,
2719
                       selected=self.op.output_fields)
2720

    
2721
    self.wanted = _GetWantedInstances(self, self.op.names)
2722

    
2723
  def Exec(self, feedback_fn):
2724
    """Computes the list of nodes and their attributes.
2725

2726
    """
2727
    instance_names = self.wanted
2728
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2729
                     in instance_names]
2730

    
2731
    # begin data gathering
2732

    
2733
    nodes = frozenset([inst.primary_node for inst in instance_list])
2734

    
2735
    bad_nodes = []
2736
    if self.dynamic_fields.intersection(self.op.output_fields):
2737
      live_data = {}
2738
      node_data = rpc.call_all_instances_info(nodes)
2739
      for name in nodes:
2740
        result = node_data[name]
2741
        if result:
2742
          live_data.update(result)
2743
        elif result == False:
2744
          bad_nodes.append(name)
2745
        # else no instance is alive
2746
    else:
2747
      live_data = dict([(name, {}) for name in instance_names])
2748

    
2749
    # end data gathering
2750

    
2751
    output = []
2752
    for instance in instance_list:
2753
      iout = []
2754
      for field in self.op.output_fields:
2755
        if field == "name":
2756
          val = instance.name
2757
        elif field == "os":
2758
          val = instance.os
2759
        elif field == "pnode":
2760
          val = instance.primary_node
2761
        elif field == "snodes":
2762
          val = list(instance.secondary_nodes)
2763
        elif field == "admin_state":
2764
          val = (instance.status != "down")
2765
        elif field == "oper_state":
2766
          if instance.primary_node in bad_nodes:
2767
            val = None
2768
          else:
2769
            val = bool(live_data.get(instance.name))
2770
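        # 'status' combines the configured (admin) state with the live
        # state: running, ERROR_up, ERROR_down, ADMIN_down or
        # ERROR_nodedown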
        elif field == "status":
2771
          if instance.primary_node in bad_nodes:
2772
            val = "ERROR_nodedown"
2773
          else:
2774
            running = bool(live_data.get(instance.name))
2775
            if running:
2776
              if instance.status != "down":
2777
                val = "running"
2778
              else:
2779
                val = "ERROR_up"
2780
            else:
2781
              if instance.status != "down":
2782
                val = "ERROR_down"
2783
              else:
2784
                val = "ADMIN_down"
2785
        elif field == "admin_ram":
2786
          val = instance.memory
2787
        elif field == "oper_ram":
2788
          if instance.primary_node in bad_nodes:
2789
            val = None
2790
          elif instance.name in live_data:
2791
            val = live_data[instance.name].get("memory", "?")
2792
          else:
2793
            val = "-"
2794
        elif field == "disk_template":
2795
          val = instance.disk_template
2796
        elif field == "ip":
2797
          val = instance.nics[0].ip
2798
        elif field == "bridge":
2799
          val = instance.nics[0].bridge
2800
        elif field == "mac":
2801
          val = instance.nics[0].mac
2802
        elif field == "sda_size" or field == "sdb_size":
2803
          disk = instance.FindDisk(field[:3])
2804
          if disk is None:
2805
            val = None
2806
          else:
2807
            val = disk.size
2808
        elif field == "vcpus":
2809
          val = instance.vcpus
2810
        else:
2811
          raise errors.ParameterError(field)
2812
        iout.append(val)
2813
      output.append(iout)
2814

    
2815
    return output
2816

    
2817

    
2818
class LUFailoverInstance(LogicalUnit):
2819
  """Failover an instance.
2820

2821
  """
2822
  HPATH = "instance-failover"
2823
  HTYPE = constants.HTYPE_INSTANCE
2824
  _OP_REQP = ["instance_name", "ignore_consistency"]
2825

    
2826
  def BuildHooksEnv(self):
2827
    """Build hooks env.
2828

2829
    This runs on master, primary and secondary nodes of the instance.
2830

2831
    """
2832
    env = {
2833
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2834
      }
2835
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2836
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2837
    return env, nl, nl
2838

    
2839
  def CheckPrereq(self):
2840
    """Check prerequisites.
2841

2842
    This checks that the instance is in the cluster.
2843

2844
    """
2845
    instance = self.cfg.GetInstanceInfo(
2846
      self.cfg.ExpandInstanceName(self.op.instance_name))
2847
    if instance is None:
2848
      raise errors.OpPrereqError("Instance '%s' not known" %
2849
                                 self.op.instance_name)
2850

    
2851
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2852
      raise errors.OpPrereqError("Instance's disk layout is not"
2853
                                 " network mirrored, cannot failover.")
2854

    
2855
    secondary_nodes = instance.secondary_nodes
2856
    if not secondary_nodes:
2857
      raise errors.ProgrammerError("no secondary node but using "
2858
                                   "a network mirrored disk template")
2859

    
2860
    target_node = secondary_nodes[0]
2861
    # check memory requirements on the secondary node
2862
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2863
                         instance.name, instance.memory)
2864

    
2865
    # check bridge existence
2866
    brlist = [nic.bridge for nic in instance.nics]
2867
    if not rpc.call_bridges_exist(target_node, brlist):
2868
      raise errors.OpPrereqError("One or more target bridges %s do not"
2869
                                 " exist on destination node '%s'" %
2870
                                 (brlist, target_node))
2871

    
2872
    self.instance = instance
2873

    
2874
  def Exec(self, feedback_fn):
2875
    """Failover an instance.
2876

2877
    The failover is done by shutting it down on its present node and
2878
    starting it on the secondary.
2879

2880
    """
2881
    instance = self.instance
2882

    
2883
    source_node = instance.primary_node
2884
    target_node = instance.secondary_nodes[0]
2885

    
2886
    feedback_fn("* checking disk consistency between source and target")
2887
    for dev in instance.disks:
2888
      # for remote_raid1, these are md over drbd
2889
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2890
        if instance.status == "up" and not self.op.ignore_consistency:
2891
          raise errors.OpExecError("Disk %s is degraded on target node,"
2892
                                   " aborting failover." % dev.iv_name)
2893

    
2894
    feedback_fn("* shutting down instance on source node")
2895
    logger.Info("Shutting down instance %s on node %s" %
2896
                (instance.name, source_node))
2897

    
2898
    if not rpc.call_instance_shutdown(source_node, instance):
2899
      if self.op.ignore_consistency:
2900
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2901
                     " anyway. Please make sure node %s is down"  %
2902
                     (instance.name, source_node, source_node))
2903
      else:
2904
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2905
                                 (instance.name, source_node))
2906

    
2907
    feedback_fn("* deactivating the instance's disks on source node")
2908
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2909
      raise errors.OpExecError("Can't shut down the instance's disks.")
2910

    
2911
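    # the instance is stopped and its disks are released; switch the
    # primary node in the configuration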
    instance.primary_node = target_node
2912
    # distribute new instance config to the other nodes
2913
    self.cfg.AddInstance(instance)
2914

    
2915
    # Only start the instance if it's marked as up
2916
    if instance.status == "up":
2917
      feedback_fn("* activating the instance's disks on target node")
2918
      logger.Info("Starting instance %s on node %s" %
2919
                  (instance.name, target_node))
2920

    
2921
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2922
                                               ignore_secondaries=True)
2923
      if not disks_ok:
2924
        _ShutdownInstanceDisks(instance, self.cfg)
2925
        raise errors.OpExecError("Can't activate the instance's disks")
2926

    
2927
      feedback_fn("* starting the instance on the target node")
2928
      if not rpc.call_instance_start(target_node, instance, None):
2929
        _ShutdownInstanceDisks(instance, self.cfg)
2930
        raise errors.OpExecError("Could not start instance %s on node %s." %
2931
                                 (instance.name, target_node))
2932

    
2933

    
2934
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2935
  """Create a tree of block devices on the primary node.
2936

2937
  This always creates all devices.
2938

2939
  """
2940
  if device.children:
2941
    for child in device.children:
2942
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2943
        return False
2944

    
2945
  cfg.SetDiskID(device, node)
2946
  new_id = rpc.call_blockdev_create(node, device, device.size,
2947
                                    instance.name, True, info)
2948
  if not new_id:
2949
    return False
2950
  if device.physical_id is None:
2951
    device.physical_id = new_id
2952
  return True
2953

    
2954

    
2955
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2956
  """Create a tree of block devices on a secondary node.
2957

2958
  If this device type has to be created on secondaries, create it and
2959
  all its children.
2960

2961
  If not, just recurse to children keeping the same 'force' value.
2962

2963
  """
2964
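  # once a device type must be created on the secondary node, all of
  # its children are created there too (force propagates downward)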
  if device.CreateOnSecondary():
2965
    force = True
2966
  if device.children:
2967
    for child in device.children:
2968
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2969
                                        child, force, info):
2970
        return False
2971

    
2972
  if not force:
2973
    return True
2974
  cfg.SetDiskID(device, node)
2975
  new_id = rpc.call_blockdev_create(node, device, device.size,
2976
                                    instance.name, False, info)
2977
  if not new_id:
2978
    return False
2979
  if device.physical_id is None:
2980
    device.physical_id = new_id
2981
  return True
2982

    
2983

    
2984
def _GenerateUniqueNames(cfg, exts):
2985
  """Generate a suitable LV name.
2986

2987
  This will generate a logical volume name for the given instance.
2988

2989
  """
2990
  results = []
2991
  for val in exts:
2992
    new_id = cfg.GenerateUniqueID()
2993
    results.append("%s%s" % (new_id, val))
2994
  return results
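# Example: _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"]) returns
# something like ["<unique-id>.sda_data", "<unique-id>.sda_meta"], where the
# unique id comes from cfg.GenerateUniqueID() (its exact format depends on the
# ConfigWriter implementation).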


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
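# The resulting DRBD8 disk is a two-level tree: the drbd device itself
# (logical_id = (primary, secondary, port)) with two LV children, a data
# volume of the requested size and a fixed 128 MB metadata volume.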


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
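# Illustrative call (values are made up): for a plain LVM layout with a
# 10240 MB data disk, 4096 MB of swap and no secondaries,
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN,
#                                 "instance1.example.com", "node1", [],
#                                 10240, 4096, "", None)
#
# returns two LD_LV Disk objects with iv_names "sda" and "sdb"; the
# file_storage_dir and file_driver arguments are only used for DT_FILE.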


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
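# Note that _CreateDisks takes (cfg, instance) while _RemoveDisks below takes
# (instance, cfg); on failure _CreateDisks simply returns False and leaves any
# partially created devices behind, so callers (e.g. LUCreateInstance.Exec)
# are expected to call _RemoveDisks to clean up.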


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
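# Worked example: _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096) returns
# 10240 + 4096 + 256 = 14580 MB of required free space in the volume group;
# the diskless and file-based templates return None (no LVM space needed).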


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
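  # _RunAllocator only fills in self.op.pnode (and self.op.snode when the
  # allocator asks for two nodes); the normal node checks in CheckPrereq
  # still run on the returned names afterwards.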

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
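  # The Exec method below performs the actual creation: generate the disk
  # layout, create the disks (rolling back with _RemoveDisks on failure),
  # add the instance to the configuration, wait for the disks to sync, run
  # the OS create or import scripts, and finally start the instance if
  # requested.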

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
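  # The LU does not open the console itself: it only returns the ssh command
  # line (built by ssh.SshRunner.BuildCmd with a forced tty) that the caller
  # has to execute on the master node.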


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # _RunAllocator stores its result in self.op.remote_node
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node
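  # After CheckPrereq, for DRBD8 instances tgt_node is the node whose storage
  # gets replaced, oth_node is the other (untouched) node, and new_node (only
  # set in secondary replacement mode) is the replacement secondary.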

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv,
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
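  # The double rename above (old LVs to *_replaced-<time_t>, new LVs to the
  # old names) lets the new volumes take over the exact device names the drbd
  # device already references, so the drbd configuration itself does not need
  # to change.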

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)
4221

    
4222

    
4223
class LUQueryInstanceData(NoHooksLU):
4224
  """Query runtime instance data.
4225

4226
  """
4227
  _OP_REQP = ["instances"]
4228

    
4229
  def CheckPrereq(self):
4230
    """Check prerequisites.
4231

4232
    This only checks the optional instance list against the existing names.
4233

4234
    """
4235
    if not isinstance(self.op.instances, list):
4236
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4237
    if self.op.instances:
4238
      self.wanted_instances = []
4239
      names = self.op.instances
4240
      for name in names:
4241
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4242
        if instance is None:
4243
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4244
        self.wanted_instances.append(instance)
4245
    else:
4246
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4247
                               in self.cfg.GetInstanceList()]
4248
    return
4249

    
4250

    
4251
  def _ComputeDiskStatus(self, instance, snode, dev):
4252
    """Compute block device status.
4253

4254
    """
4255
    self.cfg.SetDiskID(dev, instance.primary_node)
4256
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4257
    if dev.dev_type in constants.LDS_DRBD:
4258
      # we change the snode then (otherwise we use the one passed in)
4259
      if dev.logical_id[0] == instance.primary_node:
4260
        snode = dev.logical_id[1]
4261
      else:
4262
        snode = dev.logical_id[0]
4263

    
4264
    if snode:
4265
      self.cfg.SetDiskID(dev, snode)
4266
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4267
    else:
4268
      dev_sstatus = None
4269

    
4270
    if dev.children:
4271
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4272
                      for child in dev.children]
4273
    else:
4274
      dev_children = []
4275

    
4276
    data = {
4277
      "iv_name": dev.iv_name,
4278
      "dev_type": dev.dev_type,
4279
      "logical_id": dev.logical_id,
4280
      "physical_id": dev.physical_id,
4281
      "pstatus": dev_pstatus,
4282
      "sstatus": dev_sstatus,
4283
      "children": dev_children,
4284
      }
4285

    
4286
    return data
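
  # Editor's note (illustrative): for a DRBD8-based disk the dict built
  # above typically nests one level, i.e. its "children" entry holds the
  # status dicts of the underlying volumes, each with an empty "children"
  # list of its own.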
4287

    
4288
  def Exec(self, feedback_fn):
4289
    """Gather and return data"""
4290
    result = {}
4291
    for instance in self.wanted_instances:
4292
      remote_info = rpc.call_instance_info(instance.primary_node,
4293
                                                instance.name)
4294
      if remote_info and "state" in remote_info:
4295
        remote_state = "up"
4296
      else:
4297
        remote_state = "down"
4298
      if instance.status == "down":
4299
        config_state = "down"
4300
      else:
4301
        config_state = "up"
4302

    
4303
      disks = [self._ComputeDiskStatus(instance, None, device)
4304
               for device in instance.disks]
4305

    
4306
      idict = {
4307
        "name": instance.name,
4308
        "config_state": config_state,
4309
        "run_state": remote_state,
4310
        "pnode": instance.primary_node,
4311
        "snodes": instance.secondary_nodes,
4312
        "os": instance.os,
4313
        "memory": instance.memory,
4314
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4315
        "disks": disks,
4316
        "network_port": instance.network_port,
4317
        "vcpus": instance.vcpus,
4318
        "kernel_path": instance.kernel_path,
4319
        "initrd_path": instance.initrd_path,
4320
        "hvm_boot_order": instance.hvm_boot_order,
4321
        }
4322

    
4323
      result[instance.name] = idict
4324

    
4325
    return result
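

# Illustrative sketch (editor's example): another LU could consume the
# per-instance dicts built above through the ChainOpCode() helper this
# module already uses for OpQueryExports; the opcode class name
# OpQueryInstanceData is assumed to be the one registered for this LU in
# opcodes.py, not something defined here.
#
#   op = opcodes.OpQueryInstanceData(instances=["instance1.example.com"])
#   info = self.proc.ChainOpCode(op)
#   for name, idict in info.iteritems():
#     feedback_fn("%s: config=%s, run=%s" %
#                 (name, idict["config_state"], idict["run_state"]))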
4326

    
4327

    
4328
class LUSetInstanceParams(LogicalUnit):
4329
  """Modifies an instances's parameters.
4330

4331
  """
4332
  HPATH = "instance-modify"
4333
  HTYPE = constants.HTYPE_INSTANCE
4334
  _OP_REQP = ["instance_name"]
4335

    
4336
  def BuildHooksEnv(self):
4337
    """Build hooks env.
4338

4339
    This runs on the master, primary and secondaries.
4340

4341
    """
4342
    args = dict()
4343
    if self.mem:
4344
      args['memory'] = self.mem
4345
    if self.vcpus:
4346
      args['vcpus'] = self.vcpus
4347
    if self.do_ip or self.do_bridge or self.mac:
4348
      if self.do_ip:
4349
        ip = self.ip
4350
      else:
4351
        ip = self.instance.nics[0].ip
4352
      if self.bridge:
4353
        bridge = self.bridge
4354
      else:
4355
        bridge = self.instance.nics[0].bridge
4356
      if self.mac:
4357
        mac = self.mac
4358
      else:
4359
        mac = self.instance.nics[0].mac
4360
      args['nics'] = [(ip, bridge, mac)]
4361
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4362
    nl = [self.sstore.GetMasterNode(),
4363
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4364
    return env, nl, nl
4365

    
4366
  def CheckPrereq(self):
4367
    """Check prerequisites.
4368

4369
    This checks the given parameters and the instance name for validity.
4370

4371
    """
4372
    self.mem = getattr(self.op, "mem", None)
4373
    self.vcpus = getattr(self.op, "vcpus", None)
4374
    self.ip = getattr(self.op, "ip", None)
4375
    self.mac = getattr(self.op, "mac", None)
4376
    self.bridge = getattr(self.op, "bridge", None)
4377
    self.kernel_path = getattr(self.op, "kernel_path", None)
4378
    self.initrd_path = getattr(self.op, "initrd_path", None)
4379
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4380
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4381
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4382
    if all_params.count(None) == len(all_params):
4383
      raise errors.OpPrereqError("No changes submitted")
4384
    if self.mem is not None:
4385
      try:
4386
        self.mem = int(self.mem)
4387
      except ValueError, err:
4388
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4389
    if self.vcpus is not None:
4390
      try:
4391
        self.vcpus = int(self.vcpus)
4392
      except ValueError, err:
4393
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4394
    if self.ip is not None:
4395
      self.do_ip = True
4396
      if self.ip.lower() == "none":
4397
        self.ip = None
4398
      else:
4399
        if not utils.IsValidIP(self.ip):
4400
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4401
    else:
4402
      self.do_ip = False
4403
    self.do_bridge = (self.bridge is not None)
4404
    if self.mac is not None:
4405
      if self.cfg.IsMacInUse(self.mac):
4406
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4407
                                   self.mac)
4408
      if not utils.IsValidMac(self.mac):
4409
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4410

    
4411
    if self.kernel_path is not None:
4412
      self.do_kernel_path = True
4413
      if self.kernel_path == constants.VALUE_NONE:
4414
        raise errors.OpPrereqError("Can't set instance to no kernel")
4415

    
4416
      if self.kernel_path != constants.VALUE_DEFAULT:
4417
        if not os.path.isabs(self.kernel_path):
4418
          raise errors.OpPrereqError("The kernel path must be an absolute"
4419
                                    " filename")
4420
    else:
4421
      self.do_kernel_path = False
4422

    
4423
    if self.initrd_path is not None:
4424
      self.do_initrd_path = True
4425
      if self.initrd_path not in (constants.VALUE_NONE,
4426
                                  constants.VALUE_DEFAULT):
4427
        if not os.path.isabs(self.initrd_path):
4428
          raise errors.OpPrereqError("The initrd path must be an absolute"
4429
                                    " filename")
4430
    else:
4431
      self.do_initrd_path = False
4432

    
4433
    # boot order verification
4434
    if self.hvm_boot_order is not None:
4435
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4436
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4437
          raise errors.OpPrereqError("invalid boot order specified,"
4438
                                     " must be one or more of [acdn]"
4439
                                     " or 'default'")
4440

    
4441
    instance = self.cfg.GetInstanceInfo(
4442
      self.cfg.ExpandInstanceName(self.op.instance_name))
4443
    if instance is None:
4444
      raise errors.OpPrereqError("No such instance name '%s'" %
4445
                                 self.op.instance_name)
4446
    self.op.instance_name = instance.name
4447
    self.instance = instance
4448
    return
4449

    
4450
  def Exec(self, feedback_fn):
4451
    """Modifies an instance.
4452

4453
    All parameters take effect only at the next restart of the instance.
4454
    """
4455
    result = []
4456
    instance = self.instance
4457
    if self.mem:
4458
      instance.memory = self.mem
4459
      result.append(("mem", self.mem))
4460
    if self.vcpus:
4461
      instance.vcpus = self.vcpus
4462
      result.append(("vcpus",  self.vcpus))
4463
    if self.do_ip:
4464
      instance.nics[0].ip = self.ip
4465
      result.append(("ip", self.ip))
4466
    if self.bridge:
4467
      instance.nics[0].bridge = self.bridge
4468
      result.append(("bridge", self.bridge))
4469
    if self.mac:
4470
      instance.nics[0].mac = self.mac
4471
      result.append(("mac", self.mac))
4472
    if self.do_kernel_path:
4473
      instance.kernel_path = self.kernel_path
4474
      result.append(("kernel_path", self.kernel_path))
4475
    if self.do_initrd_path:
4476
      instance.initrd_path = self.initrd_path
4477
      result.append(("initrd_path", self.initrd_path))
4478
    if self.hvm_boot_order:
4479
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4480
        instance.hvm_boot_order = None
4481
      else:
4482
        instance.hvm_boot_order = self.hvm_boot_order
4483
      result.append(("hvm_boot_order", self.hvm_boot_order))
4484

    
4485
    self.cfg.AddInstance(instance)
4486

    
4487
    return result
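

# Illustrative sketch (editor's example): the value returned by Exec()
# above is a list of (parameter, new value) pairs covering only the
# settings that were actually changed; a client could render it as, say:
#
#   for param, val in result:
#     feedback_fn(" - %-15s => %s" % (param, val))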
4488

    
4489

    
4490
class LUQueryExports(NoHooksLU):
4491
  """Query the exports list
4492

4493
  """
4494
  _OP_REQP = []
4495

    
4496
  def CheckPrereq(self):
4497
    """Check that the nodelist contains only existing nodes.
4498

4499
    """
4500
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4501

    
4502
  def Exec(self, feedback_fn):
4503
    """Compute the list of all the exported system images.
4504

4505
    Returns:
4506
      a dictionary with the structure node->(export-list)
4507
      where export-list is a list of the instances exported on
4508
      that node.
4509

4510
    """
4511
    return rpc.call_export_list(self.nodes)
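

# Illustrative sketch (editor's example): the structure documented above is
# a plain mapping of node name to the list of instance exports found on
# that node, e.g.
#
#   {"node1.example.com": ["inst1.example.com"],
#    "node2.example.com": []}
#
# which is exactly the shape LUExportInstance and LURemoveExport below
# iterate over when pruning stale exports.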
4512

    
4513

    
4514
class LUExportInstance(LogicalUnit):
4515
  """Export an instance to an image in the cluster.
4516

4517
  """
4518
  HPATH = "instance-export"
4519
  HTYPE = constants.HTYPE_INSTANCE
4520
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4521

    
4522
  def BuildHooksEnv(self):
4523
    """Build hooks env.
4524

4525
    This will run on the master, primary node and target node.
4526

4527
    """
4528
    env = {
4529
      "EXPORT_NODE": self.op.target_node,
4530
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4531
      }
4532
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4533
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4534
          self.op.target_node]
4535
    return env, nl, nl
4536

    
4537
  def CheckPrereq(self):
4538
    """Check prerequisites.
4539

4540
    This checks that the instance and node names are valid.
4541

4542
    """
4543
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4544
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4545
    if self.instance is None:
4546
      raise errors.OpPrereqError("Instance '%s' not found" %
4547
                                 self.op.instance_name)
4548

    
4549
    # node verification
4550
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4551
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4552

    
4553
    if self.dst_node is None:
4554
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4555
                                 self.op.target_node)
4556
    self.op.target_node = self.dst_node.name
4557

    
4558
    # instance disk type verification
4559
    for disk in self.instance.disks:
4560
      if disk.dev_type == constants.LD_FILE:
4561
        raise errors.OpPrereqError("Export not supported for instances with"
4562
                                   " file-based disks")
4563

    
4564
  def Exec(self, feedback_fn):
4565
    """Export an instance to an image in the cluster.
4566

4567
    """
4568
    instance = self.instance
4569
    dst_node = self.dst_node
4570
    src_node = instance.primary_node
4571
    if self.op.shutdown:
4572
      # shutdown the instance, but not the disks
4573
      if not rpc.call_instance_shutdown(src_node, instance):
4574
         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4575
                                  (instance.name, src_node))
4576

    
4577
    vgname = self.cfg.GetVGName()
4578

    
4579
    snap_disks = []
4580

    
4581
    try:
4582
      for disk in instance.disks:
4583
        if disk.iv_name == "sda":
4584
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4585
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4586

    
4587
          if not new_dev_name:
4588
            logger.Error("could not snapshot block device %s on node %s" %
4589
                         (disk.logical_id[1], src_node))
4590
          else:
4591
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4592
                                      logical_id=(vgname, new_dev_name),
4593
                                      physical_id=(vgname, new_dev_name),
4594
                                      iv_name=disk.iv_name)
4595
            snap_disks.append(new_dev)
4596

    
4597
    finally:
4598
      if self.op.shutdown and instance.status == "up":
4599
        if not rpc.call_instance_start(src_node, instance, None):
4600
          _ShutdownInstanceDisks(instance, self.cfg)
4601
          raise errors.OpExecError("Could not start instance")
4602

    
4603
    # TODO: check for size
4604

    
4605
    for dev in snap_disks:
4606
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4607
        logger.Error("could not export block device %s from node %s to node %s"
4608
                     % (dev.logical_id[1], src_node, dst_node.name))
4609
      if not rpc.call_blockdev_remove(src_node, dev):
4610
        logger.Error("could not remove snapshot block device %s from node %s" %
4611
                     (dev.logical_id[1], src_node))
4612

    
4613
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4614
      logger.Error("could not finalize export for instance %s on node %s" %
4615
                   (instance.name, dst_node.name))
4616

    
4617
    nodelist = self.cfg.GetNodeList()
4618
    nodelist.remove(dst_node.name)
4619

    
4620
    # on one-node clusters nodelist will be empty after the removal
4621
    # if we proceed, the backup would be removed because OpQueryExports
4622
    # substitutes an empty list with the full cluster node list.
4623
    if nodelist:
4624
      op = opcodes.OpQueryExports(nodes=nodelist)
4625
      exportlist = self.proc.ChainOpCode(op)
4626
      for node in exportlist:
4627
        if instance.name in exportlist[node]:
4628
          if not rpc.call_export_remove(node, instance.name):
4629
            logger.Error("could not remove older export for instance %s"
4630
                         " on node %s" % (instance.name, node))
4631

    
4632

    
4633
class LURemoveExport(NoHooksLU):
4634
  """Remove exports related to the named instance.
4635

4636
  """
4637
  _OP_REQP = ["instance_name"]
4638

    
4639
  def CheckPrereq(self):
4640
    """Check prerequisites.
4641
    """
4642
    pass
4643

    
4644
  def Exec(self, feedback_fn):
4645
    """Remove any export.
4646

4647
    """
4648
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4649
    # If the instance was not found we'll try with the name that was passed in.
4650
    # This will only work if it was an FQDN, though.
4651
    fqdn_warn = False
4652
    if not instance_name:
4653
      fqdn_warn = True
4654
      instance_name = self.op.instance_name
4655

    
4656
    op = opcodes.OpQueryExports(nodes=[])
4657
    exportlist = self.proc.ChainOpCode(op)
4658
    found = False
4659
    for node in exportlist:
4660
      if instance_name in exportlist[node]:
4661
        found = True
4662
        if not rpc.call_export_remove(node, instance_name):
4663
          logger.Error("could not remove export for instance %s"
4664
                       " on node %s" % (instance_name, node))
4665

    
4666
    if fqdn_warn and not found:
4667
      feedback_fn("Export not found. If trying to remove an export belonging"
4668
                  " to a deleted instance please use its Fully Qualified"
4669
                  " Domain Name.")
4670

    
4671

    
4672
class TagsLU(NoHooksLU):
4673
  """Generic tags LU.
4674

4675
  This is an abstract class which is the parent of all the other tags LUs.
4676

4677
  """
4678
  def CheckPrereq(self):
4679
    """Check prerequisites.
4680

4681
    """
4682
    if self.op.kind == constants.TAG_CLUSTER:
4683
      self.target = self.cfg.GetClusterInfo()
4684
    elif self.op.kind == constants.TAG_NODE:
4685
      name = self.cfg.ExpandNodeName(self.op.name)
4686
      if name is None:
4687
        raise errors.OpPrereqError("Invalid node name (%s)" %
4688
                                   (self.op.name,))
4689
      self.op.name = name
4690
      self.target = self.cfg.GetNodeInfo(name)
4691
    elif self.op.kind == constants.TAG_INSTANCE:
4692
      name = self.cfg.ExpandInstanceName(self.op.name)
4693
      if name is None:
4694
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4695
                                   (self.op.name,))
4696
      self.op.name = name
4697
      self.target = self.cfg.GetInstanceInfo(name)
4698
    else:
4699
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4700
                                 str(self.op.kind))
4701

    
4702

    
4703
class LUGetTags(TagsLU):
4704
  """Returns the tags of a given object.
4705

4706
  """
4707
  _OP_REQP = ["kind", "name"]
4708

    
4709
  def Exec(self, feedback_fn):
4710
    """Returns the tag list.
4711

4712
    """
4713
    return self.target.GetTags()
4714

    
4715

    
4716
class LUSearchTags(NoHooksLU):
4717
  """Searches the tags for a given pattern.
4718

4719
  """
4720
  _OP_REQP = ["pattern"]
4721

    
4722
  def CheckPrereq(self):
4723
    """Check prerequisites.
4724

4725
    This checks the pattern passed for validity by compiling it.
4726

4727
    """
4728
    try:
4729
      self.re = re.compile(self.op.pattern)
4730
    except re.error, err:
4731
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4732
                                 (self.op.pattern, err))
4733

    
4734
  def Exec(self, feedback_fn):
4735
    """Returns the tag list.
4736

4737
    """
4738
    cfg = self.cfg
4739
    tgts = [("/cluster", cfg.GetClusterInfo())]
4740
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4741
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4742
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4743
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4744
    results = []
4745
    for path, target in tgts:
4746
      for tag in target.GetTags():
4747
        if self.re.search(tag):
4748
          results.append((path, tag))
4749
    return results
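

# Illustrative sketch (editor's example): the list returned above pairs the
# pseudo-path of the tagged object with each matching tag, for instance
#
#   [("/cluster", "production"), ("/instances/inst1.example.com", "web")]
#
# where the concrete tag values are made up for illustration.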
4750

    
4751

    
4752
class LUAddTags(TagsLU):
4753
  """Sets a tag on a given object.
4754

4755
  """
4756
  _OP_REQP = ["kind", "name", "tags"]
4757

    
4758
  def CheckPrereq(self):
4759
    """Check prerequisites.
4760

4761
    This checks the type and length of the given tags.
4762

4763
    """
4764
    TagsLU.CheckPrereq(self)
4765
    for tag in self.op.tags:
4766
      objects.TaggableObject.ValidateTag(tag)
4767

    
4768
  def Exec(self, feedback_fn):
4769
    """Sets the tag.
4770

4771
    """
4772
    try:
4773
      for tag in self.op.tags:
4774
        self.target.AddTag(tag)
4775
    except errors.TagError, err:
4776
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4777
    try:
4778
      self.cfg.Update(self.target)
4779
    except errors.ConfigurationError:
4780
      raise errors.OpRetryError("There has been a modification to the"
4781
                                " config file and the operation has been"
4782
                                " aborted. Please retry.")
4783

    
4784

    
4785
class LUDelTags(TagsLU):
4786
  """Delete a list of tags from a given object.
4787

4788
  """
4789
  _OP_REQP = ["kind", "name", "tags"]
4790

    
4791
  def CheckPrereq(self):
4792
    """Check prerequisites.
4793

4794
    This checks that we have the given tags.
4795

4796
    """
4797
    TagsLU.CheckPrereq(self)
4798
    for tag in self.op.tags:
4799
      objects.TaggableObject.ValidateTag(tag)
4800
    del_tags = frozenset(self.op.tags)
4801
    cur_tags = self.target.GetTags()
4802
    if not del_tags <= cur_tags:
4803
      diff_tags = del_tags - cur_tags
4804
      diff_names = ["'%s'" % tag for tag in diff_tags]
4805
      diff_names.sort()
4806
      raise errors.OpPrereqError("Tag(s) %s not found" %
4807
                                 (",".join(diff_names)))
4808

    
4809
  def Exec(self, feedback_fn):
4810
    """Remove the tag from the object.
4811

4812
    """
4813
    for tag in self.op.tags:
4814
      self.target.RemoveTag(tag)
4815
    try:
4816
      self.cfg.Update(self.target)
4817
    except errors.ConfigurationError:
4818
      raise errors.OpRetryError("There has been a modification to the"
4819
                                " config file and the operation has been"
4820
                                " aborted. Please retry.")
4821

    
4822
class LUTestDelay(NoHooksLU):
4823
  """Sleep for a specified amount of time.
4824

4825
  This LU sleeps on the master and/or nodes for a specified amount of
4826
  time.
4827

4828
  """
4829
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4830

    
4831
  def CheckPrereq(self):
4832
    """Check prerequisites.
4833

4834
    This checks that we have a good list of nodes and/or the duration
4835
    is valid.
4836

4837
    """
4838

    
4839
    if self.op.on_nodes:
4840
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4841

    
4842
  def Exec(self, feedback_fn):
4843
    """Do the actual sleep.
4844

4845
    """
4846
    if self.op.on_master:
4847
      if not utils.TestDelay(self.op.duration):
4848
        raise errors.OpExecError("Error during master delay test")
4849
    if self.op.on_nodes:
4850
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4851
      if not result:
4852
        raise errors.OpExecError("Complete failure from rpc call")
4853
      for node, node_result in result.items():
4854
        if not node_result:
4855
          raise errors.OpExecError("Failure during rpc call to node %s,"
4856
                                   " result: %s" % (node, node_result))
4857

    
4858

    
4859
class IAllocator(object):
4860
  """IAllocator framework.
4861

4862
  An IAllocator instance has four sets of attributes:
4863
    - cfg/sstore that are needed to query the cluster
4864
    - input data (all members of _ALLO_KEYS or _RELO_KEYS are required)
4865
    - four buffer attributes (in|out_data|text), that represent the
4866
      input (to the external script) in text and data structure format,
4867
      and the output from it, again in two formats
4868
    - the result variables from the script (success, info, nodes) for
4869
      easy usage
4870

4871
  """
4872
  _ALLO_KEYS = [
4873
    "mem_size", "disks", "disk_template",
4874
    "os", "tags", "nics", "vcpus",
4875
    ]
4876
  _RELO_KEYS = [
4877
    "relocate_from",
4878
    ]
4879

    
4880
  def __init__(self, cfg, sstore, mode, name, **kwargs):
4881
    self.cfg = cfg
4882
    self.sstore = sstore
4883
    # init buffer variables
4884
    self.in_text = self.out_text = self.in_data = self.out_data = None
4885
    # init all input fields so that pylint is happy
4886
    self.mode = mode
4887
    self.name = name
4888
    self.mem_size = self.disks = self.disk_template = None
4889
    self.os = self.tags = self.nics = self.vcpus = None
4890
    self.relocate_from = None
4891
    # computed fields
4892
    self.required_nodes = None
4893
    # init result fields
4894
    self.success = self.info = self.nodes = None
4895
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4896
      keyset = self._ALLO_KEYS
4897
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4898
      keyset = self._RELO_KEYS
4899
    else:
4900
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4901
                                   " IAllocator" % self.mode)
4902
    for key in kwargs:
4903
      if key not in keyset:
4904
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
4905
                                     " IAllocator" % key)
4906
      setattr(self, key, kwargs[key])
4907
    for key in keyset:
4908
      if key not in kwargs:
4909
        raise errors.ProgrammerError("Missing input parameter '%s' to"
4910
                                     " IAllocator" % key)
4911
    self._BuildInputData()
4912

    
4913
  def _ComputeClusterData(self):
4914
    """Compute the generic allocator input data.
4915

4916
    This is the data that is independent of the actual operation.
4917

4918
    """
4919
    cfg = self.cfg
4920
    # cluster data
4921
    data = {
4922
      "version": 1,
4923
      "cluster_name": self.sstore.GetClusterName(),
4924
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4925
      "hypervisor_type": self.sstore.GetHypervisorType(),
4926
      # we don't have job IDs
4927
      }
4928

    
4929
    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4930

    
4931
    # node data
4932
    node_results = {}
4933
    node_list = cfg.GetNodeList()
4934
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4935
    for nname in node_list:
4936
      ninfo = cfg.GetNodeInfo(nname)
4937
      if nname not in node_data or not isinstance(node_data[nname], dict):
4938
        raise errors.OpExecError("Can't get data for node %s" % nname)
4939
      remote_info = node_data[nname]
4940
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
4941
                   'vg_size', 'vg_free', 'cpu_total']:
4942
        if attr not in remote_info:
4943
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4944
                                   (nname, attr))
4945
        try:
4946
          remote_info[attr] = int(remote_info[attr])
4947
        except ValueError, err:
4948
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4949
                                   " %s" % (nname, attr, str(err)))
4950
      # compute memory used by primary instances
4951
      i_p_mem = i_p_up_mem = 0
4952
      for iinfo in i_list:
4953
        if iinfo.primary_node == nname:
4954
          i_p_mem += iinfo.memory
4955
          if iinfo.status == "up":
4956
            i_p_up_mem += iinfo.memory
4957

    
4958
      # compute memory used by instances
4959
      pnr = {
4960
        "tags": list(ninfo.GetTags()),
4961
        "total_memory": remote_info['memory_total'],
4962
        "reserved_memory": remote_info['memory_dom0'],
4963
        "free_memory": remote_info['memory_free'],
4964
        "i_pri_memory": i_p_mem,
4965
        "i_pri_up_memory": i_p_up_mem,
4966
        "total_disk": remote_info['vg_size'],
4967
        "free_disk": remote_info['vg_free'],
4968
        "primary_ip": ninfo.primary_ip,
4969
        "secondary_ip": ninfo.secondary_ip,
4970
        "total_cpus": remote_info['cpu_total'],
4971
        }
4972
      node_results[nname] = pnr
4973
    data["nodes"] = node_results
4974

    
4975
    # instance data
4976
    instance_data = {}
4977
    for iinfo in i_list:
4978
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4979
                  for n in iinfo.nics]
4980
      pir = {
4981
        "tags": list(iinfo.GetTags()),
4982
        "should_run": iinfo.status == "up",
4983
        "vcpus": iinfo.vcpus,
4984
        "memory": iinfo.memory,
4985
        "os": iinfo.os,
4986
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4987
        "nics": nic_data,
4988
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4989
        "disk_template": iinfo.disk_template,
4990
        }
4991
      instance_data[iinfo.name] = pir
4992

    
4993
    data["instances"] = instance_data
4994

    
4995
    self.in_data = data
4996

    
4997
  def _AddNewInstance(self):
4998
    """Add new instance data to allocator structure.
4999

5000
    This in combination with _ComputeClusterData will create the
5001
    correct structure needed as input for the allocator.
5002

5003
    The checks for the completeness of the opcode must have already been
5004
    done.
5005

5006
    """
5007
    data = self.in_data
5008
    if len(self.disks) != 2:
5009
      raise errors.OpExecError("Only two-disk configurations supported")
5010

    
5011
    disk_space = _ComputeDiskSize(self.disk_template,
5012
                                  self.disks[0]["size"], self.disks[1]["size"])
5013

    
5014
    if self.disk_template in constants.DTS_NET_MIRROR:
5015
      self.required_nodes = 2
5016
    else:
5017
      self.required_nodes = 1
5018
    request = {
5019
      "type": "allocate",
5020
      "name": self.name,
5021
      "disk_template": self.disk_template,
5022
      "tags": self.tags,
5023
      "os": self.os,
5024
      "vcpus": self.vcpus,
5025
      "memory": self.mem_size,
5026
      "disks": self.disks,
5027
      "disk_space_total": disk_space,
5028
      "nics": self.nics,
5029
      "required_nodes": self.required_nodes,
5030
      }
5031
    data["request"] = request
5032

    
5033
  def _AddRelocateInstance(self):
5034
    """Add relocate instance data to allocator structure.
5035

5036
    This in combination with _ComputeClusterData will create the
5037
    correct structure needed as input for the allocator.
5038

5039
    The checks for the completeness of the opcode must have already been
5040
    done.
5041

5042
    """
5043
    instance = self.cfg.GetInstanceInfo(self.name)
5044
    if instance is None:
5045
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5046
                                   " IAllocator" % self.name)
5047

    
5048
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5049
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5050

    
5051
    if len(instance.secondary_nodes) != 1:
5052
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5053

    
5054
    self.required_nodes = 1
5055

    
5056
    disk_space = _ComputeDiskSize(instance.disk_template,
5057
                                  instance.disks[0].size,
5058
                                  instance.disks[1].size)
5059

    
5060
    request = {
5061
      "type": "relocate",
5062
      "name": self.name,
5063
      "disk_space_total": disk_space,
5064
      "required_nodes": self.required_nodes,
5065
      "relocate_from": self.relocate_from,
5066
      }
5067
    self.in_data["request"] = request
5068

    
5069
  def _BuildInputData(self):
5070
    """Build input data structures.
5071

5072
    """
5073
    self._ComputeClusterData()
5074

    
5075
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5076
      self._AddNewInstance()
5077
    else:
5078
      self._AddRelocateInstance()
5079

    
5080
    self.in_text = serializer.Dump(self.in_data)
5081

    
5082
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5083
    """Run an instance allocator and return the results.
5084

5085
    """
5086
    data = self.in_text
5087

    
5088
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5089

    
5090
    if not isinstance(result, tuple) or len(result) != 4:
5091
      raise errors.OpExecError("Invalid result from master iallocator runner")
5092

    
5093
    rcode, stdout, stderr, fail = result
5094

    
5095
    if rcode == constants.IARUN_NOTFOUND:
5096
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5097
    elif rcode == constants.IARUN_FAILURE:
5098
        raise errors.OpExecError("Instance allocator call failed: %s,"
5099
                                 " output: %s" %
5100
                                 (fail, stdout+stderr))
5101
    self.out_text = stdout
5102
    if validate:
5103
      self._ValidateResult()
5104

    
5105
  def _ValidateResult(self):
5106
    """Process the allocator results.
5107

5108
    This will process and if successful save the result in
5109
    self.out_data and the other parameters.
5110

5111
    """
5112
    try:
5113
      rdict = serializer.Load(self.out_text)
5114
    except Exception, err:
5115
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5116

    
5117
    if not isinstance(rdict, dict):
5118
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5119

    
5120
    for key in "success", "info", "nodes":
5121
      if key not in rdict:
5122
        raise errors.OpExecError("Can't parse iallocator results:"
5123
                                 " missing key '%s'" % key)
5124
      setattr(self, key, rdict[key])
5125

    
5126
    if not isinstance(rdict["nodes"], list):
5127
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5128
                               " is not a list")
5129
    self.out_data = rdict
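

# Illustrative sketch (editor's example, not part of the original module):
# a typical allocation run of the IAllocator framework above. The allocator
# name "hail", the OS name and the sizes are placeholders, not values taken
# from this file; cfg and sstore are the usual configuration and
# simple-store objects handed to logical units.
def _ExampleRunAllocator(cfg, sstore, instance_name):
  """Ask an external allocator to pick nodes for a new two-disk instance."""
  ial = IAllocator(cfg, sstore,
                   mode=constants.IALLOCATOR_MODE_ALLOC,
                   name=instance_name,
                   mem_size=512,
                   disks=[{"size": 1024, "mode": "w"},
                          {"size": 128, "mode": "w"}],
                   disk_template=constants.DT_DRBD8,
                   os="debian-etch",
                   tags=[],
                   nics=[{"mac": "auto", "ip": None, "bridge": None}],
                   vcpus=1)
  ial.Run("hail")
  if not ial.success:
    raise errors.OpExecError("Allocator failed: %s" % ial.info)
  return ial.nodes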
5130

    
5131

    
5132
class LUTestAllocator(NoHooksLU):
5133
  """Run allocator tests.
5134

5135
  This LU runs the allocator tests
5136

5137
  """
5138
  _OP_REQP = ["direction", "mode", "name"]
5139

    
5140
  def CheckPrereq(self):
5141
    """Check prerequisites.
5142

5143
    This checks the opcode parameters depending on the direction and mode.
5144

5145
    """
5146
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5147
      for attr in ["name", "mem_size", "disks", "disk_template",
5148
                   "os", "tags", "nics", "vcpus"]:
5149
        if not hasattr(self.op, attr):
5150
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5151
                                     attr)
5152
      iname = self.cfg.ExpandInstanceName(self.op.name)
5153
      if iname is not None:
5154
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5155
                                   iname)
5156
      if not isinstance(self.op.nics, list):
5157
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5158
      for row in self.op.nics:
5159
        if (not isinstance(row, dict) or
5160
            "mac" not in row or
5161
            "ip" not in row or
5162
            "bridge" not in row):
5163
          raise errors.OpPrereqError("Invalid contents of the"
5164
                                     " 'nics' parameter")
5165
      if not isinstance(self.op.disks, list):
5166
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5167
      if len(self.op.disks) != 2:
5168
        raise errors.OpPrereqError("Only two-disk configurations supported")
5169
      for row in self.op.disks:
5170
        if (not isinstance(row, dict) or
5171
            "size" not in row or
5172
            not isinstance(row["size"], int) or
5173
            "mode" not in row or
5174
            row["mode"] not in ['r', 'w']):
5175
          raise errors.OpPrereqError("Invalid contents of the"
5176
                                     " 'disks' parameter")
5177
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5178
      if not hasattr(self.op, "name"):
5179
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5180
      fname = self.cfg.ExpandInstanceName(self.op.name)
5181
      if fname is None:
5182
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5183
                                   self.op.name)
5184
      self.op.name = fname
5185
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5186
    else:
5187
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5188
                                 self.op.mode)
5189

    
5190
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5191
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5192
        raise errors.OpPrereqError("Missing allocator name")
5193
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5194
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5195
                                 self.op.direction)
5196

    
5197
  def Exec(self, feedback_fn):
5198
    """Run the allocator test.
5199

5200
    """
5201
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5202
      ial = IAllocator(self.cfg, self.sstore,
5203
                       mode=self.op.mode,
5204
                       name=self.op.name,
5205
                       mem_size=self.op.mem_size,
5206
                       disks=self.op.disks,
5207
                       disk_template=self.op.disk_template,
5208
                       os=self.op.os,
5209
                       tags=self.op.tags,
5210
                       nics=self.op.nics,
5211
                       vcpus=self.op.vcpus,
5212
                       )
5213
    else:
5214
      ial = IAllocator(self.cfg, self.sstore,
5215
                       mode=self.op.mode,
5216
                       name=self.op.name,
5217
                       relocate_from=list(self.relocate_from),
5218
                       )
5219

    
5220
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5221
      result = ial.in_text
5222
    else:
5223
      ial.Run(self.op.allocator, validate=False)
5224
      result = ial.out_text
5225
    return result