lib/cmdlib.py @ efd990e4
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.acquired_locks = {}
85
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89

    
90
    for attr_name in self._OP_REQP:
91
      attr_val = getattr(op, attr_name, None)
92
      if attr_val is None:
93
        raise errors.OpPrereqError("Required parameter '%s' missing" %
94
                                   attr_name)
95

    
96
    if not self.cfg.IsCluster():
97
      raise errors.OpPrereqError("Cluster not initialized yet,"
98
                                 " use 'gnt-cluster init' first.")
99
    if self.REQ_MASTER:
100
      master = sstore.GetMasterNode()
101
      if master != utils.HostInfo().name:
102
        raise errors.OpPrereqError("Commands must be run on the master"
103
                                   " node %s" % master)
104

    
105
  def __GetSSH(self):
106
    """Returns the SshRunner object
107

108
    """
109
    if not self.__ssh:
110
      self.__ssh = ssh.SshRunner(self.sstore)
111
    return self.__ssh
112

    
113
  ssh = property(fget=__GetSSH)
114

    
115
  def ExpandNames(self):
116
    """Expand names for this LU.
117

118
    This method is called before starting to execute the opcode, and it should
119
    update all the parameters of the opcode to their canonical form (e.g. a
120
    short node name must be fully expanded after this method has successfully
121
    completed). This way locking, hooks, logging, etc. can work correctly.
122

123
    LUs which implement this method must also populate the self.needed_locks
124
    member, as a dict with lock levels as keys, and a list of needed lock names
125
    as values. Rules:
126
      - Use an empty dict if you don't need any lock
127
      - If you don't need any lock at a particular level omit that level
128
      - Don't put anything for the BGL level
129
      - If you want all locks at a level use locking.ALL_SET as a value
130

131
    If you need to share locks (rather than acquire them exclusively) at one
132
    level you can modify self.share_locks, setting a true value (usually 1) for
133
    that level. By default locks are not shared.
134

135
    Examples:
136
    # Acquire all nodes and one instance
137
    self.needed_locks = {
138
      locking.LEVEL_NODE: locking.ALL_SET,
139
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
140
    }
141
    # Acquire just two nodes
142
    self.needed_locks = {
143
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
144
    }
145
    # Acquire no locks
146
    self.needed_locks = {} # No, you can't leave it to the default value None
147

148
    """
149
    # The implementation of this method is mandatory only if the new LU is
150
    # concurrent, so that old LUs don't need to be changed all at the same
151
    # time.
152
    if self.REQ_BGL:
153
      self.needed_locks = {} # Exclusive LUs don't need locks.
154
    else:
155
      raise NotImplementedError
156

    
157
  def DeclareLocks(self, level):
158
    """Declare LU locking needs for a level
159

160
    While most LUs can just declare their locking needs at ExpandNames time,
161
    sometimes there's the need to calculate some locks after having acquired
162
    the ones before. This function is called just before acquiring locks at a
163
    particular level, but after acquiring the ones at lower levels, and permits
164
    such calculations. It can be used to modify self.needed_locks, and by
165
    default it does nothing.
166

167
    This function is only called if you have something already set in
168
    self.needed_locks for the level.
169

170
    @param level: Locking level which is going to be locked
171
    @type level: member of ganeti.locking.LEVELS
172

173
    """
174

    
175
  def CheckPrereq(self):
176
    """Check prerequisites for this LU.
177

178
    This method should check that the prerequisites for the execution
179
    of this LU are fulfilled. It can do internode communication, but
180
    it should be idempotent - no cluster or system changes are
181
    allowed.
182

183
    The method should raise errors.OpPrereqError in case something is
184
    not fulfilled. Its return value is ignored.
185

186
    This method should also update all the parameters of the opcode to
187
    their canonical form if it hasn't been done by ExpandNames before.
188

189
    """
190
    raise NotImplementedError
191

    
192
  def Exec(self, feedback_fn):
193
    """Execute the LU.
194

195
    This method should implement the actual work. It should raise
196
    errors.OpExecError for failures that are somewhat dealt with in
197
    code, or expected.
198

199
    """
200
    raise NotImplementedError
201

    
202
  def BuildHooksEnv(self):
203
    """Build hooks environment for this LU.
204

205
    This method should return a three-element tuple consisting of: a dict
206
    containing the environment that will be used for running the
207
    specific hook for this LU, a list of node names on which the hook
208
    should run before the execution, and a list of node names on which
209
    the hook should run after the execution.
210

211
    The keys of the dict must not be prefixed with 'GANETI_', as this will
212
    be handled in the hooks runner. Also note additional keys will be
213
    added by the hooks runner. If the LU doesn't define any
214
    environment, an empty dict (and not None) should be returned.
215

216
    No nodes should be returned as an empty list (and not None).
217

218
    Note that if the HPATH for a LU class is None, this function will
219
    not be called.
220

221
    """
222
    raise NotImplementedError
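  # Illustrative sketch (hypothetical values): a subclass might return
  #
  #   env = {"OP_TARGET": "instance1.example.tld",
  #          "INSTANCE_NAME": "instance1.example.tld"}
  #   mn = self.sstore.GetMasterNode()
  #   return env, [mn], [mn, "node1.example.tld"]
  #
  # i.e. the environment dict (keys without the "GANETI_" prefix), the list
  # of nodes running the pre-hook and the list of nodes running the post-hook.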
223

    
224
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
225
    """Notify the LU about the results of its hooks.
226

227
    This method is called every time a hooks phase is executed, and notifies
228
    the Logical Unit about the hooks' result. The LU can then use it to alter
229
    its result based on the hooks.  By default the method does nothing and the
230
    previous result is passed back unchanged but any LU can define it if it
231
    wants to use the local cluster hook-scripts somehow.
232

233
    Args:
234
      phase: the hooks phase that has just been run
235
      hook_results: the results of the multi-node hooks rpc call
236
      feedback_fn: function to send feedback back to the caller
237
      lu_result: the previous result this LU had, or None in the PRE phase.
238

239
    """
240
    return lu_result
241

    
242
  def _ExpandAndLockInstance(self):
243
    """Helper function to expand and lock an instance.
244

245
    Many LUs that work on an instance take its name in self.op.instance_name
246
    and need to expand it and then declare the expanded name for locking. This
247
    function does it, and then updates self.op.instance_name to the expanded
248
    name. It also initializes needed_locks as a dict, if this hasn't been done
249
    before.
250

251
    """
252
    if self.needed_locks is None:
253
      self.needed_locks = {}
254
    else:
255
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
256
        "_ExpandAndLockInstance called with instance-level locks set"
257
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
258
    if expanded_name is None:
259
      raise errors.OpPrereqError("Instance '%s' not known" %
260
                                  self.op.instance_name)
261
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
262
    self.op.instance_name = expanded_name
263

    
264
  def _LockInstancesNodes(self, primary_only=False):
265
    """Helper function to declare instances' nodes for locking.
266

267
    This function should be called after locking one or more instances to lock
268
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
269
    with all primary or secondary nodes for instances already locked and
270
    present in self.needed_locks[locking.LEVEL_INSTANCE].
271

272
    It should be called from DeclareLocks, and for safety only works if
273
    self.recalculate_locks[locking.LEVEL_NODE] is set.
274

275
    In the future it may grow parameters to just lock some instance's nodes, or
276
    to just lock primaries or secondary nodes, if needed.
277

278
    It should be called from DeclareLocks in a way similar to:
279

280
    if level == locking.LEVEL_NODE:
281
      self._LockInstancesNodes()
282

283
    @type primary_only: boolean
284
    @param primary_only: only lock primary nodes of locked instances
285

286
    """
287
    assert locking.LEVEL_NODE in self.recalculate_locks, \
288
      "_LockInstancesNodes helper function called with no nodes to recalculate"
289

    
290
    # TODO: check if we've really been called with the instance locks held
291

    
292
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
293
    # future we might want to have different behaviors depending on the value
294
    # of self.recalculate_locks[locking.LEVEL_NODE]
295
    wanted_nodes = []
296
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
297
      instance = self.context.cfg.GetInstanceInfo(instance_name)
298
      wanted_nodes.append(instance.primary_node)
299
      if not primary_only:
300
        wanted_nodes.extend(instance.secondary_nodes)
301

    
302
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
303
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
304
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
305
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
306

    
307
    del self.recalculate_locks[locking.LEVEL_NODE]
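  # Illustrative sketch (hypothetical LU): the usual pattern is to request
  # the instance lock in ExpandNames, mark the node level for recalculation,
  # and then call the helper above from DeclareLocks:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()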
308

    
309

    
310
class NoHooksLU(LogicalUnit):
311
  """Simple LU which runs no hooks.
312

313
  This LU is intended as a parent for other LogicalUnits which will
314
  run no hooks, in order to reduce duplicate code.
315

316
  """
317
  HPATH = None
318
  HTYPE = None
319

    
320

    
321
def _GetWantedNodes(lu, nodes):
322
  """Returns list of checked and expanded node names.
323

324
  Args:
325
    nodes: List of nodes (strings) or None for all
326

327
  """
328
  if not isinstance(nodes, list):
329
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
330

    
331
  if not nodes:
332
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
333
      " non-empty list of nodes whose name is to be expanded.")
334

    
335
  wanted = []
336
  for name in nodes:
337
    node = lu.cfg.ExpandNodeName(name)
338
    if node is None:
339
      raise errors.OpPrereqError("No such node name '%s'" % name)
340
    wanted.append(node)
341

    
342
  return utils.NiceSort(wanted)
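# Example (hypothetical names): _GetWantedNodes(lu, ["node1", "node2"]) expands
# each short name through lu.cfg.ExpandNodeName and returns something like
# ["node1.example.tld", "node2.example.tld"], nicely sorted.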
343

    
344

    
345
def _GetWantedInstances(lu, instances):
346
  """Returns list of checked and expanded instance names.
347

348
  Args:
349
    instances: List of instances (strings) or None for all
350

351
  """
352
  if not isinstance(instances, list):
353
    raise errors.OpPrereqError("Invalid argument type 'instances'")
354

    
355
  if instances:
356
    wanted = []
357

    
358
    for name in instances:
359
      instance = lu.cfg.ExpandInstanceName(name)
360
      if instance is None:
361
        raise errors.OpPrereqError("No such instance name '%s'" % name)
362
      wanted.append(instance)
363

    
364
  else:
365
    wanted = lu.cfg.GetInstanceList()
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _CheckOutputFields(static, dynamic, selected):
370
  """Checks whether all selected fields are valid.
371

372
  Args:
373
    static: Static fields
374
    dynamic: Dynamic fields
375

376
  """
377
  static_fields = frozenset(static)
378
  dynamic_fields = frozenset(dynamic)
379

    
380
  all_fields = static_fields | dynamic_fields
381

    
382
  if not all_fields.issuperset(selected):
383
    raise errors.OpPrereqError("Unknown output fields selected: %s"
384
                               % ",".join(frozenset(selected).
385
                                          difference(all_fields)))
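# Example (hypothetical fields): with static=["name"] and dynamic=["mfree"],
# selected=["name", "mfree"] passes, while selected=["name", "bogus"] raises
# OpPrereqError naming "bogus" as the unknown field.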
386

    
387

    
388
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
389
                          memory, vcpus, nics):
390
  """Builds instance related env variables for hooks from single variables.
391

392
  Args:
393
    secondary_nodes: List of secondary nodes as strings
394
  """
395
  env = {
396
    "OP_TARGET": name,
397
    "INSTANCE_NAME": name,
398
    "INSTANCE_PRIMARY": primary_node,
399
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
400
    "INSTANCE_OS_TYPE": os_type,
401
    "INSTANCE_STATUS": status,
402
    "INSTANCE_MEMORY": memory,
403
    "INSTANCE_VCPUS": vcpus,
404
  }
405

    
406
  if nics:
407
    nic_count = len(nics)
408
    for idx, (ip, bridge, mac) in enumerate(nics):
409
      if ip is None:
410
        ip = ""
411
      env["INSTANCE_NIC%d_IP" % idx] = ip
412
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
413
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
414
  else:
415
    nic_count = 0
416

    
417
  env["INSTANCE_NIC_COUNT"] = nic_count
418

    
419
  return env
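# Example (hypothetical instance): for an instance "inst1" with one NIC the
# returned dict would look roughly like
#
#   {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC_COUNT": 1, "INSTANCE_NIC0_IP": "",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0", "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01"}
#
# (the hooks runner adds the "GANETI_" prefix to every key).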
420

    
421

    
422
def _BuildInstanceHookEnvByObject(instance, override=None):
423
  """Builds instance related env variables for hooks from an object.
424

425
  Args:
426
    instance: objects.Instance object of instance
427
    override: dict of values to override
428
  """
429
  args = {
430
    'name': instance.name,
431
    'primary_node': instance.primary_node,
432
    'secondary_nodes': instance.secondary_nodes,
433
    'os_type': instance.os,
434
    'status': instance.status,
435
    'memory': instance.memory,
436
    'vcpus': instance.vcpus,
437
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
438
  }
439
  if override:
440
    args.update(override)
441
  return _BuildInstanceHookEnv(**args)
442

    
443

    
444
def _CheckInstanceBridgesExist(instance):
445
  """Check that the brigdes needed by an instance exist.
446

447
  """
448
  # check bridges existence
449
  brlist = [nic.bridge for nic in instance.nics]
450
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
451
    raise errors.OpPrereqError("one or more target bridges %s does not"
452
                               " exist on destination node '%s'" %
453
                               (brlist, instance.primary_node))
454

    
455

    
456
class LUDestroyCluster(NoHooksLU):
457
  """Logical unit for destroying the cluster.
458

459
  """
460
  _OP_REQP = []
461

    
462
  def CheckPrereq(self):
463
    """Check prerequisites.
464

465
    This checks whether the cluster is empty.
466

467
    Any errors are signalled by raising errors.OpPrereqError.
468

469
    """
470
    master = self.sstore.GetMasterNode()
471

    
472
    nodelist = self.cfg.GetNodeList()
473
    if len(nodelist) != 1 or nodelist[0] != master:
474
      raise errors.OpPrereqError("There are still %d node(s) in"
475
                                 " this cluster." % (len(nodelist) - 1))
476
    instancelist = self.cfg.GetInstanceList()
477
    if instancelist:
478
      raise errors.OpPrereqError("There are still %d instance(s) in"
479
                                 " this cluster." % len(instancelist))
480

    
481
  def Exec(self, feedback_fn):
482
    """Destroys the cluster.
483

484
    """
485
    master = self.sstore.GetMasterNode()
486
    if not rpc.call_node_stop_master(master, False):
487
      raise errors.OpExecError("Could not disable the master role")
488
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
489
    utils.CreateBackup(priv_key)
490
    utils.CreateBackup(pub_key)
491
    return master
492

    
493

    
494
class LUVerifyCluster(LogicalUnit):
495
  """Verifies the cluster status.
496

497
  """
498
  HPATH = "cluster-verify"
499
  HTYPE = constants.HTYPE_CLUSTER
500
  _OP_REQP = ["skip_checks"]
501

    
502
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
503
                  remote_version, feedback_fn):
504
    """Run multiple tests against a node.
505

506
    Test list:
507
      - compares ganeti version
508
      - checks vg existence and size > 20G
509
      - checks config file checksum
510
      - checks ssh to other nodes
511

512
    Args:
513
      node: name of the node to check
514
      file_list: required list of files
515
      local_cksum: dictionary of local files and their checksums
516

517
    """
518
    # compares ganeti version
519
    local_version = constants.PROTOCOL_VERSION
520
    if not remote_version:
521
      feedback_fn("  - ERROR: connection to %s failed" % (node))
522
      return True
523

    
524
    if local_version != remote_version:
525
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
526
                      (local_version, node, remote_version))
527
      return True
528

    
529
    # checks vg existence and size > 20G
530

    
531
    bad = False
532
    if not vglist:
533
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
534
                      (node,))
535
      bad = True
536
    else:
537
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
538
                                            constants.MIN_VG_SIZE)
539
      if vgstatus:
540
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
541
        bad = True
542

    
543
    # checks config file checksum
544
    # checks ssh to any
545

    
546
    if 'filelist' not in node_result:
547
      bad = True
548
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
549
    else:
550
      remote_cksum = node_result['filelist']
551
      for file_name in file_list:
552
        if file_name not in remote_cksum:
553
          bad = True
554
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
555
        elif remote_cksum[file_name] != local_cksum[file_name]:
556
          bad = True
557
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
558

    
559
    if 'nodelist' not in node_result:
560
      bad = True
561
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
562
    else:
563
      if node_result['nodelist']:
564
        bad = True
565
        for node in node_result['nodelist']:
566
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
567
                          (node, node_result['nodelist'][node]))
568
    if 'node-net-test' not in node_result:
569
      bad = True
570
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
571
    else:
572
      if node_result['node-net-test']:
573
        bad = True
574
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
575
        for node in nlist:
576
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
577
                          (node, node_result['node-net-test'][node]))
578

    
579
    hyp_result = node_result.get('hypervisor', None)
580
    if hyp_result is not None:
581
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
582
    return bad
583

    
584
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
585
                      node_instance, feedback_fn):
586
    """Verify an instance.
587

588
    This function checks to see if the required block devices are
589
    available on the instance's node.
590

591
    """
592
    bad = False
593

    
594
    node_current = instanceconfig.primary_node
595

    
596
    node_vol_should = {}
597
    instanceconfig.MapLVsByNode(node_vol_should)
598

    
599
    for node in node_vol_should:
600
      for volume in node_vol_should[node]:
601
        if node not in node_vol_is or volume not in node_vol_is[node]:
602
          feedback_fn("  - ERROR: volume %s missing on node %s" %
603
                          (volume, node))
604
          bad = True
605

    
606
    if instanceconfig.status != 'down':
607
      if (node_current not in node_instance or
608
          not instance in node_instance[node_current]):
609
        feedback_fn("  - ERROR: instance %s not running on node %s" %
610
                        (instance, node_current))
611
        bad = True
612

    
613
    for node in node_instance:
614
      if node != node_current:
615
        if instance in node_instance[node]:
616
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
617
                          (instance, node))
618
          bad = True
619

    
620
    return bad
621

    
622
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
623
    """Verify if there are any unknown volumes in the cluster.
624

625
    The .os, .swap and backup volumes are ignored. All other volumes are
626
    reported as unknown.
627

628
    """
629
    bad = False
630

    
631
    for node in node_vol_is:
632
      for volume in node_vol_is[node]:
633
        if node not in node_vol_should or volume not in node_vol_should[node]:
634
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
635
                      (volume, node))
636
          bad = True
637
    return bad
638

    
639
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
640
    """Verify the list of running instances.
641

642
    This checks what instances are running but unknown to the cluster.
643

644
    """
645
    bad = False
646
    for node in node_instance:
647
      for runninginstance in node_instance[node]:
648
        if runninginstance not in instancelist:
649
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
650
                          (runninginstance, node))
651
          bad = True
652
    return bad
653

    
654
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
655
    """Verify N+1 Memory Resilience.
656

657
    Check that if one single node dies we can still start all the instances it
658
    was primary for.
659

660
    """
661
    bad = False
662

    
663
    for node, nodeinfo in node_info.iteritems():
664
      # This code checks that every node which is now listed as secondary has
665
      # enough memory to host all instances it is supposed to, should a single
666
      # other node in the cluster fail.
667
      # FIXME: not ready for failover to an arbitrary node
668
      # FIXME: does not support file-backed instances
669
      # WARNING: we currently take into account down instances as well as up
670
      # ones, considering that even if they're down someone might want to start
671
      # them even in the event of a node failure.
672
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
673
        needed_mem = 0
674
        for instance in instances:
675
          needed_mem += instance_cfg[instance].memory
676
        if nodeinfo['mfree'] < needed_mem:
677
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
678
                      " failovers should node %s fail" % (node, prinode))
679
          bad = True
680
    return bad
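  # Worked example (hypothetical numbers): if node B is secondary for
  # instances i1 (512 MB) and i2 (1024 MB) whose primary is node A, then
  # node B's 'sinst-by-pnode' entry is {'A': ['i1', 'i2']} and B needs at
  # least 1536 MB of free memory to absorb a failure of A; with less, the
  # check above flags an N+1 error for the (B, A) pair.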
681

    
682
  def CheckPrereq(self):
683
    """Check prerequisites.
684

685
    Transform the list of checks we're going to skip into a set and check that
686
    all its members are valid.
687

688
    """
689
    self.skip_set = frozenset(self.op.skip_checks)
690
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
691
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
692

    
693
  def BuildHooksEnv(self):
694
    """Build hooks env.
695

696
    Cluster-Verify hooks just run in the post phase, and their failure causes
697
    their output to be logged in the verify output and the verification to fail.
698

699
    """
700
    all_nodes = self.cfg.GetNodeList()
701
    # TODO: populate the environment with useful information for verify hooks
702
    env = {}
703
    return env, [], all_nodes
704

    
705
  def Exec(self, feedback_fn):
706
    """Verify integrity of cluster, performing various test on nodes.
707

708
    """
709
    bad = False
710
    feedback_fn("* Verifying global settings")
711
    for msg in self.cfg.VerifyConfig():
712
      feedback_fn("  - ERROR: %s" % msg)
713

    
714
    vg_name = self.cfg.GetVGName()
715
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
716
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
717
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
718
    i_non_redundant = [] # Non redundant instances
719
    node_volume = {}
720
    node_instance = {}
721
    node_info = {}
722
    instance_cfg = {}
723

    
724
    # FIXME: verify OS list
725
    # do local checksums
726
    file_names = list(self.sstore.GetFileList())
727
    file_names.append(constants.SSL_CERT_FILE)
728
    file_names.append(constants.CLUSTER_CONF_FILE)
729
    local_checksums = utils.FingerprintFiles(file_names)
730

    
731
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
732
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
733
    all_instanceinfo = rpc.call_instance_list(nodelist)
734
    all_vglist = rpc.call_vg_list(nodelist)
735
    node_verify_param = {
736
      'filelist': file_names,
737
      'nodelist': nodelist,
738
      'hypervisor': None,
739
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
740
                        for node in nodeinfo]
741
      }
742
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
743
    all_rversion = rpc.call_version(nodelist)
744
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
745

    
746
    for node in nodelist:
747
      feedback_fn("* Verifying node %s" % node)
748
      result = self._VerifyNode(node, file_names, local_checksums,
749
                                all_vglist[node], all_nvinfo[node],
750
                                all_rversion[node], feedback_fn)
751
      bad = bad or result
752

    
753
      # node_volume
754
      volumeinfo = all_volumeinfo[node]
755

    
756
      if isinstance(volumeinfo, basestring):
757
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
758
                    (node, volumeinfo[-400:].encode('string_escape')))
759
        bad = True
760
        node_volume[node] = {}
761
      elif not isinstance(volumeinfo, dict):
762
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
763
        bad = True
764
        continue
765
      else:
766
        node_volume[node] = volumeinfo
767

    
768
      # node_instance
769
      nodeinstance = all_instanceinfo[node]
770
      if type(nodeinstance) != list:
771
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
772
        bad = True
773
        continue
774

    
775
      node_instance[node] = nodeinstance
776

    
777
      # node_info
778
      nodeinfo = all_ninfo[node]
779
      if not isinstance(nodeinfo, dict):
780
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
781
        bad = True
782
        continue
783

    
784
      try:
785
        node_info[node] = {
786
          "mfree": int(nodeinfo['memory_free']),
787
          "dfree": int(nodeinfo['vg_free']),
788
          "pinst": [],
789
          "sinst": [],
790
          # dictionary holding all instances this node is secondary for,
791
          # grouped by their primary node. Each key is a cluster node, and each
792
          # value is a list of instances which have the key as primary and the
793
          # current node as secondary.  this is handy to calculate N+1 memory
794
          # availability if you can only failover from a primary to its
795
          # secondary.
796
          "sinst-by-pnode": {},
797
        }
798
      except ValueError:
799
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
800
        bad = True
801
        continue
802

    
803
    node_vol_should = {}
804

    
805
    for instance in instancelist:
806
      feedback_fn("* Verifying instance %s" % instance)
807
      inst_config = self.cfg.GetInstanceInfo(instance)
808
      result = self._VerifyInstance(instance, inst_config, node_volume,
809
                                     node_instance, feedback_fn)
810
      bad = bad or result
811

    
812
      inst_config.MapLVsByNode(node_vol_should)
813

    
814
      instance_cfg[instance] = inst_config
815

    
816
      pnode = inst_config.primary_node
817
      if pnode in node_info:
818
        node_info[pnode]['pinst'].append(instance)
819
      else:
820
        feedback_fn("  - ERROR: instance %s, connection to primary node"
821
                    " %s failed" % (instance, pnode))
822
        bad = True
823

    
824
      # If the instance is non-redundant we cannot survive losing its primary
825
      # node, so we are not N+1 compliant. On the other hand we have no disk
826
      # templates with more than one secondary so that situation is not well
827
      # supported either.
828
      # FIXME: does not support file-backed instances
829
      if len(inst_config.secondary_nodes) == 0:
830
        i_non_redundant.append(instance)
831
      elif len(inst_config.secondary_nodes) > 1:
832
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
833
                    % instance)
834

    
835
      for snode in inst_config.secondary_nodes:
836
        if snode in node_info:
837
          node_info[snode]['sinst'].append(instance)
838
          if pnode not in node_info[snode]['sinst-by-pnode']:
839
            node_info[snode]['sinst-by-pnode'][pnode] = []
840
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
841
        else:
842
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
843
                      " %s failed" % (instance, snode))
844

    
845
    feedback_fn("* Verifying orphan volumes")
846
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
847
                                       feedback_fn)
848
    bad = bad or result
849

    
850
    feedback_fn("* Verifying remaining instances")
851
    result = self._VerifyOrphanInstances(instancelist, node_instance,
852
                                         feedback_fn)
853
    bad = bad or result
854

    
855
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
856
      feedback_fn("* Verifying N+1 Memory redundancy")
857
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
858
      bad = bad or result
859

    
860
    feedback_fn("* Other Notes")
861
    if i_non_redundant:
862
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
863
                  % len(i_non_redundant))
864

    
865
    return not bad
866

    
867
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
868
    """Analize the post-hooks' result, handle it, and send some
869
    nicely-formatted feedback back to the user.
870

871
    Args:
872
      phase: the hooks phase that has just been run
873
      hooks_results: the results of the multi-node hooks rpc call
874
      feedback_fn: function to send feedback back to the caller
875
      lu_result: previous Exec result
876

877
    """
878
    # We only really run POST phase hooks, and are only interested in
879
    # their results
880
    if phase == constants.HOOKS_PHASE_POST:
881
      # Used to change hooks' output to proper indentation
882
      indent_re = re.compile('^', re.M)
883
      feedback_fn("* Hooks Results")
884
      if not hooks_results:
885
        feedback_fn("  - ERROR: general communication failure")
886
        lu_result = 1
887
      else:
888
        for node_name in hooks_results:
889
          show_node_header = True
890
          res = hooks_results[node_name]
891
          if res is False or not isinstance(res, list):
892
            feedback_fn("    Communication failure")
893
            lu_result = 1
894
            continue
895
          for script, hkr, output in res:
896
            if hkr == constants.HKR_FAIL:
897
              # The node header is only shown once, if there are
898
              # failing hooks on that node
899
              if show_node_header:
900
                feedback_fn("  Node %s:" % node_name)
901
                show_node_header = False
902
              feedback_fn("    ERROR: Script %s failed, output:" % script)
903
              output = indent_re.sub('      ', output)
904
              feedback_fn("%s" % output)
905
              lu_result = 1
906

    
907
      return lu_result
908

    
909

    
910
class LUVerifyDisks(NoHooksLU):
911
  """Verifies the cluster disks status.
912

913
  """
914
  _OP_REQP = []
915

    
916
  def CheckPrereq(self):
917
    """Check prerequisites.
918

919
    This has no prerequisites.
920

921
    """
922
    pass
923

    
924
  def Exec(self, feedback_fn):
925
    """Verify integrity of cluster disks.
926

927
    """
928
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
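    # The four result fields above are, in order: nodes that could not be
    # contacted, per-node LVM error messages, instances with at least one
    # inactive logical volume, and a map of instance name to the list of
    # (node, volume) pairs that are expected but missing.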
929

    
930
    vg_name = self.cfg.GetVGName()
931
    nodes = utils.NiceSort(self.cfg.GetNodeList())
932
    instances = [self.cfg.GetInstanceInfo(name)
933
                 for name in self.cfg.GetInstanceList()]
934

    
935
    nv_dict = {}
936
    for inst in instances:
937
      inst_lvs = {}
938
      if (inst.status != "up" or
939
          inst.disk_template not in constants.DTS_NET_MIRROR):
940
        continue
941
      inst.MapLVsByNode(inst_lvs)
942
      # transform {iname: {node: [vol, ...]}} to {(node, vol): iname}
943
      for node, vol_list in inst_lvs.iteritems():
944
        for vol in vol_list:
945
          nv_dict[(node, vol)] = inst
946

    
947
    if not nv_dict:
948
      return result
949

    
950
    node_lvs = rpc.call_volume_list(nodes, vg_name)
951

    
952
    to_act = set()
953
    for node in nodes:
954
      # node_volume
955
      lvs = node_lvs[node]
956

    
957
      if isinstance(lvs, basestring):
958
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
959
        res_nlvm[node] = lvs
960
      elif not isinstance(lvs, dict):
961
        logger.Info("connection to node %s failed or invalid data returned" %
962
                    (node,))
963
        res_nodes.append(node)
964
        continue
965

    
966
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
967
        inst = nv_dict.pop((node, lv_name), None)
968
        if (not lv_online and inst is not None
969
            and inst.name not in res_instances):
970
          res_instances.append(inst.name)
971

    
972
    # any leftover items in nv_dict are missing LVs, let's arrange the
973
    # data better
974
    for key, inst in nv_dict.iteritems():
975
      if inst.name not in res_missing:
976
        res_missing[inst.name] = []
977
      res_missing[inst.name].append(key)
978

    
979
    return result
980

    
981

    
982
class LURenameCluster(LogicalUnit):
983
  """Rename the cluster.
984

985
  """
986
  HPATH = "cluster-rename"
987
  HTYPE = constants.HTYPE_CLUSTER
988
  _OP_REQP = ["name"]
989
  REQ_WSSTORE = True
990

    
991
  def BuildHooksEnv(self):
992
    """Build hooks env.
993

994
    """
995
    env = {
996
      "OP_TARGET": self.sstore.GetClusterName(),
997
      "NEW_NAME": self.op.name,
998
      }
999
    mn = self.sstore.GetMasterNode()
1000
    return env, [mn], [mn]
1001

    
1002
  def CheckPrereq(self):
1003
    """Verify that the passed name is a valid one.
1004

1005
    """
1006
    hostname = utils.HostInfo(self.op.name)
1007

    
1008
    new_name = hostname.name
1009
    self.ip = new_ip = hostname.ip
1010
    old_name = self.sstore.GetClusterName()
1011
    old_ip = self.sstore.GetMasterIP()
1012
    if new_name == old_name and new_ip == old_ip:
1013
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1014
                                 " cluster has changed")
1015
    if new_ip != old_ip:
1016
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1017
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1018
                                   " reachable on the network. Aborting." %
1019
                                   new_ip)
1020

    
1021
    self.op.name = new_name
1022

    
1023
  def Exec(self, feedback_fn):
1024
    """Rename the cluster.
1025

1026
    """
1027
    clustername = self.op.name
1028
    ip = self.ip
1029
    ss = self.sstore
1030

    
1031
    # shutdown the master IP
1032
    master = ss.GetMasterNode()
1033
    if not rpc.call_node_stop_master(master, False):
1034
      raise errors.OpExecError("Could not disable the master role")
1035

    
1036
    try:
1037
      # modify the sstore
1038
      ss.SetKey(ss.SS_MASTER_IP, ip)
1039
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1040

    
1041
      # Distribute updated ss config to all nodes
1042
      myself = self.cfg.GetNodeInfo(master)
1043
      dist_nodes = self.cfg.GetNodeList()
1044
      if myself.name in dist_nodes:
1045
        dist_nodes.remove(myself.name)
1046

    
1047
      logger.Debug("Copying updated ssconf data to all nodes")
1048
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1049
        fname = ss.KeyToFilename(keyname)
1050
        result = rpc.call_upload_file(dist_nodes, fname)
1051
        for to_node in dist_nodes:
1052
          if not result[to_node]:
1053
            logger.Error("copy of file %s to node %s failed" %
1054
                         (fname, to_node))
1055
    finally:
1056
      if not rpc.call_node_start_master(master, False):
1057
        logger.Error("Could not re-enable the master role on the master,"
1058
                     " please restart manually.")
1059

    
1060

    
1061
def _RecursiveCheckIfLVMBased(disk):
1062
  """Check if the given disk or its children are lvm-based.
1063

1064
  Args:
1065
    disk: ganeti.objects.Disk object
1066

1067
  Returns:
1068
    boolean indicating whether a LD_LV dev_type was found or not
1069

1070
  """
1071
  if disk.children:
1072
    for chdisk in disk.children:
1073
      if _RecursiveCheckIfLVMBased(chdisk):
1074
        return True
1075
  return disk.dev_type == constants.LD_LV
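# Example (hypothetical disk layout): a DRBD disk whose children are two LD_LV
# volumes returns True (the recursion reaches an LD_LV child), while a purely
# file-based disk with no LVM children returns False.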
1076

    
1077

    
1078
class LUSetClusterParams(LogicalUnit):
1079
  """Change the parameters of the cluster.
1080

1081
  """
1082
  HPATH = "cluster-modify"
1083
  HTYPE = constants.HTYPE_CLUSTER
1084
  _OP_REQP = []
1085

    
1086
  def BuildHooksEnv(self):
1087
    """Build hooks env.
1088

1089
    """
1090
    env = {
1091
      "OP_TARGET": self.sstore.GetClusterName(),
1092
      "NEW_VG_NAME": self.op.vg_name,
1093
      }
1094
    mn = self.sstore.GetMasterNode()
1095
    return env, [mn], [mn]
1096

    
1097
  def CheckPrereq(self):
1098
    """Check prerequisites.
1099

1100
    This checks whether the given params don't conflict and
1101
    if the given volume group is valid.
1102

1103
    """
1104
    if not self.op.vg_name:
1105
      instances = [self.cfg.GetInstanceInfo(name)
1106
                   for name in self.cfg.GetInstanceList()]
1107
      for inst in instances:
1108
        for disk in inst.disks:
1109
          if _RecursiveCheckIfLVMBased(disk):
1110
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1111
                                       " lvm-based instances exist")
1112

    
1113
    # if vg_name not None, checks given volume group on all nodes
1114
    if self.op.vg_name:
1115
      node_list = self.cfg.GetNodeList()
1116
      vglist = rpc.call_vg_list(node_list)
1117
      for node in node_list:
1118
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1119
                                              constants.MIN_VG_SIZE)
1120
        if vgstatus:
1121
          raise errors.OpPrereqError("Error on node '%s': %s" %
1122
                                     (node, vgstatus))
1123

    
1124
  def Exec(self, feedback_fn):
1125
    """Change the parameters of the cluster.
1126

1127
    """
1128
    if self.op.vg_name != self.cfg.GetVGName():
1129
      self.cfg.SetVGName(self.op.vg_name)
1130
    else:
1131
      feedback_fn("Cluster LVM configuration already in desired"
1132
                  " state, not changing")
1133

    
1134

    
1135
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1136
  """Sleep and poll for an instance's disk to sync.
1137

1138
  """
1139
  if not instance.disks:
1140
    return True
1141

    
1142
  if not oneshot:
1143
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1144

    
1145
  node = instance.primary_node
1146

    
1147
  for dev in instance.disks:
1148
    cfgw.SetDiskID(dev, node)
1149

    
1150
  retries = 0
1151
  while True:
1152
    max_time = 0
1153
    done = True
1154
    cumul_degraded = False
1155
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1156
    if not rstats:
1157
      proc.LogWarning("Can't get any data from node %s" % node)
1158
      retries += 1
1159
      if retries >= 10:
1160
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1161
                                 " aborting." % node)
1162
      time.sleep(6)
1163
      continue
1164
    retries = 0
1165
    for i in range(len(rstats)):
1166
      mstat = rstats[i]
1167
      if mstat is None:
1168
        proc.LogWarning("Can't compute data for node %s/%s" %
1169
                        (node, instance.disks[i].iv_name))
1170
        continue
1171
      # we ignore the ldisk parameter
1172
      perc_done, est_time, is_degraded, _ = mstat
1173
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1174
      if perc_done is not None:
1175
        done = False
1176
        if est_time is not None:
1177
          rem_time = "%d estimated seconds remaining" % est_time
1178
          max_time = est_time
1179
        else:
1180
          rem_time = "no time estimate"
1181
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1182
                     (instance.disks[i].iv_name, perc_done, rem_time))
1183
    if done or oneshot:
1184
      break
1185

    
1186
    time.sleep(min(60, max_time))
1187

    
1188
  if done:
1189
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1190
  return not cumul_degraded
1191

    
1192

    
1193
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1194
  """Check that mirrors are not degraded.
1195

1196
  The ldisk parameter, if True, will change the test from the
1197
  is_degraded attribute (which represents overall non-ok status for
1198
  the device(s)) to the ldisk (representing the local storage status).
1199

1200
  """
1201
  cfgw.SetDiskID(dev, node)
1202
  if ldisk:
1203
    idx = 6
1204
  else:
1205
    idx = 5
1206

    
1207
  result = True
1208
  if on_primary or dev.AssembleOnSecondary():
1209
    rstats = rpc.call_blockdev_find(node, dev)
1210
    if not rstats:
1211
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1212
      result = False
1213
    else:
1214
      result = result and (not rstats[idx])
1215
  if dev.children:
1216
    for child in dev.children:
1217
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1218

    
1219
  return result
1220

    
1221

    
1222
class LUDiagnoseOS(NoHooksLU):
1223
  """Logical unit for OS diagnose/query.
1224

1225
  """
1226
  _OP_REQP = ["output_fields", "names"]
1227
  REQ_BGL = False
1228

    
1229
  def ExpandNames(self):
1230
    if self.op.names:
1231
      raise errors.OpPrereqError("Selective OS query not supported")
1232

    
1233
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1234
    _CheckOutputFields(static=[],
1235
                       dynamic=self.dynamic_fields,
1236
                       selected=self.op.output_fields)
1237

    
1238
    # Lock all nodes, in shared mode
1239
    self.needed_locks = {}
1240
    self.share_locks[locking.LEVEL_NODE] = 1
1241
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1242

    
1243
  def CheckPrereq(self):
1244
    """Check prerequisites.
1245

1246
    """
1247

    
1248
  @staticmethod
1249
  def _DiagnoseByOS(node_list, rlist):
1250
    """Remaps a per-node return list into an a per-os per-node dictionary
1251

1252
      Args:
1253
        node_list: a list with the names of all nodes
1254
        rlist: a map with node names as keys and OS objects as values
1255

1256
      Returns:
1257
        map: a map with osnames as keys and, as values, another map with
             nodes as keys and lists of OS objects as values,
1260
             e.g. {"debian-etch": {"node1": [<object>,...],
1261
                                   "node2": [<object>,]}
1262
                  }
1263

1264
    """
1265
    all_os = {}
1266
    for node_name, nr in rlist.iteritems():
1267
      if not nr:
1268
        continue
1269
      for os_obj in nr:
1270
        if os_obj.name not in all_os:
1271
          # build a list of nodes for this os containing empty lists
1272
          # for each node in node_list
1273
          all_os[os_obj.name] = {}
1274
          for nname in node_list:
1275
            all_os[os_obj.name][nname] = []
1276
        all_os[os_obj.name][node_name].append(os_obj)
1277
    return all_os
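  # Illustrative sketch (hypothetical data): with node_list=["node1", "node2"]
  # and rlist={"node1": [os_a], "node2": []}, assuming os_a.name is
  # "debian-etch", the result is {"debian-etch": {"node1": [os_a], "node2": []}};
  # nodes that lack an OS keep an empty list, which the "valid" field check in
  # Exec below treats as invalid.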
1278

    
1279
  def Exec(self, feedback_fn):
1280
    """Compute the list of OSes.
1281

1282
    """
1283
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1284
    node_data = rpc.call_os_diagnose(node_list)
1285
    if node_data == False:
1286
      raise errors.OpExecError("Can't gather the list of OSes")
1287
    pol = self._DiagnoseByOS(node_list, node_data)
1288
    output = []
1289
    for os_name, os_data in pol.iteritems():
1290
      row = []
1291
      for field in self.op.output_fields:
1292
        if field == "name":
1293
          val = os_name
1294
        elif field == "valid":
1295
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1296
        elif field == "node_status":
1297
          val = {}
1298
          for node_name, nos_list in os_data.iteritems():
1299
            val[node_name] = [(v.status, v.path) for v in nos_list]
1300
        else:
1301
          raise errors.ParameterError(field)
1302
        row.append(val)
1303
      output.append(row)
1304

    
1305
    return output
1306

    
1307

    
1308
class LURemoveNode(LogicalUnit):
1309
  """Logical unit for removing a node.
1310

1311
  """
1312
  HPATH = "node-remove"
1313
  HTYPE = constants.HTYPE_NODE
1314
  _OP_REQP = ["node_name"]
1315

    
1316
  def BuildHooksEnv(self):
1317
    """Build hooks env.
1318

1319
    This doesn't run on the target node in the pre phase as a failed
1320
    node would then be impossible to remove.
1321

1322
    """
1323
    env = {
1324
      "OP_TARGET": self.op.node_name,
1325
      "NODE_NAME": self.op.node_name,
1326
      }
1327
    all_nodes = self.cfg.GetNodeList()
1328
    all_nodes.remove(self.op.node_name)
1329
    return env, all_nodes, all_nodes
1330

    
1331
  def CheckPrereq(self):
1332
    """Check prerequisites.
1333

1334
    This checks:
1335
     - the node exists in the configuration
1336
     - it does not have primary or secondary instances
1337
     - it's not the master
1338

1339
    Any errors are signalled by raising errors.OpPrereqError.
1340

1341
    """
1342
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1343
    if node is None:
1344
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1345

    
1346
    instance_list = self.cfg.GetInstanceList()
1347

    
1348
    masternode = self.sstore.GetMasterNode()
1349
    if node.name == masternode:
1350
      raise errors.OpPrereqError("Node is the master node,"
1351
                                 " you need to failover first.")
1352

    
1353
    for instance_name in instance_list:
1354
      instance = self.cfg.GetInstanceInfo(instance_name)
1355
      if node.name == instance.primary_node:
1356
        raise errors.OpPrereqError("Instance %s still running on the node,"
1357
                                   " please remove first." % instance_name)
1358
      if node.name in instance.secondary_nodes:
1359
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1360
                                   " please remove first." % instance_name)
1361
    self.op.node_name = node.name
1362
    self.node = node
1363

    
1364
  def Exec(self, feedback_fn):
1365
    """Removes the node from the cluster.
1366

1367
    """
1368
    node = self.node
1369
    logger.Info("stopping the node daemon and removing configs from node %s" %
1370
                node.name)
1371

    
1372
    self.context.RemoveNode(node.name)
1373

    
1374
    rpc.call_node_leave_cluster(node.name)
1375

    
1376

    
1377
class LUQueryNodes(NoHooksLU):
1378
  """Logical unit for querying nodes.
1379

1380
  """
1381
  _OP_REQP = ["output_fields", "names"]
1382
  REQ_BGL = False
1383

    
1384
  def ExpandNames(self):
1385
    self.dynamic_fields = frozenset([
1386
      "dtotal", "dfree",
1387
      "mtotal", "mnode", "mfree",
1388
      "bootid",
1389
      "ctotal",
1390
      ])
1391

    
1392
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1393
                               "pinst_list", "sinst_list",
1394
                               "pip", "sip", "tags"],
1395
                       dynamic=self.dynamic_fields,
1396
                       selected=self.op.output_fields)
1397

    
1398
    self.needed_locks = {}
1399
    self.share_locks[locking.LEVEL_NODE] = 1
1400
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
1401
    # that we need atomic ways to get info for a group of nodes from the
1402
    # config, though.
1403
    if not self.op.names:
1404
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1405
    else:
1406
      self.needed_locks[locking.LEVEL_NODE] = \
1407
        _GetWantedNodes(self, self.op.names)
1408

    
1409
  def CheckPrereq(self):
1410
    """Check prerequisites.
1411

1412
    """
1413
    # This of course is valid only if we locked the nodes
1414
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1415

    
1416
  def Exec(self, feedback_fn):
1417
    """Computes the list of nodes and their attributes.
1418

1419
    """
1420
    nodenames = self.wanted
1421
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1422

    
1423
    # begin data gathering
1424

    
1425
    if self.dynamic_fields.intersection(self.op.output_fields):
1426
      live_data = {}
1427
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1428
      for name in nodenames:
1429
        nodeinfo = node_data.get(name, None)
1430
        if nodeinfo:
1431
          live_data[name] = {
1432
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1433
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1434
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1435
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1436
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1437
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1438
            "bootid": nodeinfo['bootid'],
1439
            }
1440
        else:
1441
          live_data[name] = {}
1442
    else:
1443
      live_data = dict.fromkeys(nodenames, {})
1444

    
1445
    node_to_primary = dict([(name, set()) for name in nodenames])
1446
    node_to_secondary = dict([(name, set()) for name in nodenames])
1447

    
1448
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1449
                             "sinst_cnt", "sinst_list"))
1450
    if inst_fields & frozenset(self.op.output_fields):
1451
      instancelist = self.cfg.GetInstanceList()
1452

    
1453
      for instance_name in instancelist:
1454
        inst = self.cfg.GetInstanceInfo(instance_name)
1455
        if inst.primary_node in node_to_primary:
1456
          node_to_primary[inst.primary_node].add(inst.name)
1457
        for secnode in inst.secondary_nodes:
1458
          if secnode in node_to_secondary:
1459
            node_to_secondary[secnode].add(inst.name)
1460

    
1461
    # end data gathering
1462

    
1463
    output = []
1464
    for node in nodelist:
1465
      node_output = []
1466
      for field in self.op.output_fields:
1467
        if field == "name":
1468
          val = node.name
1469
        elif field == "pinst_list":
1470
          val = list(node_to_primary[node.name])
1471
        elif field == "sinst_list":
1472
          val = list(node_to_secondary[node.name])
1473
        elif field == "pinst_cnt":
1474
          val = len(node_to_primary[node.name])
1475
        elif field == "sinst_cnt":
1476
          val = len(node_to_secondary[node.name])
1477
        elif field == "pip":
1478
          val = node.primary_ip
1479
        elif field == "sip":
1480
          val = node.secondary_ip
1481
        elif field == "tags":
1482
          val = list(node.GetTags())
1483
        elif field in self.dynamic_fields:
1484
          val = live_data[node.name].get(field, None)
1485
        else:
1486
          raise errors.ParameterError(field)
1487
        node_output.append(val)
1488
      output.append(node_output)
1489

    
1490
    return output
1491

    
1492

    
1493
class LUQueryNodeVolumes(NoHooksLU):
1494
  """Logical unit for getting volumes on node(s).
1495

1496
  """
1497
  _OP_REQP = ["nodes", "output_fields"]
1498
  REQ_BGL = False
1499

    
1500
  def ExpandNames(self):
1501
    _CheckOutputFields(static=["node"],
1502
                       dynamic=["phys", "vg", "name", "size", "instance"],
1503
                       selected=self.op.output_fields)
1504

    
1505
    self.needed_locks = {}
1506
    self.share_locks[locking.LEVEL_NODE] = 1
1507
    if not self.op.nodes:
1508
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1509
    else:
1510
      self.needed_locks[locking.LEVEL_NODE] = \
1511
        _GetWantedNodes(self, self.op.nodes)
1512

    
1513
  def CheckPrereq(self):
1514
    """Check prerequisites.
1515

1516
    This checks that the fields required are valid output fields.
1517

1518
    """
1519
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1520

    
1521
  def Exec(self, feedback_fn):
1522
    """Computes the list of nodes and their attributes.
1523

1524
    """
1525
    nodenames = self.nodes
1526
    volumes = rpc.call_node_volumes(nodenames)
1527

    
1528
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1529
             in self.cfg.GetInstanceList()]
1530

    
1531
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1532

    
1533
    output = []
1534
    for node in nodenames:
1535
      if node not in volumes or not volumes[node]:
1536
        continue
1537

    
1538
      node_vols = volumes[node][:]
1539
      node_vols.sort(key=lambda vol: vol['dev'])
1540

    
1541
      for vol in node_vols:
1542
        node_output = []
1543
        for field in self.op.output_fields:
1544
          if field == "node":
1545
            val = node
1546
          elif field == "phys":
1547
            val = vol['dev']
1548
          elif field == "vg":
1549
            val = vol['vg']
1550
          elif field == "name":
1551
            val = vol['name']
1552
          elif field == "size":
1553
            val = int(float(vol['size']))
1554
          elif field == "instance":
1555
            for inst in ilist:
1556
              if node not in lv_by_node[inst]:
1557
                continue
1558
              if vol['name'] in lv_by_node[inst][node]:
1559
                val = inst.name
1560
                break
1561
            else:
1562
              val = '-'
1563
          else:
1564
            raise errors.ParameterError(field)
1565
          node_output.append(str(val))
1566

    
1567
        output.append(node_output)
1568

    
1569
    return output
1570

    
1571

    
1572
class LUAddNode(LogicalUnit):
1573
  """Logical unit for adding node to the cluster.
1574

1575
  """
1576
  HPATH = "node-add"
1577
  HTYPE = constants.HTYPE_NODE
1578
  _OP_REQP = ["node_name"]
1579

    
1580
  def BuildHooksEnv(self):
1581
    """Build hooks env.
1582

1583
    This will run on all nodes before, and on all nodes + the new node after.
1584

1585
    """
1586
    env = {
1587
      "OP_TARGET": self.op.node_name,
1588
      "NODE_NAME": self.op.node_name,
1589
      "NODE_PIP": self.op.primary_ip,
1590
      "NODE_SIP": self.op.secondary_ip,
1591
      }
1592
    nodes_0 = self.cfg.GetNodeList()
1593
    nodes_1 = nodes_0 + [self.op.node_name, ]
1594
    return env, nodes_0, nodes_1
1595

    
1596
  def CheckPrereq(self):
1597
    """Check prerequisites.
1598

1599
    This checks:
1600
     - the new node is not already in the config
1601
     - it is resolvable
1602
     - its parameters (single/dual homed) match the cluster
1603

1604
    Any errors are signalled by raising errors.OpPrereqError.
1605

1606
    """
1607
    node_name = self.op.node_name
1608
    cfg = self.cfg
1609

    
1610
    dns_data = utils.HostInfo(node_name)
1611

    
1612
    node = dns_data.name
1613
    primary_ip = self.op.primary_ip = dns_data.ip
1614
    secondary_ip = getattr(self.op, "secondary_ip", None)
1615
    if secondary_ip is None:
1616
      secondary_ip = primary_ip
1617
    if not utils.IsValidIP(secondary_ip):
1618
      raise errors.OpPrereqError("Invalid secondary IP given")
1619
    self.op.secondary_ip = secondary_ip
1620

    
1621
    node_list = cfg.GetNodeList()
1622
    if not self.op.readd and node in node_list:
1623
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1624
                                 node)
1625
    elif self.op.readd and node not in node_list:
1626
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1627

    
1628
    for existing_node_name in node_list:
1629
      existing_node = cfg.GetNodeInfo(existing_node_name)
1630

    
1631
      if self.op.readd and node == existing_node_name:
1632
        if (existing_node.primary_ip != primary_ip or
1633
            existing_node.secondary_ip != secondary_ip):
1634
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1635
                                     " address configuration as before")
1636
        continue
1637

    
1638
      if (existing_node.primary_ip == primary_ip or
1639
          existing_node.secondary_ip == primary_ip or
1640
          existing_node.primary_ip == secondary_ip or
1641
          existing_node.secondary_ip == secondary_ip):
1642
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1643
                                   " existing node %s" % existing_node.name)
1644

    
1645
    # check that the type of the node (single versus dual homed) is the
1646
    # same as for the master
1647
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1648
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1649
    newbie_singlehomed = secondary_ip == primary_ip
1650
    if master_singlehomed != newbie_singlehomed:
1651
      if master_singlehomed:
1652
        raise errors.OpPrereqError("The master has no private ip but the"
1653
                                   " new node has one")
1654
      else:
1655
        raise errors.OpPrereqError("The master has a private ip but the"
1656
                                   " new node doesn't have one")
1657

    
1658
    # checks reachability
1659
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1660
      raise errors.OpPrereqError("Node not reachable by ping")
1661

    
1662
    if not newbie_singlehomed:
1663
      # check reachability from my secondary ip to newbie's secondary ip
1664
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1665
                           source=myself.secondary_ip):
1666
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1667
                                   " based ping to noded port")
1668

    
1669
    self.new_node = objects.Node(name=node,
1670
                                 primary_ip=primary_ip,
1671
                                 secondary_ip=secondary_ip)
1672

    
1673
  def Exec(self, feedback_fn):
1674
    """Adds the new node to the cluster.
1675

1676
    """
1677
    new_node = self.new_node
1678
    node = new_node.name
1679

    
1680
    # check connectivity
1681
    result = rpc.call_version([node])[node]
1682
    if result:
1683
      if constants.PROTOCOL_VERSION == result:
1684
        logger.Info("communication to node %s fine, sw version %s match" %
1685
                    (node, result))
1686
      else:
1687
        raise errors.OpExecError("Version mismatch master version %s,"
1688
                                 " node version %s" %
1689
                                 (constants.PROTOCOL_VERSION, result))
1690
    else:
1691
      raise errors.OpExecError("Cannot get version from the new node")
1692

    
1693
    # setup ssh on node
1694
    logger.Info("copy ssh key to node %s" % node)
1695
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1696
    keyarray = []
1697
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1698
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1699
                priv_key, pub_key]
1700

    
1701
    for i in keyfiles:
1702
      f = open(i, 'r')
1703
      try:
1704
        keyarray.append(f.read())
1705
      finally:
1706
        f.close()
1707

    
1708
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1709
                               keyarray[3], keyarray[4], keyarray[5])
1710

    
1711
    if not result:
1712
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1713

    
1714
    # Add node to our /etc/hosts, and add key to known_hosts
1715
    utils.AddHostToEtcHosts(new_node.name)
1716

    
1717
    if new_node.secondary_ip != new_node.primary_ip:
1718
      if not rpc.call_node_tcp_ping(new_node.name,
1719
                                    constants.LOCALHOST_IP_ADDRESS,
1720
                                    new_node.secondary_ip,
1721
                                    constants.DEFAULT_NODED_PORT,
1722
                                    10, False):
1723
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1724
                                 " you gave (%s). Please fix and re-run this"
1725
                                 " command." % new_node.secondary_ip)
1726

    
1727
    node_verify_list = [self.sstore.GetMasterNode()]
1728
    node_verify_param = {
1729
      'nodelist': [node],
1730
      # TODO: do a node-net-test as well?
1731
    }
1732

    
1733
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1734
    for verifier in node_verify_list:
1735
      if not result[verifier]:
1736
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1737
                                 " for remote verification" % verifier)
1738
      if result[verifier]['nodelist']:
1739
        for failed in result[verifier]['nodelist']:
1740
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1741
                      (verifier, result[verifier]['nodelist'][failed]))
1742
        raise errors.OpExecError("ssh/hostname verification failed.")
1743

    
1744
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1745
    # including the node just added
1746
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1747
    dist_nodes = self.cfg.GetNodeList()
1748
    if not self.op.readd:
1749
      dist_nodes.append(node)
1750
    if myself.name in dist_nodes:
1751
      dist_nodes.remove(myself.name)
1752

    
1753
    logger.Debug("Copying hosts and known_hosts to all nodes")
1754
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1755
      result = rpc.call_upload_file(dist_nodes, fname)
1756
      for to_node in dist_nodes:
1757
        if not result[to_node]:
1758
          logger.Error("copy of file %s to node %s failed" %
1759
                       (fname, to_node))
1760

    
1761
    to_copy = self.sstore.GetFileList()
1762
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1763
      to_copy.append(constants.VNC_PASSWORD_FILE)
1764
    for fname in to_copy:
1765
      result = rpc.call_upload_file([node], fname)
1766
      if not result[node]:
1767
        logger.Error("could not copy file %s to node %s" % (fname, node))
1768

    
1769
    if self.op.readd:
1770
      self.context.ReaddNode(new_node)
1771
    else:
1772
      self.context.AddNode(new_node)
1773

    
1774

    
1775
class LUQueryClusterInfo(NoHooksLU):
1776
  """Query cluster configuration.
1777

1778
  """
1779
  _OP_REQP = []
1780
  REQ_MASTER = False
1781
  REQ_BGL = False
1782

    
1783
  def ExpandNames(self):
1784
    self.needed_locks = {}
1785

    
1786
  def CheckPrereq(self):
1787
    """No prerequsites needed for this LU.
1788

1789
    """
1790
    pass
1791

    
1792
  def Exec(self, feedback_fn):
1793
    """Return cluster config.
1794

1795
    """
1796
    result = {
1797
      "name": self.sstore.GetClusterName(),
1798
      "software_version": constants.RELEASE_VERSION,
1799
      "protocol_version": constants.PROTOCOL_VERSION,
1800
      "config_version": constants.CONFIG_VERSION,
1801
      "os_api_version": constants.OS_API_VERSION,
1802
      "export_version": constants.EXPORT_VERSION,
1803
      "master": self.sstore.GetMasterNode(),
1804
      "architecture": (platform.architecture()[0], platform.machine()),
1805
      "hypervisor_type": self.sstore.GetHypervisorType(),
1806
      }
1807

    
1808
    return result
1809

    
1810

    
1811
class LUDumpClusterConfig(NoHooksLU):
1812
  """Return a text-representation of the cluster-config.
1813

1814
  """
1815
  _OP_REQP = []
1816
  REQ_BGL = False
1817

    
1818
  def ExpandNames(self):
1819
    self.needed_locks = {}
1820

    
1821
  def CheckPrereq(self):
1822
    """No prerequisites.
1823

1824
    """
1825
    pass
1826

    
1827
  def Exec(self, feedback_fn):
1828
    """Dump a representation of the cluster config to the standard output.
1829

1830
    """
1831
    return self.cfg.DumpConfig()
1832

    
1833

    
1834
class LUActivateInstanceDisks(NoHooksLU):
1835
  """Bring up an instance's disks.
1836

1837
  """
1838
  _OP_REQP = ["instance_name"]
1839
  REQ_BGL = False
1840

    
1841
  def ExpandNames(self):
1842
    self._ExpandAndLockInstance()
1843
    self.needed_locks[locking.LEVEL_NODE] = []
1844
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1845

    
1846
  def DeclareLocks(self, level):
1847
    if level == locking.LEVEL_NODE:
1848
      self._LockInstancesNodes()
1849

    
1850
  def CheckPrereq(self):
1851
    """Check prerequisites.
1852

1853
    This checks that the instance is in the cluster.
1854

1855
    """
1856
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1857
    assert self.instance is not None, \
1858
      "Cannot retrieve locked instance %s" % self.op.instance_name
1859

    
1860
  def Exec(self, feedback_fn):
1861
    """Activate the disks.
1862

1863
    """
1864
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1865
    if not disks_ok:
1866
      raise errors.OpExecError("Cannot activate block devices")
1867

    
1868
    return disks_info
1869

    
1870

    
1871
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1872
  """Prepare the block devices for an instance.
1873

1874
  This sets up the block devices on all nodes.
1875

1876
  Args:
1877
    instance: a ganeti.objects.Instance object
1878
    ignore_secondaries: if true, errors on secondary nodes won't result
1879
                        in an error return from the function
1880

1881
  Returns:
1882
    false if the operation failed
1883
    list of (host, instance_visible_name, node_visible_name) if the operation
1884
         succeeded with the mapping from node devices to instance devices
1885
  """
1886
  device_info = []
1887
  disks_ok = True
1888
  iname = instance.name
1889
  # With the two passes mechanism we try to reduce the window of
1890
  # opportunity for the race condition of switching DRBD to primary
1891
  # before handshaking occurred, but we do not eliminate it
1892

    
1893
  # The proper fix would be to wait (with some limits) until the
1894
  # connection has been made and drbd transitions from WFConnection
1895
  # into any other network-connected state (Connected, SyncTarget,
1896
  # SyncSource, etc.)
1897

    
1898
  # 1st pass, assemble on all nodes in secondary mode
1899
  for inst_disk in instance.disks:
1900
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1901
      cfg.SetDiskID(node_disk, node)
1902
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1903
      if not result:
1904
        logger.Error("could not prepare block device %s on node %s"
1905
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1906
        if not ignore_secondaries:
1907
          disks_ok = False
1908

    
1909
  # FIXME: race condition on drbd migration to primary
1910

    
1911
  # 2nd pass, do only the primary node
1912
  for inst_disk in instance.disks:
1913
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1914
      if node != instance.primary_node:
1915
        continue
1916
      cfg.SetDiskID(node_disk, node)
1917
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1918
      if not result:
1919
        logger.Error("could not prepare block device %s on node %s"
1920
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1921
        disks_ok = False
1922
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1923

    
1924
  # leave the disks configured for the primary node
1925
  # this is a workaround that would be fixed better by
1926
  # improving the logical/physical id handling
1927
  for disk in instance.disks:
1928
    cfg.SetDiskID(disk, instance.primary_node)
1929

    
1930
  return disks_ok, device_info
1931
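
# Illustrative sketch (feedback_fn and instance are assumed to be in the
# caller's scope): callers check disks_ok first and only then walk
# device_info, whose entries are (node, iv_name, result) tuples built for
# the primary node, e.g.:
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, result in device_info:
#     feedback_fn("  - %s on %s: %s" % (iv_name, node, result))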

    
1932

    
1933
def _StartInstanceDisks(cfg, instance, force):
1934
  """Start the disks of an instance.
1935

1936
  """
1937
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1938
                                           ignore_secondaries=force)
1939
  if not disks_ok:
1940
    _ShutdownInstanceDisks(instance, cfg)
1941
    if force is not None and not force:
1942
      logger.Error("If the message above refers to a secondary node,"
1943
                   " you can retry the operation using '--force'.")
1944
    raise errors.OpExecError("Disk consistency error")
1945

    
1946

    
1947
class LUDeactivateInstanceDisks(NoHooksLU):
1948
  """Shutdown an instance's disks.
1949

1950
  """
1951
  _OP_REQP = ["instance_name"]
1952
  REQ_BGL = False
1953

    
1954
  def ExpandNames(self):
1955
    self._ExpandAndLockInstance()
1956
    self.needed_locks[locking.LEVEL_NODE] = []
1957
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1958

    
1959
  def DeclareLocks(self, level):
1960
    if level == locking.LEVEL_NODE:
1961
      self._LockInstancesNodes()
1962

    
1963
  def CheckPrereq(self):
1964
    """Check prerequisites.
1965

1966
    This checks that the instance is in the cluster.
1967

1968
    """
1969
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1970
    assert self.instance is not None, \
1971
      "Cannot retrieve locked instance %s" % self.op.instance_name
1972

    
1973
  def Exec(self, feedback_fn):
1974
    """Deactivate the disks
1975

1976
    """
1977
    instance = self.instance
1978
    _SafeShutdownInstanceDisks(instance, self.cfg)
1979

    
1980

    
1981
def _SafeShutdownInstanceDisks(instance, cfg):
1982
  """Shutdown block devices of an instance.
1983

1984
  This function checks if an instance is running, before calling
1985
  _ShutdownInstanceDisks.
1986

1987
  """
1988
  ins_l = rpc.call_instance_list([instance.primary_node])
1989
  ins_l = ins_l[instance.primary_node]
1990
  if not type(ins_l) is list:
1991
    raise errors.OpExecError("Can't contact node '%s'" %
1992
                             instance.primary_node)
1993

    
1994
  if instance.name in ins_l:
1995
    raise errors.OpExecError("Instance is running, can't shutdown"
1996
                             " block devices.")
1997

    
1998
  _ShutdownInstanceDisks(instance, cfg)
1999

    
2000

    
2001
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2002
  """Shutdown block devices of an instance.
2003

2004
  This does the shutdown on all nodes of the instance.
2005

2006
  If ignore_primary is true, errors on the primary node are
2007
  ignored.
2008

2009
  """
2010
  result = True
2011
  for disk in instance.disks:
2012
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2013
      cfg.SetDiskID(top_disk, node)
2014
      if not rpc.call_blockdev_shutdown(node, top_disk):
2015
        logger.Error("could not shutdown block device %s on node %s" %
2016
                     (disk.iv_name, node))
2017
        if not ignore_primary or node != instance.primary_node:
2018
          result = False
2019
  return result
2020

    
2021

    
2022
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2023
  """Checks if a node has enough free memory.
2024

2025
  This function checks if a given node has the needed amount of free
2026
  memory. In case the node has less memory or we cannot get the
2027
  information from the node, this function raises an OpPrereqError
2028
  exception.
2029

2030
  Args:
2031
    - cfg: a ConfigWriter instance
2032
    - node: the node name
2033
    - reason: string to use in the error message
2034
    - requested: the amount of memory in MiB
2035

2036
  """
2037
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2038
  if not nodeinfo or not isinstance(nodeinfo, dict):
2039
    raise errors.OpPrereqError("Could not contact node %s for resource"
2040
                             " information" % (node,))
2041

    
2042
  free_mem = nodeinfo[node].get('memory_free')
2043
  if not isinstance(free_mem, int):
2044
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2045
                             " was '%s'" % (node, free_mem))
2046
  if requested > free_mem:
2047
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2048
                             " needed %s MiB, available %s MiB" %
2049
                             (node, reason, requested, free_mem))
2050
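
# Illustrative sketch of the calling convention (target_node and instance are
# assumed to be in the caller's scope): the helper returns nothing on success
# and raises errors.OpPrereqError otherwise, so callers never inspect a
# return value, e.g.:
#
#   _CheckNodeFreeMemory(self.cfg, target_node,
#                        "failing over instance %s" % instance.name,
#                        instance.memory)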

    
2051

    
2052
class LUStartupInstance(LogicalUnit):
2053
  """Starts an instance.
2054

2055
  """
2056
  HPATH = "instance-start"
2057
  HTYPE = constants.HTYPE_INSTANCE
2058
  _OP_REQP = ["instance_name", "force"]
2059
  REQ_BGL = False
2060

    
2061
  def ExpandNames(self):
2062
    self._ExpandAndLockInstance()
2063
    self.needed_locks[locking.LEVEL_NODE] = []
2064
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2065

    
2066
  def DeclareLocks(self, level):
2067
    if level == locking.LEVEL_NODE:
2068
      self._LockInstancesNodes()
2069

    
2070
  def BuildHooksEnv(self):
2071
    """Build hooks env.
2072

2073
    This runs on master, primary and secondary nodes of the instance.
2074

2075
    """
2076
    env = {
2077
      "FORCE": self.op.force,
2078
      }
2079
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2080
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2081
          list(self.instance.secondary_nodes))
2082
    return env, nl, nl
2083

    
2084
  def CheckPrereq(self):
2085
    """Check prerequisites.
2086

2087
    This checks that the instance is in the cluster.
2088

2089
    """
2090
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2091
    assert self.instance is not None, \
2092
      "Cannot retrieve locked instance %s" % self.op.instance_name
2093

    
2094
    # check bridge existence
2095
    _CheckInstanceBridgesExist(instance)
2096

    
2097
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2098
                         "starting instance %s" % instance.name,
2099
                         instance.memory)
2100

    
2101
  def Exec(self, feedback_fn):
2102
    """Start the instance.
2103

2104
    """
2105
    instance = self.instance
2106
    force = self.op.force
2107
    extra_args = getattr(self.op, "extra_args", "")
2108

    
2109
    self.cfg.MarkInstanceUp(instance.name)
2110

    
2111
    node_current = instance.primary_node
2112

    
2113
    _StartInstanceDisks(self.cfg, instance, force)
2114

    
2115
    if not rpc.call_instance_start(node_current, instance, extra_args):
2116
      _ShutdownInstanceDisks(instance, self.cfg)
2117
      raise errors.OpExecError("Could not start instance")
2118

    
2119

    
2120
class LURebootInstance(LogicalUnit):
2121
  """Reboot an instance.
2122

2123
  """
2124
  HPATH = "instance-reboot"
2125
  HTYPE = constants.HTYPE_INSTANCE
2126
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2127
  REQ_BGL = False
2128

    
2129
  def ExpandNames(self):
2130
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2131
                                   constants.INSTANCE_REBOOT_HARD,
2132
                                   constants.INSTANCE_REBOOT_FULL]:
2133
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2134
                                  (constants.INSTANCE_REBOOT_SOFT,
2135
                                   constants.INSTANCE_REBOOT_HARD,
2136
                                   constants.INSTANCE_REBOOT_FULL))
2137
    self._ExpandAndLockInstance()
2138
    self.needed_locks[locking.LEVEL_NODE] = []
2139
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2140

    
2141
  def DeclareLocks(self, level):
2142
    if level == locking.LEVEL_NODE:
2143
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2144
      self._LockInstancesNodes(primary_only=primary_only)
2145

    
2146
  def BuildHooksEnv(self):
2147
    """Build hooks env.
2148

2149
    This runs on master, primary and secondary nodes of the instance.
2150

2151
    """
2152
    env = {
2153
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2154
      }
2155
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2156
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2157
          list(self.instance.secondary_nodes))
2158
    return env, nl, nl
2159

    
2160
  def CheckPrereq(self):
2161
    """Check prerequisites.
2162

2163
    This checks that the instance is in the cluster.
2164

2165
    """
2166
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2167
    assert self.instance is not None, \
2168
      "Cannot retrieve locked instance %s" % self.op.instance_name
2169

    
2170
    # check bridge existence
2171
    _CheckInstanceBridgesExist(instance)
2172

    
2173
  def Exec(self, feedback_fn):
2174
    """Reboot the instance.
2175

2176
    """
2177
    instance = self.instance
2178
    ignore_secondaries = self.op.ignore_secondaries
2179
    reboot_type = self.op.reboot_type
2180
    extra_args = getattr(self.op, "extra_args", "")
2181

    
2182
    node_current = instance.primary_node
2183

    
2184
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2185
                       constants.INSTANCE_REBOOT_HARD]:
2186
      if not rpc.call_instance_reboot(node_current, instance,
2187
                                      reboot_type, extra_args):
2188
        raise errors.OpExecError("Could not reboot instance")
2189
    else:
2190
      if not rpc.call_instance_shutdown(node_current, instance):
2191
        raise errors.OpExecError("could not shutdown instance for full reboot")
2192
      _ShutdownInstanceDisks(instance, self.cfg)
2193
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2194
      if not rpc.call_instance_start(node_current, instance, extra_args):
2195
        _ShutdownInstanceDisks(instance, self.cfg)
2196
        raise errors.OpExecError("Could not start instance for full reboot")
2197

    
2198
    self.cfg.MarkInstanceUp(instance.name)
2199

    
2200

    
2201
class LUShutdownInstance(LogicalUnit):
2202
  """Shutdown an instance.
2203

2204
  """
2205
  HPATH = "instance-stop"
2206
  HTYPE = constants.HTYPE_INSTANCE
2207
  _OP_REQP = ["instance_name"]
2208
  REQ_BGL = False
2209

    
2210
  def ExpandNames(self):
2211
    self._ExpandAndLockInstance()
2212
    self.needed_locks[locking.LEVEL_NODE] = []
2213
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2214

    
2215
  def DeclareLocks(self, level):
2216
    if level == locking.LEVEL_NODE:
2217
      self._LockInstancesNodes()
2218

    
2219
  def BuildHooksEnv(self):
2220
    """Build hooks env.
2221

2222
    This runs on master, primary and secondary nodes of the instance.
2223

2224
    """
2225
    env = _BuildInstanceHookEnvByObject(self.instance)
2226
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2227
          list(self.instance.secondary_nodes))
2228
    return env, nl, nl
2229

    
2230
  def CheckPrereq(self):
2231
    """Check prerequisites.
2232

2233
    This checks that the instance is in the cluster.
2234

2235
    """
2236
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2237
    assert self.instance is not None, \
2238
      "Cannot retrieve locked instance %s" % self.op.instance_name
2239

    
2240
  def Exec(self, feedback_fn):
2241
    """Shutdown the instance.
2242

2243
    """
2244
    instance = self.instance
2245
    node_current = instance.primary_node
2246
    self.cfg.MarkInstanceDown(instance.name)
2247
    if not rpc.call_instance_shutdown(node_current, instance):
2248
      logger.Error("could not shutdown instance")
2249

    
2250
    _ShutdownInstanceDisks(instance, self.cfg)
2251

    
2252

    
2253
class LUReinstallInstance(LogicalUnit):
2254
  """Reinstall an instance.
2255

2256
  """
2257
  HPATH = "instance-reinstall"
2258
  HTYPE = constants.HTYPE_INSTANCE
2259
  _OP_REQP = ["instance_name"]
2260
  REQ_BGL = False
2261

    
2262
  def ExpandNames(self):
2263
    self._ExpandAndLockInstance()
2264
    self.needed_locks[locking.LEVEL_NODE] = []
2265
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2266

    
2267
  def DeclareLocks(self, level):
2268
    if level == locking.LEVEL_NODE:
2269
      self._LockInstancesNodes()
2270

    
2271
  def BuildHooksEnv(self):
2272
    """Build hooks env.
2273

2274
    This runs on master, primary and secondary nodes of the instance.
2275

2276
    """
2277
    env = _BuildInstanceHookEnvByObject(self.instance)
2278
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2279
          list(self.instance.secondary_nodes))
2280
    return env, nl, nl
2281

    
2282
  def CheckPrereq(self):
2283
    """Check prerequisites.
2284

2285
    This checks that the instance is in the cluster and is not running.
2286

2287
    """
2288
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2289
    assert instance is not None, \
2290
      "Cannot retrieve locked instance %s" % self.op.instance_name
2291

    
2292
    if instance.disk_template == constants.DT_DISKLESS:
2293
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2294
                                 self.op.instance_name)
2295
    if instance.status != "down":
2296
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2297
                                 self.op.instance_name)
2298
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2299
    if remote_info:
2300
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2301
                                 (self.op.instance_name,
2302
                                  instance.primary_node))
2303

    
2304
    self.op.os_type = getattr(self.op, "os_type", None)
2305
    if self.op.os_type is not None:
2306
      # OS verification
2307
      pnode = self.cfg.GetNodeInfo(
2308
        self.cfg.ExpandNodeName(instance.primary_node))
2309
      if pnode is None:
2310
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2311
                                   self.op.pnode)
2312
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2313
      if not os_obj:
2314
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2315
                                   " primary node"  % self.op.os_type)
2316

    
2317
    self.instance = instance
2318

    
2319
  def Exec(self, feedback_fn):
2320
    """Reinstall the instance.
2321

2322
    """
2323
    inst = self.instance
2324

    
2325
    if self.op.os_type is not None:
2326
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2327
      inst.os = self.op.os_type
2328
      self.cfg.AddInstance(inst)
2329

    
2330
    _StartInstanceDisks(self.cfg, inst, None)
2331
    try:
2332
      feedback_fn("Running the instance OS create scripts...")
2333
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2334
        raise errors.OpExecError("Could not install OS for instance %s"
2335
                                 " on node %s" %
2336
                                 (inst.name, inst.primary_node))
2337
    finally:
2338
      _ShutdownInstanceDisks(inst, self.cfg)
2339

    
2340

    
2341
class LURenameInstance(LogicalUnit):
2342
  """Rename an instance.
2343

2344
  """
2345
  HPATH = "instance-rename"
2346
  HTYPE = constants.HTYPE_INSTANCE
2347
  _OP_REQP = ["instance_name", "new_name"]
2348

    
2349
  def BuildHooksEnv(self):
2350
    """Build hooks env.
2351

2352
    This runs on master, primary and secondary nodes of the instance.
2353

2354
    """
2355
    env = _BuildInstanceHookEnvByObject(self.instance)
2356
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2357
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2358
          list(self.instance.secondary_nodes))
2359
    return env, nl, nl
2360

    
2361
  def CheckPrereq(self):
2362
    """Check prerequisites.
2363

2364
    This checks that the instance is in the cluster and is not running.
2365

2366
    """
2367
    instance = self.cfg.GetInstanceInfo(
2368
      self.cfg.ExpandInstanceName(self.op.instance_name))
2369
    if instance is None:
2370
      raise errors.OpPrereqError("Instance '%s' not known" %
2371
                                 self.op.instance_name)
2372
    if instance.status != "down":
2373
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2374
                                 self.op.instance_name)
2375
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2376
    if remote_info:
2377
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2378
                                 (self.op.instance_name,
2379
                                  instance.primary_node))
2380
    self.instance = instance
2381

    
2382
    # new name verification
2383
    name_info = utils.HostInfo(self.op.new_name)
2384

    
2385
    self.op.new_name = new_name = name_info.name
2386
    instance_list = self.cfg.GetInstanceList()
2387
    if new_name in instance_list:
2388
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2389
                                 new_name)
2390

    
2391
    if not getattr(self.op, "ignore_ip", False):
2392
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2393
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2394
                                   (name_info.ip, new_name))
2395

    
2396

    
2397
  def Exec(self, feedback_fn):
2398
    """Reinstall the instance.
2399

2400
    """
2401
    inst = self.instance
2402
    old_name = inst.name
2403

    
2404
    if inst.disk_template == constants.DT_FILE:
2405
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2406

    
2407
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2408
    # Change the instance lock. This is definitely safe while we hold the BGL
2409
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2410
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2411

    
2412
    # re-read the instance from the configuration after rename
2413
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2414

    
2415
    if inst.disk_template == constants.DT_FILE:
2416
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2417
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2418
                                                old_file_storage_dir,
2419
                                                new_file_storage_dir)
2420

    
2421
      if not result:
2422
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2423
                                 " directory '%s' to '%s' (but the instance"
2424
                                 " has been renamed in Ganeti)" % (
2425
                                 inst.primary_node, old_file_storage_dir,
2426
                                 new_file_storage_dir))
2427

    
2428
      if not result[0]:
2429
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2430
                                 " (but the instance has been renamed in"
2431
                                 " Ganeti)" % (old_file_storage_dir,
2432
                                               new_file_storage_dir))
2433

    
2434
    _StartInstanceDisks(self.cfg, inst, None)
2435
    try:
2436
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2437
                                          "sda", "sdb"):
2438
        msg = ("Could not run OS rename script for instance %s on node %s"
2439
               " (but the instance has been renamed in Ganeti)" %
2440
               (inst.name, inst.primary_node))
2441
        logger.Error(msg)
2442
    finally:
2443
      _ShutdownInstanceDisks(inst, self.cfg)
2444

    
2445

    
2446
class LURemoveInstance(LogicalUnit):
2447
  """Remove an instance.
2448

2449
  """
2450
  HPATH = "instance-remove"
2451
  HTYPE = constants.HTYPE_INSTANCE
2452
  _OP_REQP = ["instance_name", "ignore_failures"]
2453

    
2454
  def BuildHooksEnv(self):
2455
    """Build hooks env.
2456

2457
    This runs on master, primary and secondary nodes of the instance.
2458

2459
    """
2460
    env = _BuildInstanceHookEnvByObject(self.instance)
2461
    nl = [self.sstore.GetMasterNode()]
2462
    return env, nl, nl
2463

    
2464
  def CheckPrereq(self):
2465
    """Check prerequisites.
2466

2467
    This checks that the instance is in the cluster.
2468

2469
    """
2470
    instance = self.cfg.GetInstanceInfo(
2471
      self.cfg.ExpandInstanceName(self.op.instance_name))
2472
    if instance is None:
2473
      raise errors.OpPrereqError("Instance '%s' not known" %
2474
                                 self.op.instance_name)
2475
    self.instance = instance
2476

    
2477
  def Exec(self, feedback_fn):
2478
    """Remove the instance.
2479

2480
    """
2481
    instance = self.instance
2482
    logger.Info("shutting down instance %s on node %s" %
2483
                (instance.name, instance.primary_node))
2484

    
2485
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2486
      if self.op.ignore_failures:
2487
        feedback_fn("Warning: can't shutdown instance")
2488
      else:
2489
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2490
                                 (instance.name, instance.primary_node))
2491

    
2492
    logger.Info("removing block devices for instance %s" % instance.name)
2493

    
2494
    if not _RemoveDisks(instance, self.cfg):
2495
      if self.op.ignore_failures:
2496
        feedback_fn("Warning: can't remove instance's disks")
2497
      else:
2498
        raise errors.OpExecError("Can't remove instance's disks")
2499

    
2500
    logger.Info("removing instance %s out of cluster config" % instance.name)
2501

    
2502
    self.cfg.RemoveInstance(instance.name)
2503
    # Remove the instance from the Ganeti Lock Manager
2504
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2505

    
2506

    
2507
class LUQueryInstances(NoHooksLU):
2508
  """Logical unit for querying instances.
2509

2510
  """
2511
  _OP_REQP = ["output_fields", "names"]
2512
  REQ_BGL = False
2513

    
2514
  def ExpandNames(self):
2515
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2516
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2517
                               "admin_state", "admin_ram",
2518
                               "disk_template", "ip", "mac", "bridge",
2519
                               "sda_size", "sdb_size", "vcpus", "tags",
2520
                               "auto_balance",
2521
                               "network_port", "kernel_path", "initrd_path",
2522
                               "hvm_boot_order", "hvm_acpi", "hvm_pae",
2523
                               "hvm_cdrom_image_path", "hvm_nic_type",
2524
                               "hvm_disk_type", "vnc_bind_address"],
2525
                       dynamic=self.dynamic_fields,
2526
                       selected=self.op.output_fields)
2527

    
2528
    self.needed_locks = {}
2529
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2530
    self.share_locks[locking.LEVEL_NODE] = 1
2531

    
2532
    # TODO: we could lock instances (and nodes) only if the user asked for
2533
    # dynamic fields. For that we need atomic ways to get info for a group of
2534
    # instances from the config, though.
2535
    if not self.op.names:
2536
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2537
    else:
2538
      self.needed_locks[locking.LEVEL_INSTANCE] = \
2539
        _GetWantedInstances(self, self.op.names)
2540

    
2541
    self.needed_locks[locking.LEVEL_NODE] = []
2542
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2543

    
2544
  def DeclareLocks(self, level):
2545
    # TODO: locking of nodes could be avoided when not querying them
2546
    if level == locking.LEVEL_NODE:
2547
      self._LockInstancesNodes()
2548

    
2549
  def CheckPrereq(self):
2550
    """Check prerequisites.
2551

2552
    """
2553
    # This of course is valid only if we locked the instances
2554
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2555

    
2556
  def Exec(self, feedback_fn):
2557
    """Computes the list of nodes and their attributes.
2558

2559
    """
2560
    instance_names = self.wanted
2561
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2562
                     in instance_names]
2563

    
2564
    # begin data gathering
2565

    
2566
    nodes = frozenset([inst.primary_node for inst in instance_list])
2567

    
2568
    bad_nodes = []
2569
    if self.dynamic_fields.intersection(self.op.output_fields):
2570
      live_data = {}
2571
      node_data = rpc.call_all_instances_info(nodes)
2572
      for name in nodes:
2573
        result = node_data[name]
2574
        if result:
2575
          live_data.update(result)
2576
        elif result == False:
2577
          bad_nodes.append(name)
2578
        # else no instance is alive
2579
    else:
2580
      live_data = dict([(name, {}) for name in instance_names])
2581

    
2582
    # end data gathering
2583

    
2584
    output = []
2585
    for instance in instance_list:
2586
      iout = []
2587
      for field in self.op.output_fields:
2588
        if field == "name":
2589
          val = instance.name
2590
        elif field == "os":
2591
          val = instance.os
2592
        elif field == "pnode":
2593
          val = instance.primary_node
2594
        elif field == "snodes":
2595
          val = list(instance.secondary_nodes)
2596
        elif field == "admin_state":
2597
          val = (instance.status != "down")
2598
        elif field == "oper_state":
2599
          if instance.primary_node in bad_nodes:
2600
            val = None
2601
          else:
2602
            val = bool(live_data.get(instance.name))
2603
        elif field == "status":
2604
          if instance.primary_node in bad_nodes:
2605
            val = "ERROR_nodedown"
2606
          else:
2607
            running = bool(live_data.get(instance.name))
2608
            if running:
2609
              if instance.status != "down":
2610
                val = "running"
2611
              else:
2612
                val = "ERROR_up"
2613
            else:
2614
              if instance.status != "down":
2615
                val = "ERROR_down"
2616
              else:
2617
                val = "ADMIN_down"
2618
        elif field == "admin_ram":
2619
          val = instance.memory
2620
        elif field == "oper_ram":
2621
          if instance.primary_node in bad_nodes:
2622
            val = None
2623
          elif instance.name in live_data:
2624
            val = live_data[instance.name].get("memory", "?")
2625
          else:
2626
            val = "-"
2627
        elif field == "disk_template":
2628
          val = instance.disk_template
2629
        elif field == "ip":
2630
          val = instance.nics[0].ip
2631
        elif field == "bridge":
2632
          val = instance.nics[0].bridge
2633
        elif field == "mac":
2634
          val = instance.nics[0].mac
2635
        elif field == "sda_size" or field == "sdb_size":
2636
          disk = instance.FindDisk(field[:3])
2637
          if disk is None:
2638
            val = None
2639
          else:
2640
            val = disk.size
2641
        elif field == "vcpus":
2642
          val = instance.vcpus
2643
        elif field == "tags":
2644
          val = list(instance.GetTags())
2645
        elif field in ("network_port", "kernel_path", "initrd_path",
2646
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2647
                       "hvm_cdrom_image_path", "hvm_nic_type",
2648
                       "hvm_disk_type", "vnc_bind_address"):
2649
          val = getattr(instance, field, None)
2650
          if val is not None:
2651
            pass
2652
          elif field in ("hvm_nic_type", "hvm_disk_type",
2653
                         "kernel_path", "initrd_path"):
2654
            val = "default"
2655
          else:
2656
            val = "-"
2657
        else:
2658
          raise errors.ParameterError(field)
2659
        iout.append(val)
2660
      output.append(iout)
2661

    
2662
    return output
2663

    
2664

    
2665
class LUFailoverInstance(LogicalUnit):
2666
  """Failover an instance.
2667

2668
  """
2669
  HPATH = "instance-failover"
2670
  HTYPE = constants.HTYPE_INSTANCE
2671
  _OP_REQP = ["instance_name", "ignore_consistency"]
2672
  REQ_BGL = False
2673

    
2674
  def ExpandNames(self):
2675
    self._ExpandAndLockInstance()
2676
    self.needed_locks[locking.LEVEL_NODE] = []
2677
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2678

    
2679
  def DeclareLocks(self, level):
2680
    if level == locking.LEVEL_NODE:
2681
      self._LockInstancesNodes()
2682

    
2683
  def BuildHooksEnv(self):
2684
    """Build hooks env.
2685

2686
    This runs on master, primary and secondary nodes of the instance.
2687

2688
    """
2689
    env = {
2690
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2691
      }
2692
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2693
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2694
    return env, nl, nl
2695

    
2696
  def CheckPrereq(self):
2697
    """Check prerequisites.
2698

2699
    This checks that the instance is in the cluster.
2700

2701
    """
2702
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2703
    assert self.instance is not None, \
2704
      "Cannot retrieve locked instance %s" % self.op.instance_name
2705

    
2706
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2707
      raise errors.OpPrereqError("Instance's disk layout is not"
2708
                                 " network mirrored, cannot failover.")
2709

    
2710
    secondary_nodes = instance.secondary_nodes
2711
    if not secondary_nodes:
2712
      raise errors.ProgrammerError("no secondary node but using "
2713
                                   "a mirrored disk template")
2714

    
2715
    target_node = secondary_nodes[0]
2716
    # check memory requirements on the secondary node
2717
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2718
                         instance.name, instance.memory)
2719

    
2720
    # check bridge existence
2721
    brlist = [nic.bridge for nic in instance.nics]
2722
    if not rpc.call_bridges_exist(target_node, brlist):
2723
      raise errors.OpPrereqError("One or more target bridges %s does not"
2724
                                 " exist on destination node '%s'" %
2725
                                 (brlist, target_node))
2726

    
2727
  def Exec(self, feedback_fn):
2728
    """Failover an instance.
2729

2730
    The failover is done by shutting it down on its present node and
2731
    starting it on the secondary.
2732

2733
    """
2734
    instance = self.instance
2735

    
2736
    source_node = instance.primary_node
2737
    target_node = instance.secondary_nodes[0]
2738

    
2739
    feedback_fn("* checking disk consistency between source and target")
2740
    for dev in instance.disks:
2741
      # for drbd, these are drbd over lvm
2742
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2743
        if instance.status == "up" and not self.op.ignore_consistency:
2744
          raise errors.OpExecError("Disk %s is degraded on target node,"
2745
                                   " aborting failover." % dev.iv_name)
2746

    
2747
    feedback_fn("* shutting down instance on source node")
2748
    logger.Info("Shutting down instance %s on node %s" %
2749
                (instance.name, source_node))
2750

    
2751
    if not rpc.call_instance_shutdown(source_node, instance):
2752
      if self.op.ignore_consistency:
2753
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2754
                     " anyway. Please make sure node %s is down"  %
2755
                     (instance.name, source_node, source_node))
2756
      else:
2757
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2758
                                 (instance.name, source_node))
2759

    
2760
    feedback_fn("* deactivating the instance's disks on source node")
2761
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2762
      raise errors.OpExecError("Can't shut down the instance's disks.")
2763

    
2764
    instance.primary_node = target_node
2765
    # distribute new instance config to the other nodes
2766
    self.cfg.Update(instance)
2767

    
2768
    # Only start the instance if it's marked as up
2769
    if instance.status == "up":
2770
      feedback_fn("* activating the instance's disks on target node")
2771
      logger.Info("Starting instance %s on node %s" %
2772
                  (instance.name, target_node))
2773

    
2774
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2775
                                               ignore_secondaries=True)
2776
      if not disks_ok:
2777
        _ShutdownInstanceDisks(instance, self.cfg)
2778
        raise errors.OpExecError("Can't activate the instance's disks")
2779

    
2780
      feedback_fn("* starting the instance on the target node")
2781
      if not rpc.call_instance_start(target_node, instance, None):
2782
        _ShutdownInstanceDisks(instance, self.cfg)
2783
        raise errors.OpExecError("Could not start instance %s on node %s." %
2784
                                 (instance.name, target_node))
2785

    
2786

    
2787
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2788
  """Create a tree of block devices on the primary node.
2789

2790
  This always creates all devices.
2791

2792
  """
2793
  if device.children:
2794
    for child in device.children:
2795
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2796
        return False
2797

    
2798
  cfg.SetDiskID(device, node)
2799
  new_id = rpc.call_blockdev_create(node, device, device.size,
2800
                                    instance.name, True, info)
2801
  if not new_id:
2802
    return False
2803
  if device.physical_id is None:
2804
    device.physical_id = new_id
2805
  return True
2806

    
2807

    
2808
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2809
  """Create a tree of block devices on a secondary node.
2810

2811
  If this device type has to be created on secondaries, create it and
2812
  all its children.
2813

2814
  If not, just recurse to children keeping the same 'force' value.
2815

2816
  """
2817
  if device.CreateOnSecondary():
2818
    force = True
2819
  if device.children:
2820
    for child in device.children:
2821
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2822
                                        child, force, info):
2823
        return False
2824

    
2825
  if not force:
2826
    return True
2827
  cfg.SetDiskID(device, node)
2828
  new_id = rpc.call_blockdev_create(node, device, device.size,
2829
                                    instance.name, False, info)
2830
  if not new_id:
2831
    return False
2832
  if device.physical_id is None:
2833
    device.physical_id = new_id
2834
  return True
2835

    
2836

    
2837
def _GenerateUniqueNames(cfg, exts):
2838
  """Generate a suitable LV name.
2839

2840
  This will generate unique logical volume names, one for each of the given
  extensions.
2841

2842
  """
2843
  results = []
2844
  for val in exts:
2845
    new_id = cfg.GenerateUniqueID()
2846
    results.append("%s%s" % (new_id, val))
2847
  return results
2848

    
2849

    
2850
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2851
  """Generate a drbd8 device complete with its children.
2852

2853
  """
2854
  port = cfg.AllocatePort()
2855
  vgname = cfg.GetVGName()
2856
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2857
                          logical_id=(vgname, names[0]))
2858
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2859
                          logical_id=(vgname, names[1]))
2860
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2861
                          logical_id = (primary, secondary, port),
2862
                          children = [dev_data, dev_meta],
2863
                          iv_name=iv_name)
2864
  return drbd_dev
2865
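
# Illustrative sketch (node names, the 10240 MiB size and the LV names are
# example values only): one call builds a single LD_DRBD8 device whose
# children are the data LV and the fixed 128 MiB metadata LV, e.g.:
#
#   drbd_dev = _GenerateDRBD8Branch(cfg, "node1", "node2", 10240,
#                                   ["x.sda_data", "x.sda_meta"], "sda")
#   # drbd_dev.children[0] -> 10240 MiB data LV
#   # drbd_dev.children[1] -> 128 MiB metadata LV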

    
2866

    
2867
def _GenerateDiskTemplate(cfg, template_name,
2868
                          instance_name, primary_node,
2869
                          secondary_nodes, disk_sz, swap_sz,
2870
                          file_storage_dir, file_driver):
2871
  """Generate the entire disk layout for a given template type.
2872

2873
  """
2874
  #TODO: compute space requirements
2875

    
2876
  vgname = cfg.GetVGName()
2877
  if template_name == constants.DT_DISKLESS:
2878
    disks = []
2879
  elif template_name == constants.DT_PLAIN:
2880
    if len(secondary_nodes) != 0:
2881
      raise errors.ProgrammerError("Wrong template configuration")
2882

    
2883
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2884
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2885
                           logical_id=(vgname, names[0]),
2886
                           iv_name = "sda")
2887
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2888
                           logical_id=(vgname, names[1]),
2889
                           iv_name = "sdb")
2890
    disks = [sda_dev, sdb_dev]
2891
  elif template_name == constants.DT_DRBD8:
2892
    if len(secondary_nodes) != 1:
2893
      raise errors.ProgrammerError("Wrong template configuration")
2894
    remote_node = secondary_nodes[0]
2895
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2896
                                       ".sdb_data", ".sdb_meta"])
2897
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2898
                                         disk_sz, names[0:2], "sda")
2899
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2900
                                         swap_sz, names[2:4], "sdb")
2901
    disks = [drbd_sda_dev, drbd_sdb_dev]
2902
  elif template_name == constants.DT_FILE:
2903
    if len(secondary_nodes) != 0:
2904
      raise errors.ProgrammerError("Wrong template configuration")
2905

    
2906
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2907
                                iv_name="sda", logical_id=(file_driver,
2908
                                "%s/sda" % file_storage_dir))
2909
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2910
                                iv_name="sdb", logical_id=(file_driver,
2911
                                "%s/sdb" % file_storage_dir))
2912
    disks = [file_sda_dev, file_sdb_dev]
2913
  else:
2914
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2915
  return disks
2916
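
# Illustrative sketch (argument values are examples only): for the plain LVM
# template the helper returns two LD_LV disks exported as sda and sdb; the
# drbd8 and file branches follow the same (disk_sz, swap_sz) convention, e.g.:
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN,
#                                 "inst1.example.com", "node1", [],
#                                 10240, 4096, None, None)
#   # disks[0].iv_name == "sda", disks[1].iv_name == "sdb"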

    
2917

    
2918
def _GetInstanceInfoText(instance):
2919
  """Compute that text that should be added to the disk's metadata.
2920

2921
  """
2922
  return "originstname+%s" % instance.name
2923
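
# Example of the metadata text produced above (the instance name is an
# example value only):
#
#   _GetInstanceInfoText(instance)   # -> "originstname+inst1.example.com"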

    
2924

    
2925
def _CreateDisks(cfg, instance):
2926
  """Create all disks for an instance.
2927

2928
  This abstracts away some work from AddInstance.
2929

2930
  Args:
2931
    instance: the instance object
2932

2933
  Returns:
2934
    True or False showing the success of the creation process
2935

2936
  """
2937
  info = _GetInstanceInfoText(instance)
2938

    
2939
  if instance.disk_template == constants.DT_FILE:
2940
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2941
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2942
                                              file_storage_dir)
2943

    
2944
    if not result:
2945
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2946
      return False
2947

    
2948
    if not result[0]:
2949
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2950
      return False
2951

    
2952
  for device in instance.disks:
2953
    logger.Info("creating volume %s for instance %s" %
2954
                (device.iv_name, instance.name))
2955
    #HARDCODE
2956
    for secondary_node in instance.secondary_nodes:
2957
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2958
                                        device, False, info):
2959
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2960
                     (device.iv_name, device, secondary_node))
2961
        return False
2962
    #HARDCODE
2963
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2964
                                    instance, device, info):
2965
      logger.Error("failed to create volume %s on primary!" %
2966
                   device.iv_name)
2967
      return False
2968

    
2969
  return True
2970

    
2971

    
2972
def _RemoveDisks(instance, cfg):
2973
  """Remove all disks for an instance.
2974

2975
  This abstracts away some work from `AddInstance()` and
2976
  `RemoveInstance()`. Note that in case some of the devices couldn't
2977
  be removed, the removal will continue with the other ones (compare
2978
  with `_CreateDisks()`).
2979

2980
  Args:
2981
    instance: the instance object
2982

2983
  Returns:
2984
    True or False showing the success of the removal proces
2985

2986
  """
2987
  logger.Info("removing block devices for instance %s" % instance.name)
2988

    
2989
  result = True
2990
  for device in instance.disks:
2991
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2992
      cfg.SetDiskID(disk, node)
2993
      if not rpc.call_blockdev_remove(node, disk):
2994
        logger.Error("could not remove block device %s on node %s,"
2995
                     " continuing anyway" %
2996
                     (device.iv_name, node))
2997
        result = False
2998

    
2999
  if instance.disk_template == constants.DT_FILE:
3000
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3001
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3002
                                            file_storage_dir):
3003
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3004
      result = False
3005

    
3006
  return result
3007

    
3008

    
3009
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3010
  """Compute disk size requirements in the volume group
3011

3012
  This is currently hard-coded for the two-drive layout.
3013

3014
  """
3015
  # Required free disk space as a function of disk and swap space
3016
  req_size_dict = {
3017
    constants.DT_DISKLESS: None,
3018
    constants.DT_PLAIN: disk_size + swap_size,
3019
    # 256 MB are added for drbd metadata, 128MB for each drbd device
3020
    constants.DT_DRBD8: disk_size + swap_size + 256,
3021
    constants.DT_FILE: None,
3022
  }
3023

    
3024
  if disk_template not in req_size_dict:
3025
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3026
                                 " is unknown" %  disk_template)
3027

    
3028
  return req_size_dict[disk_template]
3029

    
3030

    
3031
class LUCreateInstance(LogicalUnit):
3032
  """Create an instance.
3033

3034
  """
3035
  HPATH = "instance-add"
3036
  HTYPE = constants.HTYPE_INSTANCE
3037
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3038
              "disk_template", "swap_size", "mode", "start", "vcpus",
3039
              "wait_for_sync", "ip_check", "mac"]
3040

    
3041
  def _RunAllocator(self):
3042
    """Run the allocator based on input opcode.
3043

3044
    """
3045
    disks = [{"size": self.op.disk_size, "mode": "w"},
3046
             {"size": self.op.swap_size, "mode": "w"}]
3047
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3048
             "bridge": self.op.bridge}]
3049
    ial = IAllocator(self.cfg, self.sstore,
3050
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3051
                     name=self.op.instance_name,
3052
                     disk_template=self.op.disk_template,
3053
                     tags=[],
3054
                     os=self.op.os_type,
3055
                     vcpus=self.op.vcpus,
3056
                     mem_size=self.op.mem_size,
3057
                     disks=disks,
3058
                     nics=nics,
3059
                     )
3060

    
3061
    ial.Run(self.op.iallocator)
3062

    
3063
    if not ial.success:
3064
      raise errors.OpPrereqError("Can't compute nodes using"
3065
                                 " iallocator '%s': %s" % (self.op.iallocator,
3066
                                                           ial.info))
3067
    if len(ial.nodes) != ial.required_nodes:
3068
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3069
                                 " of nodes (%s), required %s" %
3070
                                 (len(ial.nodes), ial.required_nodes))
3071
    self.op.pnode = ial.nodes[0]
3072
    logger.ToStdout("Selected nodes for the instance: %s" %
3073
                    (", ".join(ial.nodes),))
3074
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3075
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3076
    if ial.required_nodes == 2:
3077
      self.op.snode = ial.nodes[1]
3078

    
3079
  def BuildHooksEnv(self):
3080
    """Build hooks env.
3081

3082
    This runs on master, primary and secondary nodes of the instance.
3083

3084
    """
3085
    env = {
3086
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3087
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3088
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3089
      "INSTANCE_ADD_MODE": self.op.mode,
3090
      }
3091
    if self.op.mode == constants.INSTANCE_IMPORT:
3092
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3093
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3094
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3095

    
3096
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3097
      primary_node=self.op.pnode,
3098
      secondary_nodes=self.secondaries,
3099
      status=self.instance_status,
3100
      os_type=self.op.os_type,
3101
      memory=self.op.mem_size,
3102
      vcpus=self.op.vcpus,
3103
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3104
    ))
3105

    
3106
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3107
          self.secondaries)
3108
    return env, nl, nl
3109

    
3110

    
3111
  def CheckPrereq(self):
3112
    """Check prerequisites.
3113

3114
    """
3115
    # set optional parameters to none if they don't exist
3116
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3117
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3118
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3119
      if not hasattr(self.op, attr):
3120
        setattr(self.op, attr, None)
3121

    
3122
    if self.op.mode not in (constants.INSTANCE_CREATE,
3123
                            constants.INSTANCE_IMPORT):
3124
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3125
                                 self.op.mode)
3126

    
3127
    if (not self.cfg.GetVGName() and
3128
        self.op.disk_template not in constants.DTS_NOT_LVM):
3129
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3130
                                 " instances")
3131

    
3132
    if self.op.mode == constants.INSTANCE_IMPORT:
3133
      src_node = getattr(self.op, "src_node", None)
3134
      src_path = getattr(self.op, "src_path", None)
3135
      if src_node is None or src_path is None:
3136
        raise errors.OpPrereqError("Importing an instance requires source"
3137
                                   " node and path options")
3138
      src_node_full = self.cfg.ExpandNodeName(src_node)
3139
      if src_node_full is None:
3140
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3141
      self.op.src_node = src_node = src_node_full
3142

    
3143
      if not os.path.isabs(src_path):
3144
        raise errors.OpPrereqError("The source path must be absolute")
3145

    
3146
      export_info = rpc.call_export_info(src_node, src_path)
3147

    
3148
      if not export_info:
3149
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3150

    
3151
      if not export_info.has_section(constants.INISECT_EXP):
3152
        raise errors.ProgrammerError("Corrupted export config")
3153

    
3154
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3155
      if (int(ei_version) != constants.EXPORT_VERSION):
3156
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3157
                                   (ei_version, constants.EXPORT_VERSION))
3158

    
3159
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3160
        raise errors.OpPrereqError("Can't import instance with more than"
3161
                                   " one data disk")
3162

    
3163
      # FIXME: are the old os-es, disk sizes, etc. useful?
3164
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3165
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3166
                                                         'disk0_dump'))
3167
      self.src_image = diskimage
3168
    else: # INSTANCE_CREATE
3169
      if getattr(self.op, "os_type", None) is None:
3170
        raise errors.OpPrereqError("No guest OS specified")
3171

    
3172
    #### instance parameters check
3173

    
3174
    # disk template and mirror node verification
3175
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3176
      raise errors.OpPrereqError("Invalid disk template name")
3177

    
3178
    # instance name verification
3179
    hostname1 = utils.HostInfo(self.op.instance_name)
3180

    
3181
    self.op.instance_name = instance_name = hostname1.name
3182
    instance_list = self.cfg.GetInstanceList()
3183
    if instance_name in instance_list:
3184
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3185
                                 instance_name)
3186

    
3187
    # ip validity checks
3188
    ip = getattr(self.op, "ip", None)
3189
    if ip is None or ip.lower() == "none":
3190
      inst_ip = None
3191
    elif ip.lower() == "auto":
3192
      inst_ip = hostname1.ip
3193
    else:
3194
      if not utils.IsValidIP(ip):
3195
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3196
                                   " like a valid IP" % ip)
3197
      inst_ip = ip
3198
    self.inst_ip = self.op.ip = inst_ip
3199

    
3200
    if self.op.start and not self.op.ip_check:
3201
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3202
                                 " adding an instance in start mode")
3203

    
3204
    if self.op.ip_check:
3205
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3206
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3207
                                   (hostname1.ip, instance_name))
3208

    
3209
    # MAC address verification
3210
    if self.op.mac != "auto":
3211
      if not utils.IsValidMac(self.op.mac.lower()):
3212
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3213
                                   self.op.mac)
3214

    
3215
    # bridge verification
3216
    bridge = getattr(self.op, "bridge", None)
3217
    if bridge is None:
3218
      self.op.bridge = self.cfg.GetDefBridge()
3219
    else:
3220
      self.op.bridge = bridge
3221

    
3222
    # boot order verification
3223
    if self.op.hvm_boot_order is not None:
3224
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3225
        raise errors.OpPrereqError("invalid boot order specified,"
3226
                                   " must be one or more of [acdn]")
3227
    # file storage checks
3228
    if (self.op.file_driver and
3229
        not self.op.file_driver in constants.FILE_DRIVER):
3230
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3231
                                 self.op.file_driver)
3232

    
3233
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3234
      raise errors.OpPrereqError("File storage directory not a relative"
3235
                                 " path")
3236
    #### allocator run
3237

    
3238
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3239
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3240
                                 " node must be given")
3241

    
3242
    if self.op.iallocator is not None:
3243
      self._RunAllocator()
3244

    
3245
    #### node related checks
3246

    
3247
    # check primary node
3248
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3249
    if pnode is None:
3250
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3251
                                 self.op.pnode)
3252
    self.op.pnode = pnode.name
3253
    self.pnode = pnode
3254
    self.secondaries = []
3255

    
3256
    # mirror node verification
3257
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3258
      if getattr(self.op, "snode", None) is None:
3259
        raise errors.OpPrereqError("The networked disk templates need"
3260
                                   " a mirror node")
3261

    
3262
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3263
      if snode_name is None:
3264
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3265
                                   self.op.snode)
3266
      elif snode_name == pnode.name:
3267
        raise errors.OpPrereqError("The secondary node cannot be"
3268
                                   " the primary node.")
3269
      self.secondaries.append(snode_name)
3270

    
3271
    req_size = _ComputeDiskSize(self.op.disk_template,
3272
                                self.op.disk_size, self.op.swap_size)
3273

    
3274
    # Check lv size requirements
3275
    if req_size is not None:
3276
      nodenames = [pnode.name] + self.secondaries
3277
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3278
      for node in nodenames:
3279
        info = nodeinfo.get(node, None)
3280
        if not info:
3281
          raise errors.OpPrereqError("Cannot get current information"
3282
                                     " from node '%s'" % node)
3283
        vg_free = info.get('vg_free', None)
3284
        if not isinstance(vg_free, int):
3285
          raise errors.OpPrereqError("Can't compute free disk space on"
3286
                                     " node %s" % node)
3287
        if req_size > info['vg_free']:
3288
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3289
                                     " %d MB available, %d MB required" %
3290
                                     (node, info['vg_free'], req_size))
3291

    
3292
    # os verification
3293
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3294
    if not os_obj:
3295
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3296
                                 " primary node"  % self.op.os_type)
3297

    
3298
    if self.op.kernel_path == constants.VALUE_NONE:
3299
      raise errors.OpPrereqError("Can't set instance kernel to none")
3300

    
3301

    
3302
    # bridge check on primary node
3303
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3304
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3305
                                 " destination node '%s'" %
3306
                                 (self.op.bridge, pnode.name))
3307

    
3308
    # memory check on primary node
3309
    if self.op.start:
3310
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3311
                           "creating instance %s" % self.op.instance_name,
3312
                           self.op.mem_size)
3313

    
3314
    # hvm_cdrom_image_path verification
3315
    if self.op.hvm_cdrom_image_path is not None:
3316
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3317
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3318
                                   " be an absolute path or None, not %s" %
3319
                                   self.op.hvm_cdrom_image_path)
3320
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3321
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3322
                                   " regular file or a symlink pointing to"
3323
                                   " an existing regular file, not %s" %
3324
                                   self.op.hvm_cdrom_image_path)
3325

    
3326
    # vnc_bind_address verification
3327
    if self.op.vnc_bind_address is not None:
3328
      if not utils.IsValidIP(self.op.vnc_bind_address):
3329
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3330
                                   " like a valid IP address" %
3331
                                   self.op.vnc_bind_address)
3332

    
3333
    # Xen HVM device type checks
3334
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3335
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3336
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3337
                                   " hypervisor" % self.op.hvm_nic_type)
3338
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3339
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3340
                                   " hypervisor" % self.op.hvm_disk_type)
3341

    
3342
    if self.op.start:
3343
      self.instance_status = 'up'
3344
    else:
3345
      self.instance_status = 'down'
3346

    
3347
  def Exec(self, feedback_fn):
3348
    """Create and add the instance to the cluster.
3349

3350
    """
3351
    instance = self.op.instance_name
3352
    pnode_name = self.pnode.name
3353

    
3354
    if self.op.mac == "auto":
3355
      mac_address = self.cfg.GenerateMAC()
3356
    else:
3357
      mac_address = self.op.mac
3358

    
3359
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3360
    if self.inst_ip is not None:
3361
      nic.ip = self.inst_ip
3362

    
3363
    ht_kind = self.sstore.GetHypervisorType()
3364
    if ht_kind in constants.HTS_REQ_PORT:
3365
      network_port = self.cfg.AllocatePort()
3366
    else:
3367
      network_port = None
3368

    
3369
    if self.op.vnc_bind_address is None:
3370
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3371

    
3372
    # this is needed because os.path.join does not accept None arguments
3373
    if self.op.file_storage_dir is None:
3374
      string_file_storage_dir = ""
3375
    else:
3376
      string_file_storage_dir = self.op.file_storage_dir
3377

    
3378
    # build the full file storage dir path
3379
    file_storage_dir = os.path.normpath(os.path.join(
3380
                                        self.sstore.GetFileStorageDir(),
3381
                                        string_file_storage_dir, instance))
3382

    
3383

    
3384
    disks = _GenerateDiskTemplate(self.cfg,
3385
                                  self.op.disk_template,
3386
                                  instance, pnode_name,
3387
                                  self.secondaries, self.op.disk_size,
3388
                                  self.op.swap_size,
3389
                                  file_storage_dir,
3390
                                  self.op.file_driver)
3391

    
3392
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3393
                            primary_node=pnode_name,
3394
                            memory=self.op.mem_size,
3395
                            vcpus=self.op.vcpus,
3396
                            nics=[nic], disks=disks,
3397
                            disk_template=self.op.disk_template,
3398
                            status=self.instance_status,
3399
                            network_port=network_port,
3400
                            kernel_path=self.op.kernel_path,
3401
                            initrd_path=self.op.initrd_path,
3402
                            hvm_boot_order=self.op.hvm_boot_order,
3403
                            hvm_acpi=self.op.hvm_acpi,
3404
                            hvm_pae=self.op.hvm_pae,
3405
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3406
                            vnc_bind_address=self.op.vnc_bind_address,
3407
                            hvm_nic_type=self.op.hvm_nic_type,
3408
                            hvm_disk_type=self.op.hvm_disk_type,
3409
                            )
3410

    
3411
    feedback_fn("* creating instance disks...")
3412
    if not _CreateDisks(self.cfg, iobj):
3413
      _RemoveDisks(iobj, self.cfg)
3414
      raise errors.OpExecError("Device creation failed, reverting...")
3415

    
3416
    feedback_fn("adding instance %s to cluster config" % instance)
3417

    
3418
    self.cfg.AddInstance(iobj)
3419
    # Add the new instance to the Ganeti Lock Manager
3420
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3421

    
3422
    if self.op.wait_for_sync:
3423
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3424
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3425
      # make sure the disks are not degraded (still sync-ing is ok)
3426
      time.sleep(15)
3427
      feedback_fn("* checking mirrors status")
3428
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3429
    else:
3430
      disk_abort = False
3431

    
3432
    if disk_abort:
3433
      _RemoveDisks(iobj, self.cfg)
3434
      self.cfg.RemoveInstance(iobj.name)
3435
      # Remove the new instance from the Ganeti Lock Manager
3436
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3437
      raise errors.OpExecError("There are some degraded disks for"
3438
                               " this instance")
3439

    
3440
    feedback_fn("creating os for instance %s on node %s" %
3441
                (instance, pnode_name))
3442

    
3443
    if iobj.disk_template != constants.DT_DISKLESS:
3444
      if self.op.mode == constants.INSTANCE_CREATE:
3445
        feedback_fn("* running the instance OS create scripts...")
3446
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3447
          raise errors.OpExecError("could not add os for instance %s"
3448
                                   " on node %s" %
3449
                                   (instance, pnode_name))
3450

    
3451
      elif self.op.mode == constants.INSTANCE_IMPORT:
3452
        feedback_fn("* running the instance OS import scripts...")
3453
        src_node = self.op.src_node
3454
        src_image = self.src_image
3455
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3456
                                                src_node, src_image):
3457
          raise errors.OpExecError("Could not import os for instance"
3458
                                   " %s on node %s" %
3459
                                   (instance, pnode_name))
3460
      else:
3461
        # also checked in the prereq part
3462
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3463
                                     % self.op.mode)
3464

    
3465
    if self.op.start:
3466
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3467
      feedback_fn("* starting instance...")
3468
      if not rpc.call_instance_start(pnode_name, iobj, None):
3469
        raise errors.OpExecError("Could not start instance")
3470

    
3471

    
3472
class LUConnectConsole(NoHooksLU):
3473
  """Connect to an instance's console.
3474

3475
  This is somewhat special in that it returns the command line that
3476
  you need to run on the master node in order to connect to the
3477
  console.
3478

3479
  """
3480
  _OP_REQP = ["instance_name"]
3481
  REQ_BGL = False
3482

    
3483
  def ExpandNames(self):
3484
    self._ExpandAndLockInstance()
3485

    
3486
  def CheckPrereq(self):
3487
    """Check prerequisites.
3488

3489
    This checks that the instance is in the cluster.
3490

3491
    """
3492
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3493
    assert self.instance is not None, \
3494
      "Cannot retrieve locked instance %s" % self.op.instance_name
3495

    
3496
  def Exec(self, feedback_fn):
3497
    """Connect to the console of an instance
3498

3499
    """
3500
    instance = self.instance
3501
    node = instance.primary_node
3502

    
3503
    node_insts = rpc.call_instance_list([node])[node]
3504
    if node_insts is False:
3505
      raise errors.OpExecError("Can't connect to node %s." % node)
3506

    
3507
    if instance.name not in node_insts:
3508
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3509

    
3510
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3511

    
3512
    hyper = hypervisor.GetHypervisor()
3513
    console_cmd = hyper.GetShellCommandForConsole(instance)
3514

    
3515
    # build ssh cmdline
3516
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3517

    
3518

    
3519
class LUReplaceDisks(LogicalUnit):
3520
  """Replace the disks of an instance.
3521

3522
  """
3523
  HPATH = "mirrors-replace"
3524
  HTYPE = constants.HTYPE_INSTANCE
3525
  _OP_REQP = ["instance_name", "mode", "disks"]
3526
  REQ_BGL = False
3527

    
3528
  def ExpandNames(self):
3529
    self._ExpandAndLockInstance()
3530

    
3531
    if not hasattr(self.op, "remote_node"):
3532
      self.op.remote_node = None
3533

    
3534
    ia_name = getattr(self.op, "iallocator", None)
3535
    if ia_name is not None:
3536
      if self.op.remote_node is not None:
3537
        raise errors.OpPrereqError("Give either the iallocator or the new"
3538
                                   " secondary, not both")
3539
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3540
    elif self.op.remote_node is not None:
3541
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3542
      if remote_node is None:
3543
        raise errors.OpPrereqError("Node '%s' not known" %
3544
                                   self.op.remote_node)
3545
      self.op.remote_node = remote_node
3546
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3547
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3548
    else:
3549
      self.needed_locks[locking.LEVEL_NODE] = []
3550
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3551

    
3552
  def DeclareLocks(self, level):
3553
    # If we're not already locking all nodes in the set we have to declare the
3554
    # instance's primary/secondary nodes.
3555
    if (level == locking.LEVEL_NODE and
3556
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3557
      self._LockInstancesNodes()
3558

    
3559
  def _RunAllocator(self):
3560
    """Compute a new secondary node using an IAllocator.
3561

3562
    """
3563
    ial = IAllocator(self.cfg, self.sstore,
3564
                     mode=constants.IALLOCATOR_MODE_RELOC,
3565
                     name=self.op.instance_name,
3566
                     relocate_from=[self.sec_node])
3567

    
3568
    ial.Run(self.op.iallocator)
3569

    
3570
    if not ial.success:
3571
      raise errors.OpPrereqError("Can't compute nodes using"
3572
                                 " iallocator '%s': %s" % (self.op.iallocator,
3573
                                                           ial.info))
3574
    if len(ial.nodes) != ial.required_nodes:
3575
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3576
                                 " of nodes (%s), required %s" %
3577
                                 (len(ial.nodes), ial.required_nodes))
3578
    self.op.remote_node = ial.nodes[0]
3579
    logger.ToStdout("Selected new secondary for the instance: %s" %
3580
                    self.op.remote_node)
3581

    
3582
  def BuildHooksEnv(self):
3583
    """Build hooks env.
3584

3585
    This runs on the master, the primary and all the secondaries.
3586

3587
    """
3588
    env = {
3589
      "MODE": self.op.mode,
3590
      "NEW_SECONDARY": self.op.remote_node,
3591
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3592
      }
3593
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3594
    nl = [
3595
      self.sstore.GetMasterNode(),
3596
      self.instance.primary_node,
3597
      ]
3598
    if self.op.remote_node is not None:
3599
      nl.append(self.op.remote_node)
3600
    return env, nl, nl
3601

    
3602
  def CheckPrereq(self):
3603
    """Check prerequisites.
3604

3605
    This checks that the instance is in the cluster.
3606

3607
    """
3608
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3609
    assert instance is not None, \
3610
      "Cannot retrieve locked instance %s" % self.op.instance_name
3611
    self.instance = instance
3612

    
3613
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3614
      raise errors.OpPrereqError("Instance's disk layout is not"
3615
                                 " network mirrored.")
3616

    
3617
    if len(instance.secondary_nodes) != 1:
3618
      raise errors.OpPrereqError("The instance has a strange layout,"
3619
                                 " expected one secondary but found %d" %
3620
                                 len(instance.secondary_nodes))
3621

    
3622
    self.sec_node = instance.secondary_nodes[0]
3623

    
3624
    ia_name = getattr(self.op, "iallocator", None)
3625
    if ia_name is not None:
3626
      self._RunAllocator()
3627

    
3628
    remote_node = self.op.remote_node
3629
    if remote_node is not None:
3630
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3631
      assert self.remote_node_info is not None, \
3632
        "Cannot retrieve locked node %s" % remote_node
3633
    else:
3634
      self.remote_node_info = None
3635
    if remote_node == instance.primary_node:
3636
      raise errors.OpPrereqError("The specified node is the primary node of"
3637
                                 " the instance.")
3638
    elif remote_node == self.sec_node:
3639
      if self.op.mode == constants.REPLACE_DISK_SEC:
3640
        # this is for DRBD8, where we can't execute the same mode of
3641
        # replacement as for drbd7 (no different port allocated)
3642
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3643
                                   " replacement")
3644
    if instance.disk_template == constants.DT_DRBD8:
3645
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3646
          remote_node is not None):
3647
        # switch to replace secondary mode
3648
        self.op.mode = constants.REPLACE_DISK_SEC
3649

    
3650
      if self.op.mode == constants.REPLACE_DISK_ALL:
3651
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3652
                                   " secondary disk replacement, not"
3653
                                   " both at once")
3654
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3655
        if remote_node is not None:
3656
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3657
                                     " the secondary while doing a primary"
3658
                                     " node disk replacement")
3659
        self.tgt_node = instance.primary_node
3660
        self.oth_node = instance.secondary_nodes[0]
3661
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3662
        self.new_node = remote_node # this can be None, in which case
3663
                                    # we don't change the secondary
3664
        self.tgt_node = instance.secondary_nodes[0]
3665
        self.oth_node = instance.primary_node
3666
      else:
3667
        raise errors.ProgrammerError("Unhandled disk replace mode")
3668

    
3669
    for name in self.op.disks:
3670
      if instance.FindDisk(name) is None:
3671
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3672
                                   (name, instance.name))
3673

    
3674
  def _ExecD8DiskOnly(self, feedback_fn):
3675
    """Replace a disk on the primary or secondary for dbrd8.
3676

3677
    The algorithm for replace is quite complicated:
3678
      - for each disk to be replaced:
3679
        - create new LVs on the target node with unique names
3680
        - detach old LVs from the drbd device
3681
        - rename old LVs to name_replaced.<time_t>
3682
        - rename new LVs to old LVs
3683
        - attach the new LVs (with the old names now) to the drbd device
3684
      - wait for sync across all devices
3685
      - for each modified disk:
3686
        - remove old LVs (which have the name name_replaces.<time_t>)
3687

3688
    Failures are not very well handled.
3689

3690
    """
3691
    steps_total = 6
3692
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3693
    instance = self.instance
3694
    iv_names = {}
3695
    vgname = self.cfg.GetVGName()
3696
    # start of work
3697
    cfg = self.cfg
3698
    tgt_node = self.tgt_node
3699
    oth_node = self.oth_node
3700

    
3701
    # Step: check device activation
3702
    self.proc.LogStep(1, steps_total, "check device existence")
3703
    info("checking volume groups")
3704
    my_vg = cfg.GetVGName()
3705
    results = rpc.call_vg_list([oth_node, tgt_node])
3706
    if not results:
3707
      raise errors.OpExecError("Can't list volume groups on the nodes")
3708
    for node in oth_node, tgt_node:
3709
      res = results.get(node, False)
3710
      if not res or my_vg not in res:
3711
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3712
                                 (my_vg, node))
3713
    for dev in instance.disks:
3714
      if not dev.iv_name in self.op.disks:
3715
        continue
3716
      for node in tgt_node, oth_node:
3717
        info("checking %s on %s" % (dev.iv_name, node))
3718
        cfg.SetDiskID(dev, node)
3719
        if not rpc.call_blockdev_find(node, dev):
3720
          raise errors.OpExecError("Can't find device %s on node %s" %
3721
                                   (dev.iv_name, node))
3722

    
3723
    # Step: check other node consistency
3724
    self.proc.LogStep(2, steps_total, "check peer consistency")
3725
    for dev in instance.disks:
3726
      if not dev.iv_name in self.op.disks:
3727
        continue
3728
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3729
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3730
                                   oth_node==instance.primary_node):
3731
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3732
                                 " to replace disks on this node (%s)" %
3733
                                 (oth_node, tgt_node))
3734

    
3735
    # Step: create new storage
3736
    self.proc.LogStep(3, steps_total, "allocate new storage")
3737
    for dev in instance.disks:
3738
      if not dev.iv_name in self.op.disks:
3739
        continue
3740
      size = dev.size
3741
      cfg.SetDiskID(dev, tgt_node)
3742
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3743
      names = _GenerateUniqueNames(cfg, lv_names)
3744
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3745
                             logical_id=(vgname, names[0]))
3746
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3747
                             logical_id=(vgname, names[1]))
3748
      new_lvs = [lv_data, lv_meta]
3749
      old_lvs = dev.children
3750
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3751
      info("creating new local storage on %s for %s" %
3752
           (tgt_node, dev.iv_name))
3753
      # since we *always* want to create this LV, we use the
3754
      # _Create...OnPrimary (which forces the creation), even if we
3755
      # are talking about the secondary node
3756
      for new_lv in new_lvs:
3757
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3758
                                        _GetInstanceInfoText(instance)):
3759
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3760
                                   " node '%s'" %
3761
                                   (new_lv.logical_id[1], tgt_node))
3762

    
3763
    # Step: for each lv, detach+rename*2+attach
3764
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3765
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3766
      info("detaching %s drbd from local storage" % dev.iv_name)
3767
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3768
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3769
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3770
      #dev.children = []
3771
      #cfg.Update(instance)
3772

    
3773
      # ok, we created the new LVs, so now we know we have the needed
3774
      # storage; as such, we proceed on the target node to rename
3775
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3776
      # using the assumption that logical_id == physical_id (which in
3777
      # turn is the unique_id on that node)
3778

    
3779
      # FIXME(iustin): use a better name for the replaced LVs
3780
      temp_suffix = int(time.time())
3781
      ren_fn = lambda d, suff: (d.physical_id[0],
3782
                                d.physical_id[1] + "_replaced-%s" % suff)
3783
      # build the rename list based on what LVs exist on the node
3784
      rlist = []
3785
      for to_ren in old_lvs:
3786
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3787
        if find_res is not None: # device exists
3788
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3789

    
3790
      info("renaming the old LVs on the target node")
3791
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3792
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3793
      # now we rename the new LVs to the old LVs
3794
      info("renaming the new LVs on the target node")
3795
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3796
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3797
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3798

    
3799
      for old, new in zip(old_lvs, new_lvs):
3800
        new.logical_id = old.logical_id
3801
        cfg.SetDiskID(new, tgt_node)
3802

    
3803
      for disk in old_lvs:
3804
        disk.logical_id = ren_fn(disk, temp_suffix)
3805
        cfg.SetDiskID(disk, tgt_node)
3806

    
3807
      # now that the new lvs have the old name, we can add them to the device
3808
      info("adding new mirror component on %s" % tgt_node)
3809
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3810
        for new_lv in new_lvs:
3811
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3812
            warning("Can't rollback device %s", hint="manually cleanup unused"
3813
                    " logical volumes")
3814
        raise errors.OpExecError("Can't add local storage to drbd")
3815

    
3816
      dev.children = new_lvs
3817
      cfg.Update(instance)
3818

    
3819
    # Step: wait for sync
3820

    
3821
    # this can fail as the old devices are degraded and _WaitForSync
3822
    # does a combined result over all disks, so we don't check its
3823
    # return value
3824
    self.proc.LogStep(5, steps_total, "sync devices")
3825
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3826

    
3827
    # so check manually all the devices
3828
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3829
      cfg.SetDiskID(dev, instance.primary_node)
3830
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3831
      if is_degr:
3832
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3833

    
3834
    # Step: remove old storage
3835
    self.proc.LogStep(6, steps_total, "removing old storage")
3836
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3837
      info("remove logical volumes for %s" % name)
3838
      for lv in old_lvs:
3839
        cfg.SetDiskID(lv, tgt_node)
3840
        if not rpc.call_blockdev_remove(tgt_node, lv):
3841
          warning("Can't remove old LV", hint="manually remove unused LVs")
3842
          continue
3843

    
3844
  def _ExecD8Secondary(self, feedback_fn):
3845
    """Replace the secondary node for drbd8.
3846

3847
    The algorithm for replace is quite complicated:
3848
      - for all disks of the instance:
3849
        - create new LVs on the new node with same names
3850
        - shutdown the drbd device on the old secondary
3851
        - disconnect the drbd network on the primary
3852
        - create the drbd device on the new secondary
3853
        - network attach the drbd on the primary, using an artifice:
3854
          the drbd code for Attach() will connect to the network if it
3855
          finds a device which is connected to the good local disks but
3856
          not network enabled
3857
      - wait for sync across all devices
3858
      - remove all disks from the old secondary
3859

3860
    Failures are not very well handled.
3861

3862
    """
3863
    steps_total = 6
3864
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3865
    instance = self.instance
3866
    iv_names = {}
3867
    vgname = self.cfg.GetVGName()
3868
    # start of work
3869
    cfg = self.cfg
3870
    old_node = self.tgt_node
3871
    new_node = self.new_node
3872
    pri_node = instance.primary_node
3873

    
3874
    # Step: check device activation
3875
    self.proc.LogStep(1, steps_total, "check device existence")
3876
    info("checking volume groups")
3877
    my_vg = cfg.GetVGName()
3878
    results = rpc.call_vg_list([pri_node, new_node])
3879
    if not results:
3880
      raise errors.OpExecError("Can't list volume groups on the nodes")
3881
    for node in pri_node, new_node:
3882
      res = results.get(node, False)
3883
      if not res or my_vg not in res:
3884
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3885
                                 (my_vg, node))
3886
    for dev in instance.disks:
3887
      if not dev.iv_name in self.op.disks:
3888
        continue
3889
      info("checking %s on %s" % (dev.iv_name, pri_node))
3890
      cfg.SetDiskID(dev, pri_node)
3891
      if not rpc.call_blockdev_find(pri_node, dev):
3892
        raise errors.OpExecError("Can't find device %s on node %s" %
3893
                                 (dev.iv_name, pri_node))
3894

    
3895
    # Step: check other node consistency
3896
    self.proc.LogStep(2, steps_total, "check peer consistency")
3897
    for dev in instance.disks:
3898
      if not dev.iv_name in self.op.disks:
3899
        continue
3900
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3901
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3902
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3903
                                 " unsafe to replace the secondary" %
3904
                                 pri_node)
3905

    
3906
    # Step: create new storage
3907
    self.proc.LogStep(3, steps_total, "allocate new storage")
3908
    for dev in instance.disks:
3909
      size = dev.size
3910
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3911
      # since we *always* want to create this LV, we use the
3912
      # _Create...OnPrimary (which forces the creation), even if we
3913
      # are talking about the secondary node
3914
      for new_lv in dev.children:
3915
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3916
                                        _GetInstanceInfoText(instance)):
3917
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3918
                                   " node '%s'" %
3919
                                   (new_lv.logical_id[1], new_node))
3920

    
3921
      iv_names[dev.iv_name] = (dev, dev.children)
3922

    
3923
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3924
    for dev in instance.disks:
3925
      size = dev.size
3926
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3927
      # create new devices on new_node
3928
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3929
                              logical_id=(pri_node, new_node,
3930
                                          dev.logical_id[2]),
3931
                              children=dev.children)
3932
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3933
                                        new_drbd, False,
3934
                                      _GetInstanceInfoText(instance)):
3935
        raise errors.OpExecError("Failed to create new DRBD on"
3936
                                 " node '%s'" % new_node)
3937

    
3938
    for dev in instance.disks:
3939
      # we have new devices, shutdown the drbd on the old secondary
3940
      info("shutting down drbd for %s on old node" % dev.iv_name)
3941
      cfg.SetDiskID(dev, old_node)
3942
      if not rpc.call_blockdev_shutdown(old_node, dev):
3943
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3944
                hint="Please cleanup this device manually as soon as possible")
3945

    
3946
    info("detaching primary drbds from the network (=> standalone)")
3947
    done = 0
3948
    for dev in instance.disks:
3949
      cfg.SetDiskID(dev, pri_node)
3950
      # set the physical (unique in bdev terms) id to None, meaning
3951
      # detach from network
3952
      dev.physical_id = (None,) * len(dev.physical_id)
3953
      # and 'find' the device, which will 'fix' it to match the
3954
      # standalone state
3955
      if rpc.call_blockdev_find(pri_node, dev):
3956
        done += 1
3957
      else:
3958
        warning("Failed to detach drbd %s from network, unusual case" %
3959
                dev.iv_name)
3960

    
3961
    if not done:
3962
      # no detaches succeeded (very unlikely)
3963
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3964

    
3965
    # if we managed to detach at least one, we update all the disks of
3966
    # the instance to point to the new secondary
3967
    info("updating instance configuration")
3968
    for dev in instance.disks:
3969
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3970
      cfg.SetDiskID(dev, pri_node)
3971
    cfg.Update(instance)
3972

    
3973
    # and now perform the drbd attach
3974
    info("attaching primary drbds to new secondary (standalone => connected)")
3975
    failures = []
3976
    for dev in instance.disks:
3977
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3978
      # since the attach is smart, it's enough to 'find' the device,
3979
      # it will automatically activate the network, if the physical_id
3980
      # is correct
3981
      cfg.SetDiskID(dev, pri_node)
3982
      if not rpc.call_blockdev_find(pri_node, dev):
3983
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3984
                "please do a gnt-instance info to see the status of disks")
3985

    
3986
    # this can fail as the old devices are degraded and _WaitForSync
3987
    # does a combined result over all disks, so we don't check its
3988
    # return value
3989
    self.proc.LogStep(5, steps_total, "sync devices")
3990
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3991

    
3992
    # so check manually all the devices
3993
    for name, (dev, old_lvs) in iv_names.iteritems():
3994
      cfg.SetDiskID(dev, pri_node)
3995
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3996
      if is_degr:
3997
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3998

    
3999
    self.proc.LogStep(6, steps_total, "removing old storage")
4000
    for name, (dev, old_lvs) in iv_names.iteritems():
4001
      info("remove logical volumes for %s" % name)
4002
      for lv in old_lvs:
4003
        cfg.SetDiskID(lv, old_node)
4004
        if not rpc.call_blockdev_remove(old_node, lv):
4005
          warning("Can't remove LV on old secondary",
4006
                  hint="Cleanup stale volumes by hand")
4007

    
4008
  def Exec(self, feedback_fn):
4009
    """Execute disk replacement.
4010

4011
    This dispatches the disk replacement to the appropriate handler.
4012

4013
    """
4014
    instance = self.instance
4015

    
4016
    # Activate the instance disks if we're replacing them on a down instance
4017
    if instance.status == "down":
4018
      _StartInstanceDisks(self.cfg, instance, True)
4019

    
4020
    if instance.disk_template == constants.DT_DRBD8:
4021
      if self.op.remote_node is None:
4022
        fn = self._ExecD8DiskOnly
4023
      else:
4024
        fn = self._ExecD8Secondary
4025
    else:
4026
      raise errors.ProgrammerError("Unhandled disk replacement case")
4027

    
4028
    ret = fn(feedback_fn)
4029

    
4030
    # Deactivate the instance disks if we're replacing them on a down instance
4031
    if instance.status == "down":
4032
      _SafeShutdownInstanceDisks(instance, self.cfg)
4033

    
4034
    return ret
4035

    
4036

    
4037
class LUGrowDisk(LogicalUnit):
4038
  """Grow a disk of an instance.
4039

4040
  """
4041
  HPATH = "disk-grow"
4042
  HTYPE = constants.HTYPE_INSTANCE
4043
  _OP_REQP = ["instance_name", "disk", "amount"]
4044
  REQ_BGL = False
4045

    
4046
  def ExpandNames(self):
4047
    self._ExpandAndLockInstance()
4048
    self.needed_locks[locking.LEVEL_NODE] = []
4049
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4050

    
4051
  def DeclareLocks(self, level):
4052
    if level == locking.LEVEL_NODE:
4053
      self._LockInstancesNodes()
4054

    
4055
  def BuildHooksEnv(self):
4056
    """Build hooks env.
4057

4058
    This runs on the master, the primary and all the secondaries.
4059

4060
    """
4061
    env = {
4062
      "DISK": self.op.disk,
4063
      "AMOUNT": self.op.amount,
4064
      }
4065
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4066
    nl = [
4067
      self.sstore.GetMasterNode(),
4068
      self.instance.primary_node,
4069
      ]
4070
    return env, nl, nl
4071

    
4072
  def CheckPrereq(self):
4073
    """Check prerequisites.
4074

4075
    This checks that the instance is in the cluster.
4076

4077
    """
4078
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4079
    assert instance is not None, \
4080
      "Cannot retrieve locked instance %s" % self.op.instance_name
4081

    
4082
    self.instance = instance
4083

    
4084
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4085
      raise errors.OpPrereqError("Instance's disk layout does not support"
4086
                                 " growing.")
4087

    
4088
    if instance.FindDisk(self.op.disk) is None:
4089
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4090
                                 (self.op.disk, instance.name))
4091

    
4092
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4093
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4094
    for node in nodenames:
4095
      info = nodeinfo.get(node, None)
4096
      if not info:
4097
        raise errors.OpPrereqError("Cannot get current information"
4098
                                   " from node '%s'" % node)
4099
      vg_free = info.get('vg_free', None)
4100
      if not isinstance(vg_free, int):
4101
        raise errors.OpPrereqError("Can't compute free disk space on"
4102
                                   " node %s" % node)
4103
      if self.op.amount > info['vg_free']:
4104
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4105
                                   " %d MiB available, %d MiB required" %
4106
                                   (node, info['vg_free'], self.op.amount))
4107

    
4108
  def Exec(self, feedback_fn):
4109
    """Execute disk grow.
4110

4111
    """
4112
    instance = self.instance
4113
    disk = instance.FindDisk(self.op.disk)
4114
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4115
      self.cfg.SetDiskID(disk, node)
4116
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4117
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4118
        raise errors.OpExecError("grow request failed to node %s" % node)
4119
      elif not result[0]:
4120
        raise errors.OpExecError("grow request failed to node %s: %s" %
4121
                                 (node, result[1]))
4122
    disk.RecordGrow(self.op.amount)
4123
    self.cfg.Update(instance)
4124
    return
4125

    
4126

    
4127
class LUQueryInstanceData(NoHooksLU):
4128
  """Query runtime instance data.
4129

4130
  """
4131
  _OP_REQP = ["instances"]
4132

    
4133
  def CheckPrereq(self):
4134
    """Check prerequisites.
4135

4136
    This only checks the optional instance list against the existing names.
4137

4138
    """
4139
    if not isinstance(self.op.instances, list):
4140
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4141
    if self.op.instances:
4142
      self.wanted_instances = []
4143
      names = self.op.instances
4144
      for name in names:
4145
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4146
        if instance is None:
4147
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4148
        self.wanted_instances.append(instance)
4149
    else:
4150
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4151
                               in self.cfg.GetInstanceList()]
4152
    return
4153

    
4154

    
4155
  def _ComputeDiskStatus(self, instance, snode, dev):
4156
    """Compute block device status.
4157

4158
    """
4159
    self.cfg.SetDiskID(dev, instance.primary_node)
4160
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4161
    if dev.dev_type in constants.LDS_DRBD:
4162
      # we change the snode then (otherwise we use the one passed in)
4163
      if dev.logical_id[0] == instance.primary_node:
4164
        snode = dev.logical_id[1]
4165
      else:
4166
        snode = dev.logical_id[0]
4167

    
4168
    if snode:
4169
      self.cfg.SetDiskID(dev, snode)
4170
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4171
    else:
4172
      dev_sstatus = None
4173

    
4174
    if dev.children:
4175
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4176
                      for child in dev.children]
4177
    else:
4178
      dev_children = []
4179

    
4180
    data = {
4181
      "iv_name": dev.iv_name,
4182
      "dev_type": dev.dev_type,
4183
      "logical_id": dev.logical_id,
4184
      "physical_id": dev.physical_id,
4185
      "pstatus": dev_pstatus,
4186
      "sstatus": dev_sstatus,
4187
      "children": dev_children,
4188
      }
4189

    
4190
    return data
4191

    
4192
  def Exec(self, feedback_fn):
4193
    """Gather and return data"""
4194
    result = {}
4195
    for instance in self.wanted_instances:
4196
      remote_info = rpc.call_instance_info(instance.primary_node,
4197
                                                instance.name)
4198
      if remote_info and "state" in remote_info:
4199
        remote_state = "up"
4200
      else:
4201
        remote_state = "down"
4202
      if instance.status == "down":
4203
        config_state = "down"
4204
      else:
4205
        config_state = "up"
4206

    
4207
      disks = [self._ComputeDiskStatus(instance, None, device)
4208
               for device in instance.disks]
4209

    
4210
      idict = {
4211
        "name": instance.name,
4212
        "config_state": config_state,
4213
        "run_state": remote_state,
4214
        "pnode": instance.primary_node,
4215
        "snodes": instance.secondary_nodes,
4216
        "os": instance.os,
4217
        "memory": instance.memory,
4218
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4219
        "disks": disks,
4220
        "vcpus": instance.vcpus,
4221
        }
4222

    
4223
      htkind = self.sstore.GetHypervisorType()
4224
      if htkind == constants.HT_XEN_PVM30:
4225
        idict["kernel_path"] = instance.kernel_path
4226
        idict["initrd_path"] = instance.initrd_path
4227

    
4228
      if htkind == constants.HT_XEN_HVM31:
4229
        idict["hvm_boot_order"] = instance.hvm_boot_order
4230
        idict["hvm_acpi"] = instance.hvm_acpi
4231
        idict["hvm_pae"] = instance.hvm_pae
4232
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4233
        idict["hvm_nic_type"] = instance.hvm_nic_type
4234
        idict["hvm_disk_type"] = instance.hvm_disk_type
4235

    
4236
      if htkind in constants.HTS_REQ_PORT:
4237
        if instance.vnc_bind_address is None:
4238
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4239
        else:
4240
          vnc_bind_address = instance.vnc_bind_address
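        # The console endpoint string depends on the bind address: the
        # cluster-wide address yields "<primary_node>:<port>", the localhost
        # address "<bind_address>:<port> on node <pnode>", and any other
        # explicit address "<bind_address>:<port>".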
4241
        if instance.network_port is None:
4242
          vnc_console_port = None
4243
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4244
          vnc_console_port = "%s:%s" % (instance.primary_node,
4245
                                        instance.network_port)
4246
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4247
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4248
                                                   instance.network_port,
4249
                                                   instance.primary_node)
4250
        else:
4251
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4252
                                        instance.network_port)
4253
        idict["vnc_console_port"] = vnc_console_port
4254
        idict["vnc_bind_address"] = vnc_bind_address
4255
        idict["network_port"] = instance.network_port
4256

    
4257
      result[instance.name] = idict
4258

    
4259
    return result
4260

    
4261

    
4262
class LUSetInstanceParams(LogicalUnit):
4263
  """Modifies an instances's parameters.
4264

4265
  """
4266
  HPATH = "instance-modify"
4267
  HTYPE = constants.HTYPE_INSTANCE
4268
  _OP_REQP = ["instance_name"]
4269
  REQ_BGL = False
4270

    
4271
  def ExpandNames(self):
4272
    self._ExpandAndLockInstance()
4273

    
4274
  def BuildHooksEnv(self):
4275
    """Build hooks env.
4276

4277
    This runs on the master, primary and secondaries.
4278

4279
    """
4280
    args = dict()
4281
    if self.mem:
4282
      args['memory'] = self.mem
4283
    if self.vcpus:
4284
      args['vcpus'] = self.vcpus
4285
    if self.do_ip or self.do_bridge or self.mac:
4286
      if self.do_ip:
4287
        ip = self.ip
4288
      else:
4289
        ip = self.instance.nics[0].ip
4290
      if self.bridge:
4291
        bridge = self.bridge
4292
      else:
4293
        bridge = self.instance.nics[0].bridge
4294
      if self.mac:
4295
        mac = self.mac
4296
      else:
4297
        mac = self.instance.nics[0].mac
4298
      args['nics'] = [(ip, bridge, mac)]
4299
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4300
    nl = [self.sstore.GetMasterNode(),
4301
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4302
    return env, nl, nl
4303

    
4304
  def CheckPrereq(self):
4305
    """Check prerequisites.
4306

4307
    This only checks the instance list against the existing names.
4308

4309
    """
4310
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4311
    # a separate CheckArguments function, if we implement one, so the operation
4312
    # can be aborted without waiting for any lock, should it have an error...
4313
    self.mem = getattr(self.op, "mem", None)
4314
    self.vcpus = getattr(self.op, "vcpus", None)
4315
    self.ip = getattr(self.op, "ip", None)
4316
    self.mac = getattr(self.op, "mac", None)
4317
    self.bridge = getattr(self.op, "bridge", None)
4318
    self.kernel_path = getattr(self.op, "kernel_path", None)
4319
    self.initrd_path = getattr(self.op, "initrd_path", None)
4320
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4321
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4322
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
4323
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4324
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4325
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4326
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4327
    self.force = getattr(self.op, "force", None)
4328
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4329
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4330
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4331
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4332
    if all_parms.count(None) == len(all_parms):
4333
      raise errors.OpPrereqError("No changes submitted")
4334
    if self.mem is not None:
4335
      try:
4336
        self.mem = int(self.mem)
4337
      except ValueError, err:
4338
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4339
    if self.vcpus is not None:
4340
      try:
4341
        self.vcpus = int(self.vcpus)
4342
      except ValueError, err:
4343
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4344
    if self.ip is not None:
4345
      self.do_ip = True
4346
      if self.ip.lower() == "none":
4347
        self.ip = None
4348
      else:
4349
        if not utils.IsValidIP(self.ip):
4350
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4351
    else:
4352
      self.do_ip = False
4353
    self.do_bridge = (self.bridge is not None)
4354
    if self.mac is not None:
4355
      if self.cfg.IsMacInUse(self.mac):
4356
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4357
                                   self.mac)
4358
      if not utils.IsValidMac(self.mac):
4359
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4360

    
4361
    if self.kernel_path is not None:
4362
      self.do_kernel_path = True
4363
      if self.kernel_path == constants.VALUE_NONE:
4364
        raise errors.OpPrereqError("Can't set instance to no kernel")
4365

    
4366
      if self.kernel_path != constants.VALUE_DEFAULT:
4367
        if not os.path.isabs(self.kernel_path):
4368
          raise errors.OpPrereqError("The kernel path must be an absolute"
4369
                                    " filename")
4370
    else:
4371
      self.do_kernel_path = False
4372

    
4373
    if self.initrd_path is not None:
4374
      self.do_initrd_path = True
4375
      if self.initrd_path not in (constants.VALUE_NONE,
4376
                                  constants.VALUE_DEFAULT):
4377
        if not os.path.isabs(self.initrd_path):
4378
          raise errors.OpPrereqError("The initrd path must be an absolute"
4379
                                    " filename")
4380
    else:
4381
      self.do_initrd_path = False
4382

    
4383
    # boot order verification
4384
    if self.hvm_boot_order is not None:
4385
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4386
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4387
          raise errors.OpPrereqError("invalid boot order specified,"
4388
                                     " must be one or more of [acdn]"
4389
                                     " or 'default'")
4390

    
4391
    # hvm_cdrom_image_path verification
4392
    if self.op.hvm_cdrom_image_path is not None:
4393
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4394
              self.op.hvm_cdrom_image_path.lower() == "none"):
4395
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
4396
                                   " be an absolute path or None, not %s" %
4397
                                   self.op.hvm_cdrom_image_path)
4398
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4399
              self.op.hvm_cdrom_image_path.lower() == "none"):
4400
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
4401
                                   " regular file or a symlink pointing to"
4402
                                   " an existing regular file, not %s" %
4403
                                   self.op.hvm_cdrom_image_path)
4404

    
4405
    # vnc_bind_address verification
4406
    if self.op.vnc_bind_address is not None:
4407
      if not utils.IsValidIP(self.op.vnc_bind_address):
4408
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4409
                                   " like a valid IP address" %
4410
                                   self.op.vnc_bind_address)
4411

    
4412
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4413
    assert self.instance is not None, \
4414
      "Cannot retrieve locked instance %s" % self.op.instance_name
4415
    self.warn = []
4416
    if self.mem is not None and not self.force:
4417
      pnode = self.instance.primary_node
4418
      nodelist = [pnode]
4419
      nodelist.extend(instance.secondary_nodes)
4420
      instance_info = rpc.call_instance_info(pnode, instance.name)
4421
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4422

    
4423
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4424
        # Assume the primary node is unreachable and go ahead
4425
        self.warn.append("Can't get info from primary node %s" % pnode)
4426
      else:
4427
        if instance_info:
4428
          current_mem = instance_info['memory']
4429
        else:
4430
          # Assume instance not running
4431
          # (there is a slight race condition here, but it's not very probable,
4432
          # and we have no other way to check)
4433
          current_mem = 0
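        # The new memory size may exceed what the instance currently uses by
        # at most the free memory on the primary node; anything beyond that
        # would prevent the instance from (re)starting.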
4434
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4435
        if miss_mem > 0:
4436
          raise errors.OpPrereqError("This change will prevent the instance"
4437
                                     " from starting, due to %d MB of memory"
4438
                                     " missing on its primary node" % miss_mem)
4439

    
4440
      for node in instance.secondary_nodes:
4441
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4442
          self.warn.append("Can't get info from secondary node %s" % node)
4443
        elif self.mem > nodeinfo[node]['memory_free']:
4444
          self.warn.append("Not enough memory to failover instance to secondary"
4445
                           " node %s" % node)
4446

    
4447
    # Xen HVM device type checks
4448
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4449
      if self.op.hvm_nic_type is not None:
4450
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4451
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4452
                                     " HVM  hypervisor" % self.op.hvm_nic_type)
4453
      if self.op.hvm_disk_type is not None:
4454
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4455
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4456
                                     " HVM hypervisor" % self.op.hvm_disk_type)
4457

    
4458
    return
4459

    
4460
  def Exec(self, feedback_fn):
4461
    """Modifies an instance.
4462

4463
    All parameters take effect only at the next restart of the instance.
4464
    """
4465
    # Process here the warnings from CheckPrereq, as we don't have a
4466
    # feedback_fn there.
4467
    for warn in self.warn:
4468
      feedback_fn("WARNING: %s" % warn)
4469

    
4470
    result = []
4471
    instance = self.instance
4472
    if self.mem:
4473
      instance.memory = self.mem
4474
      result.append(("mem", self.mem))
4475
    if self.vcpus:
4476
      instance.vcpus = self.vcpus
4477
      result.append(("vcpus",  self.vcpus))
4478
    if self.do_ip:
4479
      instance.nics[0].ip = self.ip
4480
      result.append(("ip", self.ip))
4481
    if self.bridge:
4482
      instance.nics[0].bridge = self.bridge
4483
      result.append(("bridge", self.bridge))
4484
    if self.mac:
4485
      instance.nics[0].mac = self.mac
4486
      result.append(("mac", self.mac))
4487
    if self.do_kernel_path:
4488
      instance.kernel_path = self.kernel_path
4489
      result.append(("kernel_path", self.kernel_path))
4490
    if self.do_initrd_path:
4491
      instance.initrd_path = self.initrd_path
4492
      result.append(("initrd_path", self.initrd_path))
4493
    if self.hvm_boot_order:
4494
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4495
        instance.hvm_boot_order = None
4496
      else:
4497
        instance.hvm_boot_order = self.hvm_boot_order
4498
      result.append(("hvm_boot_order", self.hvm_boot_order))
4499
    if self.hvm_acpi is not None:
4500
      instance.hvm_acpi = self.hvm_acpi
4501
      result.append(("hvm_acpi", self.hvm_acpi))
4502
    if self.hvm_pae is not None:
4503
      instance.hvm_pae = self.hvm_pae
4504
      result.append(("hvm_pae", self.hvm_pae))
4505
    if self.hvm_nic_type is not None:
4506
      instance.hvm_nic_type = self.hvm_nic_type
4507
      result.append(("hvm_nic_type", self.hvm_nic_type))
4508
    if self.hvm_disk_type is not None:
4509
      instance.hvm_disk_type = self.hvm_disk_type
4510
      result.append(("hvm_disk_type", self.hvm_disk_type))
4511
    if self.hvm_cdrom_image_path:
4512
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4513
        instance.hvm_cdrom_image_path = None
4514
      else:
4515
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4516
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4517
    if self.vnc_bind_address:
4518
      instance.vnc_bind_address = self.vnc_bind_address
4519
      result.append(("vnc_bind_address", self.vnc_bind_address))
4520

    
4521
    self.cfg.Update(instance)
4522

    
4523
    return result
4524

    
4525

    
4526
class LUQueryExports(NoHooksLU):
4527
  """Query the exports list
4528

4529
  """
4530
  _OP_REQP = ['nodes']
4531
  REQ_BGL = False
4532

    
4533
  def ExpandNames(self):
4534
    self.needed_locks = {}
4535
    self.share_locks[locking.LEVEL_NODE] = 1
4536
    if not self.op.nodes:
4537
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4538
    else:
4539
      self.needed_locks[locking.LEVEL_NODE] = \
4540
        _GetWantedNodes(self, self.op.nodes)
4541

    
4542
  def CheckPrereq(self):
4543
    """Check prerequisites.
4544

4545
    """
4546
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4547

    
4548
  def Exec(self, feedback_fn):
4549
    """Compute the list of all the exported system images.
4550

4551
    Returns:
4552
      a dictionary with the structure node->(export-list)
4553
      where export-list is a list of the instances exported on
4554
      that node.
4555

4556
    """
4557
    return rpc.call_export_list(self.nodes)
4558

    
4559

    
4560
class LUExportInstance(LogicalUnit):
4561
  """Export an instance to an image in the cluster.
4562

4563
  """
4564
  HPATH = "instance-export"
4565
  HTYPE = constants.HTYPE_INSTANCE
4566
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4567
  REQ_BGL = False
4568

    
4569
  def ExpandNames(self):
4570
    self._ExpandAndLockInstance()
4571
    # FIXME: lock only instance primary and destination node
4572
    #
4573
    # Sad but true, for now we have to lock all nodes, as we don't know where
4574
    # the previous export might be, and in this LU we search for it and
4575
    # remove it from its current node. In the future we could fix this by:
4576
    #  - making a tasklet to search (share-lock all), then create the new one,
4577
    #    then one to remove, after
4578
    #  - removing the removal operation altogether
4579
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4580

    
4581
  def DeclareLocks(self, level):
4582
    """Last minute lock declaration."""
4583
    # All nodes are locked anyway, so nothing to do here.
4584

    
4585
  def BuildHooksEnv(self):
4586
    """Build hooks env.
4587

4588
    This will run on the master, primary node and target node.
4589

4590
    """
4591
    env = {
4592
      "EXPORT_NODE": self.op.target_node,
4593
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4594
      }
4595
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4596
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4597
          self.op.target_node]
4598
    return env, nl, nl
4599

    
4600
  def CheckPrereq(self):
4601
    """Check prerequisites.
4602

4603
    This checks that the instance and node names are valid.
4604

4605
    """
4606
    instance_name = self.op.instance_name
4607
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4608
    assert self.instance is not None, \
4609
          "Cannot retrieve locked instance %s" % self.op.instance_name
4610

    
4611
    self.dst_node = self.cfg.GetNodeInfo(
4612
      self.cfg.ExpandNodeName(self.op.target_node))
4613

    
4614
    assert self.dst_node is not None, \
4615
          "Cannot retrieve locked node %s" % self.op.target_node
4616

    
4617
    # instance disk type verification
4618
    for disk in self.instance.disks:
4619
      if disk.dev_type == constants.LD_FILE:
4620
        raise errors.OpPrereqError("Export not supported for instances with"
4621
                                   " file-based disks")
4622

    
4623
  def Exec(self, feedback_fn):
4624
    """Export an instance to an image in the cluster.
4625

4626
    """
4627
    instance = self.instance
4628
    dst_node = self.dst_node
4629
    src_node = instance.primary_node
4630
    if self.op.shutdown:
4631
      # shutdown the instance, but not the disks
4632
      if not rpc.call_instance_shutdown(src_node, instance):
4633
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4634
                                 (instance.name, src_node))
4635

    
4636
    vgname = self.cfg.GetVGName()
4637

    
4638
    snap_disks = []
4639

    
4640
    try:
4641
      for disk in instance.disks:
4642
        if disk.iv_name == "sda":
4643
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4644
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4645

    
4646
          if not new_dev_name:
4647
            logger.Error("could not snapshot block device %s on node %s" %
4648
                         (disk.logical_id[1], src_node))
4649
          else:
4650
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4651
                                   logical_id=(vgname, new_dev_name),
4652
                                   physical_id=(vgname, new_dev_name),
4653
                                   iv_name=disk.iv_name)
4654
            snap_disks.append(new_dev)
4655

    
4656
    finally:
4657
      if self.op.shutdown and instance.status == "up":
4658
        if not rpc.call_instance_start(src_node, instance, None):
4659
          _ShutdownInstanceDisks(instance, self.cfg)
4660
          raise errors.OpExecError("Could not start instance")
4661

    
4662
    # TODO: check for size
4663

    
4664
    for dev in snap_disks:
4665
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4666
        logger.Error("could not export block device %s from node %s to node %s"
4667
                     % (dev.logical_id[1], src_node, dst_node.name))
4668
      if not rpc.call_blockdev_remove(src_node, dev):
4669
        logger.Error("could not remove snapshot block device %s from node %s" %
4670
                     (dev.logical_id[1], src_node))
4671

    
4672
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4673
      logger.Error("could not finalize export for instance %s on node %s" %
4674
                   (instance.name, dst_node.name))
4675

    
4676
    nodelist = self.cfg.GetNodeList()
4677
    nodelist.remove(dst_node.name)
4678

    
4679
    # on one-node clusters nodelist will be empty after the removal
4680
    # if we proceed, the backup would be removed because OpQueryExports
4681
    # substitutes an empty list with the full cluster node list.
4682
    if nodelist:
4683
      exportlist = rpc.call_export_list(nodelist)
4684
      for node in exportlist:
4685
        if instance.name in exportlist[node]:
4686
          if not rpc.call_export_remove(node, instance.name):
4687
            logger.Error("could not remove older export for instance %s"
4688
                         " on node %s" % (instance.name, node))
4689

    
4690

    
4691
class LURemoveExport(NoHooksLU):
4692
  """Remove exports related to the named instance.
4693

4694
  """
4695
  _OP_REQP = ["instance_name"]
4696

    
4697
  def CheckPrereq(self):
4698
    """Check prerequisites.
4699
    """
4700
    pass
4701

    
4702
  def Exec(self, feedback_fn):
4703
    """Remove any export.
4704

4705
    """
4706
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4707
    # If the instance was not found we'll try with the name that was passed in.
4708
    # This will only work if it was an FQDN, though.
4709
    fqdn_warn = False
4710
    if not instance_name:
4711
      fqdn_warn = True
4712
      instance_name = self.op.instance_name
4713

    
4714
    exportlist = rpc.call_export_list(self.cfg.GetNodeList())
4715
    found = False
4716
    for node in exportlist:
4717
      if instance_name in exportlist[node]:
4718
        found = True
4719
        if not rpc.call_export_remove(node, instance_name):
4720
          logger.Error("could not remove export for instance %s"
4721
                       " on node %s" % (instance_name, node))
4722

    
4723
    if fqdn_warn and not found:
4724
      feedback_fn("Export not found. If trying to remove an export belonging"
4725
                  " to a deleted instance please use its Fully Qualified"
4726
                  " Domain Name.")
4727

    
4728

    
4729
class TagsLU(NoHooksLU):
4730
  """Generic tags LU.
4731

4732
  This is an abstract class which is the parent of all the other tags LUs.
4733

4734
  """
4735
  def CheckPrereq(self):
4736
    """Check prerequisites.
4737

4738
    """
4739
    if self.op.kind == constants.TAG_CLUSTER:
4740
      self.target = self.cfg.GetClusterInfo()
4741
    elif self.op.kind == constants.TAG_NODE:
4742
      name = self.cfg.ExpandNodeName(self.op.name)
4743
      if name is None:
4744
        raise errors.OpPrereqError("Invalid node name (%s)" %
4745
                                   (self.op.name,))
4746
      self.op.name = name
4747
      self.target = self.cfg.GetNodeInfo(name)
4748
    elif self.op.kind == constants.TAG_INSTANCE:
4749
      name = self.cfg.ExpandInstanceName(self.op.name)
4750
      if name is None:
4751
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4752
                                   (self.op.name,))
4753
      self.op.name = name
4754
      self.target = self.cfg.GetInstanceInfo(name)
4755
    else:
4756
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4757
                                 str(self.op.kind))
4758

    
4759

    
4760
class LUGetTags(TagsLU):
4761
  """Returns the tags of a given object.
4762

4763
  """
4764
  _OP_REQP = ["kind", "name"]
4765

    
4766
  def Exec(self, feedback_fn):
4767
    """Returns the tag list.
4768

4769
    """
4770
    return list(self.target.GetTags())
4771

    
4772

    
4773
class LUSearchTags(NoHooksLU):
4774
  """Searches the tags for a given pattern.
4775

4776
  """
4777
  _OP_REQP = ["pattern"]
4778

    
4779
  def CheckPrereq(self):
4780
    """Check prerequisites.
4781

4782
    This checks the pattern passed for validity by compiling it.
4783

4784
    """
4785
    try:
4786
      self.re = re.compile(self.op.pattern)
4787
    except re.error, err:
4788
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4789
                                 (self.op.pattern, err))
4790

    
4791
  def Exec(self, feedback_fn):
4792
    """Returns the tag list.
4793

4794
    """
4795
    cfg = self.cfg
4796
    tgts = [("/cluster", cfg.GetClusterInfo())]
4797
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4798
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4799
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4800
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4801
    results = []
4802
    for path, target in tgts:
4803
      for tag in target.GetTags():
4804
        if self.re.search(tag):
4805
          results.append((path, tag))
4806
    return results
4807

    
4808

    
4809
class LUAddTags(TagsLU):
4810
  """Sets a tag on a given object.
4811

4812
  """
4813
  _OP_REQP = ["kind", "name", "tags"]
4814

    
4815
  def CheckPrereq(self):
4816
    """Check prerequisites.
4817

4818
    This checks the type and length of the tag name and value.
4819

4820
    """
4821
    TagsLU.CheckPrereq(self)
4822
    for tag in self.op.tags:
4823
      objects.TaggableObject.ValidateTag(tag)
4824

    
4825
  def Exec(self, feedback_fn):
4826
    """Sets the tag.
4827

4828
    """
4829
    try:
4830
      for tag in self.op.tags:
4831
        self.target.AddTag(tag)
4832
    except errors.TagError, err:
4833
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4834
    try:
4835
      self.cfg.Update(self.target)
4836
    except errors.ConfigurationError:
4837
      raise errors.OpRetryError("There has been a modification to the"
4838
                                " config file and the operation has been"
4839
                                " aborted. Please retry.")
4840

    
4841

    
4842
class LUDelTags(TagsLU):
4843
  """Delete a list of tags from a given object.
4844

4845
  """
4846
  _OP_REQP = ["kind", "name", "tags"]
4847

    
4848
  def CheckPrereq(self):
4849
    """Check prerequisites.
4850

4851
    This checks that we have the given tag.
4852

4853
    """
4854
    TagsLU.CheckPrereq(self)
4855
    for tag in self.op.tags:
4856
      objects.TaggableObject.ValidateTag(tag)
4857
    del_tags = frozenset(self.op.tags)
4858
    cur_tags = self.target.GetTags()
4859
    if not del_tags <= cur_tags:
4860
      diff_tags = del_tags - cur_tags
4861
      diff_names = ["'%s'" % tag for tag in diff_tags]
4862
      diff_names.sort()
4863
      raise errors.OpPrereqError("Tag(s) %s not found" %
4864
                                 (",".join(diff_names)))
4865

    
4866
  def Exec(self, feedback_fn):
4867
    """Remove the tag from the object.
4868

4869
    """
4870
    for tag in self.op.tags:
4871
      self.target.RemoveTag(tag)
4872
    try:
4873
      self.cfg.Update(self.target)
4874
    except errors.ConfigurationError:
4875
      raise errors.OpRetryError("There has been a modification to the"
4876
                                " config file and the operation has been"
4877
                                " aborted. Please retry.")
4878

    
4879

    
4880
class LUTestDelay(NoHooksLU):
4881
  """Sleep for a specified amount of time.
4882

4883
  This LU sleeps on the master and/or nodes for a specified amount of
4884
  time.
4885

4886
  """
4887
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4888
  REQ_BGL = False
4889

    
4890
  def ExpandNames(self):
4891
    """Expand names and set required locks.
4892

4893
    This expands the node list, if any.
4894

4895
    """
4896
    self.needed_locks = {}
4897
    if self.op.on_nodes:
4898
      # _GetWantedNodes can be used here, but is not always appropriate to use
4899
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4900
      # more information.
4901
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4902
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4903

    
4904
  def CheckPrereq(self):
4905
    """Check prerequisites.
4906

4907
    """
4908

    
4909
  def Exec(self, feedback_fn):
4910
    """Do the actual sleep.
4911

4912
    """
4913
    if self.op.on_master:
4914
      if not utils.TestDelay(self.op.duration):
4915
        raise errors.OpExecError("Error during master delay test")
4916
    if self.op.on_nodes:
4917
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4918
      if not result:
4919
        raise errors.OpExecError("Complete failure from rpc call")
4920
      for node, node_result in result.items():
4921
        if not node_result:
4922
          raise errors.OpExecError("Failure during rpc call to node %s,"
4923
                                   " result: %s" % (node, node_result))
4924

    
4925

    
4926
class IAllocator(object):
4927
  """IAllocator framework.
4928

4929
  An IAllocator instance has the following sets of attributes:
4930
    - cfg/sstore that are needed to query the cluster
4931
    - input data (all members of _ALLO_KEYS or _RELO_KEYS are required)
4932
    - four buffer attributes (in_data/in_text, out_data/out_text) holding the
4933
      input (to the external script) in text and data structure format,
4934
      and the output from it, again in two formats
4935
    - the result variables from the script (success, info, nodes) for
4936
      easy usage
4937

4938
  """
4939
  _ALLO_KEYS = [
4940
    "mem_size", "disks", "disk_template",
4941
    "os", "tags", "nics", "vcpus",
4942
    ]
4943
  _RELO_KEYS = [
4944
    "relocate_from",
4945
    ]
4946

    
4947
  def __init__(self, cfg, sstore, mode, name, **kwargs):
4948
    self.cfg = cfg
4949
    self.sstore = sstore
4950
    # init buffer variables
4951
    self.in_text = self.out_text = self.in_data = self.out_data = None
4952
    # init all input fields so that pylint is happy
4953
    self.mode = mode
4954
    self.name = name
4955
    self.mem_size = self.disks = self.disk_template = None
4956
    self.os = self.tags = self.nics = self.vcpus = None
4957
    self.relocate_from = None
4958
    # computed fields
4959
    self.required_nodes = None
4960
    # init result fields
4961
    self.success = self.info = self.nodes = None
4962
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4963
      keyset = self._ALLO_KEYS
4964
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4965
      keyset = self._RELO_KEYS
4966
    else:
4967
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4968
                                   " IAllocator" % self.mode)
4969
    for key in kwargs:
4970
      if key not in keyset:
4971
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
4972
                                     " IAllocator" % key)
4973
      setattr(self, key, kwargs[key])
4974
    for key in keyset:
4975
      if key not in kwargs:
4976
        raise errors.ProgrammerError("Missing input parameter '%s' to"
4977
                                     " IAllocator" % key)
4978
    self._BuildInputData()
4979

    
4980
  def _ComputeClusterData(self):
4981
    """Compute the generic allocator input data.
4982

4983
    This is the data that is independent of the actual operation.
4984

4985
    """
4986
    cfg = self.cfg
4987
    # cluster data
4988
    data = {
4989
      "version": 1,
4990
      "cluster_name": self.sstore.GetClusterName(),
4991
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4992
      "hypervisor_type": self.sstore.GetHypervisorType(),
4993
      # we don't have job IDs
4994
      }
4995

    
4996
    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4997

    
4998
    # node data
4999
    node_results = {}
5000
    node_list = cfg.GetNodeList()
5001
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5002
    for nname in node_list:
5003
      ninfo = cfg.GetNodeInfo(nname)
5004
      if nname not in node_data or not isinstance(node_data[nname], dict):
5005
        raise errors.OpExecError("Can't get data for node %s" % nname)
5006
      remote_info = node_data[nname]
5007
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5008
                   'vg_size', 'vg_free', 'cpu_total']:
5009
        if attr not in remote_info:
5010
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5011
                                   (nname, attr))
5012
        try:
5013
          remote_info[attr] = int(remote_info[attr])
5014
        except ValueError, err:
5015
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5016
                                   " %s" % (nname, attr, str(err)))
5017
      # compute memory used by primary instances
5018
      i_p_mem = i_p_up_mem = 0
5019
      for iinfo in i_list:
5020
        if iinfo.primary_node == nname:
5021
          i_p_mem += iinfo.memory
5022
          if iinfo.status == "up":
5023
            i_p_up_mem += iinfo.memory
5024

    
5025
      # compute memory used by instances
5026
      pnr = {
5027
        "tags": list(ninfo.GetTags()),
5028
        "total_memory": remote_info['memory_total'],
5029
        "reserved_memory": remote_info['memory_dom0'],
5030
        "free_memory": remote_info['memory_free'],
5031
        "i_pri_memory": i_p_mem,
5032
        "i_pri_up_memory": i_p_up_mem,
5033
        "total_disk": remote_info['vg_size'],
5034
        "free_disk": remote_info['vg_free'],
5035
        "primary_ip": ninfo.primary_ip,
5036
        "secondary_ip": ninfo.secondary_ip,
5037
        "total_cpus": remote_info['cpu_total'],
5038
        }
5039
      node_results[nname] = pnr
5040
    data["nodes"] = node_results
5041

    
5042
    # instance data
5043
    instance_data = {}
5044
    for iinfo in i_list:
5045
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5046
                  for n in iinfo.nics]
5047
      pir = {
5048
        "tags": list(iinfo.GetTags()),
5049
        "should_run": iinfo.status == "up",
5050
        "vcpus": iinfo.vcpus,
5051
        "memory": iinfo.memory,
5052
        "os": iinfo.os,
5053
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5054
        "nics": nic_data,
5055
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5056
        "disk_template": iinfo.disk_template,
5057
        }
5058
      instance_data[iinfo.name] = pir
5059

    
5060
    data["instances"] = instance_data
5061

    
5062
    self.in_data = data
5063

    
5064
  def _AddNewInstance(self):
5065
    """Add new instance data to allocator structure.
5066

5067
    This in combination with _ComputeClusterData will create the
5068
    correct structure needed as input for the allocator.
5069

5070
    The checks for the completeness of the opcode must have already been
5071
    done.
5072

5073
    """
5074
    data = self.in_data
5075
    if len(self.disks) != 2:
5076
      raise errors.OpExecError("Only two-disk configurations supported")
5077

    
5078
    disk_space = _ComputeDiskSize(self.disk_template,
5079
                                  self.disks[0]["size"], self.disks[1]["size"])
5080

    
5081
    if self.disk_template in constants.DTS_NET_MIRROR:
5082
      self.required_nodes = 2
5083
    else:
5084
      self.required_nodes = 1
5085
    request = {
5086
      "type": "allocate",
5087
      "name": self.name,
5088
      "disk_template": self.disk_template,
5089
      "tags": self.tags,
5090
      "os": self.os,
5091
      "vcpus": self.vcpus,
5092
      "memory": self.mem_size,
5093
      "disks": self.disks,
5094
      "disk_space_total": disk_space,
5095
      "nics": self.nics,
5096
      "required_nodes": self.required_nodes,
5097
      }
5098
    data["request"] = request
5099

    
5100
  def _AddRelocateInstance(self):
5101
    """Add relocate instance data to allocator structure.
5102

5103
    This in combination with _IAllocatorGetClusterData will create the
5104
    correct structure needed as input for the allocator.
5105

5106
    The checks for the completeness of the opcode must have already been
5107
    done.
5108

5109
    """
5110
    instance = self.cfg.GetInstanceInfo(self.name)
5111
    if instance is None:
5112
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5113
                                   " IAllocator" % self.name)
5114

    
5115
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5116
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5117

    
5118
    if len(instance.secondary_nodes) != 1:
5119
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5120

    
5121
    self.required_nodes = 1
5122

    
5123
    disk_space = _ComputeDiskSize(instance.disk_template,
5124
                                  instance.disks[0].size,
5125
                                  instance.disks[1].size)
5126

    
5127
    request = {
5128
      "type": "relocate",
5129
      "name": self.name,
5130
      "disk_space_total": disk_space,
5131
      "required_nodes": self.required_nodes,
5132
      "relocate_from": self.relocate_from,
5133
      }
5134
    self.in_data["request"] = request
5135

    
5136
  def _BuildInputData(self):
5137
    """Build input data structures.
5138

5139
    """
5140
    self._ComputeClusterData()
5141

    
5142
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5143
      self._AddNewInstance()
5144
    else:
5145
      self._AddRelocateInstance()
5146

    
5147
    self.in_text = serializer.Dump(self.in_data)
5148

    
5149
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5150
    """Run an instance allocator and return the results.
5151

5152
    """
5153
    data = self.in_text
5154

    
5155
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5156

    
5157
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5158
      raise errors.OpExecError("Invalid result from master iallocator runner")
5159

    
5160
    rcode, stdout, stderr, fail = result
5161

    
5162
    if rcode == constants.IARUN_NOTFOUND:
5163
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5164
    elif rcode == constants.IARUN_FAILURE:
5165
      raise errors.OpExecError("Instance allocator call failed: %s,"
5166
                               " output: %s" % (fail, stdout+stderr))
5167
    self.out_text = stdout
5168
    if validate:
5169
      self._ValidateResult()
5170

    
5171
  def _ValidateResult(self):
5172
    """Process the allocator results.
5173

5174
    This will process and if successful save the result in
5175
    self.out_data and the other parameters.
5176

5177
    """
5178
    try:
5179
      rdict = serializer.Load(self.out_text)
5180
    except Exception, err:
5181
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5182

    
5183
    if not isinstance(rdict, dict):
5184
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5185

    
5186
    for key in "success", "info", "nodes":
5187
      if key not in rdict:
5188
        raise errors.OpExecError("Can't parse iallocator results:"
5189
                                 " missing key '%s'" % key)
5190
      setattr(self, key, rdict[key])
5191

    
5192
    if not isinstance(rdict["nodes"], list):
5193
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5194
                               " is not a list")
5195
    self.out_data = rdict
5196

    
5197

    
5198
class LUTestAllocator(NoHooksLU):
5199
  """Run allocator tests.
5200

5201
  This LU runs the allocator tests
5202

5203
  """
5204
  _OP_REQP = ["direction", "mode", "name"]
5205

    
5206
  def CheckPrereq(self):
5207
    """Check prerequisites.
5208

5209
    This checks the opcode parameters depending on the direction and mode
    of the test.
5210

5211
    """
5212
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5213
      for attr in ["name", "mem_size", "disks", "disk_template",
5214
                   "os", "tags", "nics", "vcpus"]:
5215
        if not hasattr(self.op, attr):
5216
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5217
                                     attr)
5218
      iname = self.cfg.ExpandInstanceName(self.op.name)
5219
      if iname is not None:
5220
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5221
                                   iname)
5222
      if not isinstance(self.op.nics, list):
5223
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5224
      for row in self.op.nics:
5225
        if (not isinstance(row, dict) or
5226
            "mac" not in row or
5227
            "ip" not in row or
5228
            "bridge" not in row):
5229
          raise errors.OpPrereqError("Invalid contents of the"
5230
                                     " 'nics' parameter")
5231
      if not isinstance(self.op.disks, list):
5232
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5233
      if len(self.op.disks) != 2:
5234
        raise errors.OpPrereqError("Only two-disk configurations supported")
5235
      for row in self.op.disks:
5236
        if (not isinstance(row, dict) or
5237
            "size" not in row or
5238
            not isinstance(row["size"], int) or
5239
            "mode" not in row or
5240
            row["mode"] not in ['r', 'w']):
5241
          raise errors.OpPrereqError("Invalid contents of the"
5242
                                     " 'disks' parameter")
5243
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5244
      if not hasattr(self.op, "name"):
5245
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5246
      fname = self.cfg.ExpandInstanceName(self.op.name)
5247
      if fname is None:
5248
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5249
                                   self.op.name)
5250
      self.op.name = fname
5251
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5252
    else:
5253
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5254
                                 self.op.mode)
5255

    
5256
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5257
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5258
        raise errors.OpPrereqError("Missing allocator name")
5259
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5260
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5261
                                 self.op.direction)
5262

    
5263
  def Exec(self, feedback_fn):
5264
    """Run the allocator test.
5265

5266
    """
5267
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5268
      ial = IAllocator(self.cfg, self.sstore,
5269
                       mode=self.op.mode,
5270
                       name=self.op.name,
5271
                       mem_size=self.op.mem_size,
5272
                       disks=self.op.disks,
5273
                       disk_template=self.op.disk_template,
5274
                       os=self.op.os,
5275
                       tags=self.op.tags,
5276
                       nics=self.op.nics,
5277
                       vcpus=self.op.vcpus,
5278
                       )
5279
    else:
5280
      ial = IAllocator(self.cfg, self.sstore,
5281
                       mode=self.op.mode,
5282
                       name=self.op.name,
5283
                       relocate_from=list(self.relocate_from),
5284
                       )
5285

    
5286
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5287
      result = ial.in_text
5288
    else:
5289
      ial.Run(self.op.allocator, validate=False)
5290
      result = ial.out_text
5291
    return result