Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 7baf741d

History | View | Annotate | Download (184.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overriden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    # Dicts used to declare locking needs to mcpu
84
    self.needed_locks = None
85
    self.acquired_locks = {}
86
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
87
    self.add_locks = {}
88
    self.remove_locks = {}
89
    # Used to force good behavior when calling helper functions
90
    self.recalculate_locks = {}
91
    self.__ssh = None
92

    
93
    for attr_name in self._OP_REQP:
94
      attr_val = getattr(op, attr_name, None)
95
      if attr_val is None:
96
        raise errors.OpPrereqError("Required parameter '%s' missing" %
97
                                   attr_name)
98

    
99
    if not self.cfg.IsCluster():
100
      raise errors.OpPrereqError("Cluster not initialized yet,"
101
                                 " use 'gnt-cluster init' first.")
102
    if self.REQ_MASTER:
103
      master = sstore.GetMasterNode()
104
      if master != utils.HostInfo().name:
105
        raise errors.OpPrereqError("Commands must be run on the master"
106
                                   " node %s" % master)
107

    
108
  def __GetSSH(self):
109
    """Returns the SshRunner object
110

111
    """
112
    if not self.__ssh:
113
      self.__ssh = ssh.SshRunner(self.sstore)
114
    return self.__ssh
115

    
116
  ssh = property(fget=__GetSSH)
117

    
118
  def ExpandNames(self):
119
    """Expand names for this LU.
120

121
    This method is called before starting to execute the opcode, and it should
122
    update all the parameters of the opcode to their canonical form (e.g. a
123
    short node name must be fully expanded after this method has successfully
124
    completed). This way locking, hooks, logging, ecc. can work correctly.
125

126
    LUs which implement this method must also populate the self.needed_locks
127
    member, as a dict with lock levels as keys, and a list of needed lock names
128
    as values. Rules:
129
      - Use an empty dict if you don't need any lock
130
      - If you don't need any lock at a particular level omit that level
131
      - Don't put anything for the BGL level
132
      - If you want all locks at a level use locking.ALL_SET as a value
133

134
    If you need to share locks (rather than acquire them exclusively) at one
135
    level you can modify self.share_locks, setting a true value (usually 1) for
136
    that level. By default locks are not shared.
137

138
    Examples:
139
    # Acquire all nodes and one instance
140
    self.needed_locks = {
141
      locking.LEVEL_NODE: locking.ALL_SET,
142
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
143
    }
144
    # Acquire just two nodes
145
    self.needed_locks = {
146
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
147
    }
148
    # Acquire no locks
149
    self.needed_locks = {} # No, you can't leave it to the default value None
150

151
    """
152
    # The implementation of this method is mandatory only if the new LU is
153
    # concurrent, so that old LUs don't need to be changed all at the same
154
    # time.
155
    if self.REQ_BGL:
156
      self.needed_locks = {} # Exclusive LUs don't need locks.
157
    else:
158
      raise NotImplementedError
159

    
160
  def DeclareLocks(self, level):
161
    """Declare LU locking needs for a level
162

163
    While most LUs can just declare their locking needs at ExpandNames time,
164
    sometimes there's the need to calculate some locks after having acquired
165
    the ones before. This function is called just before acquiring locks at a
166
    particular level, but after acquiring the ones at lower levels, and permits
167
    such calculations. It can be used to modify self.needed_locks, and by
168
    default it does nothing.
169

170
    This function is only called if you have something already set in
171
    self.needed_locks for the level.
172

173
    @param level: Locking level which is going to be locked
174
    @type level: member of ganeti.locking.LEVELS
175

176
    """
177

    
178
  def CheckPrereq(self):
179
    """Check prerequisites for this LU.
180

181
    This method should check that the prerequisites for the execution
182
    of this LU are fulfilled. It can do internode communication, but
183
    it should be idempotent - no cluster or system changes are
184
    allowed.
185

186
    The method should raise errors.OpPrereqError in case something is
187
    not fulfilled. Its return value is ignored.
188

189
    This method should also update all the parameters of the opcode to
190
    their canonical form if it hasn't been done by ExpandNames before.
191

192
    """
193
    raise NotImplementedError
194

    
195
  def Exec(self, feedback_fn):
196
    """Execute the LU.
197

198
    This method should implement the actual work. It should raise
199
    errors.OpExecError for failures that are somewhat dealt with in
200
    code, or expected.
201

202
    """
203
    raise NotImplementedError
204

    
205
  def BuildHooksEnv(self):
206
    """Build hooks environment for this LU.
207

208
    This method should return a three-node tuple consisting of: a dict
209
    containing the environment that will be used for running the
210
    specific hook for this LU, a list of node names on which the hook
211
    should run before the execution, and a list of node names on which
212
    the hook should run after the execution.
213

214
    The keys of the dict must not have 'GANETI_' prefixed as this will
215
    be handled in the hooks runner. Also note additional keys will be
216
    added by the hooks runner. If the LU doesn't define any
217
    environment, an empty dict (and not None) should be returned.
218

219
    No nodes should be returned as an empty list (and not None).
220

221
    Note that if the HPATH for a LU class is None, this function will
222
    not be called.
223

224
    """
225
    raise NotImplementedError
226

    
227
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
228
    """Notify the LU about the results of its hooks.
229

230
    This method is called every time a hooks phase is executed, and notifies
231
    the Logical Unit about the hooks' result. The LU can then use it to alter
232
    its result based on the hooks.  By default the method does nothing and the
233
    previous result is passed back unchanged but any LU can define it if it
234
    wants to use the local cluster hook-scripts somehow.
235

236
    Args:
237
      phase: the hooks phase that has just been run
238
      hooks_results: the results of the multi-node hooks rpc call
239
      feedback_fn: function to send feedback back to the caller
240
      lu_result: the previous result this LU had, or None in the PRE phase.
241

242
    """
243
    return lu_result
244

    
245
  def _ExpandAndLockInstance(self):
246
    """Helper function to expand and lock an instance.
247

248
    Many LUs that work on an instance take its name in self.op.instance_name
249
    and need to expand it and then declare the expanded name for locking. This
250
    function does it, and then updates self.op.instance_name to the expanded
251
    name. It also initializes needed_locks as a dict, if this hasn't been done
252
    before.
253

254
    """
255
    if self.needed_locks is None:
256
      self.needed_locks = {}
257
    else:
258
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
259
        "_ExpandAndLockInstance called with instance-level locks set"
260
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
261
    if expanded_name is None:
262
      raise errors.OpPrereqError("Instance '%s' not known" %
263
                                  self.op.instance_name)
264
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
265
    self.op.instance_name = expanded_name
266

    
267
  def _LockInstancesNodes(self, primary_only=False):
268
    """Helper function to declare instances' nodes for locking.
269

270
    This function should be called after locking one or more instances to lock
271
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
272
    with all primary or secondary nodes for instances already locked and
273
    present in self.needed_locks[locking.LEVEL_INSTANCE].
274

275
    It should be called from DeclareLocks, and for safety only works if
276
    self.recalculate_locks[locking.LEVEL_NODE] is set.
277

278
    In the future it may grow parameters to just lock some instance's nodes, or
279
    to just lock primaries or secondary nodes, if needed.
280

281
    If should be called in DeclareLocks in a way similar to:
282

283
    if level == locking.LEVEL_NODE:
284
      self._LockInstancesNodes()
285

286
    @type primary_only: boolean
287
    @param primary_only: only lock primary nodes of locked instances
288

289
    """
290
    assert locking.LEVEL_NODE in self.recalculate_locks, \
291
      "_LockInstancesNodes helper function called with no nodes to recalculate"
292

    
293
    # TODO: check if we're really been called with the instance locks held
294

    
295
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
296
    # future we might want to have different behaviors depending on the value
297
    # of self.recalculate_locks[locking.LEVEL_NODE]
298
    wanted_nodes = []
299
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
300
      instance = self.context.cfg.GetInstanceInfo(instance_name)
301
      wanted_nodes.append(instance.primary_node)
302
      if not primary_only:
303
        wanted_nodes.extend(instance.secondary_nodes)
304

    
305
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
306
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
307
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
308
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
309

    
310
    del self.recalculate_locks[locking.LEVEL_NODE]
311

    
312

    
313
class NoHooksLU(LogicalUnit):
314
  """Simple LU which runs no hooks.
315

316
  This LU is intended as a parent for other LogicalUnits which will
317
  run no hooks, in order to reduce duplicate code.
318

319
  """
320
  HPATH = None
321
  HTYPE = None
322

    
323

    
324
def _GetWantedNodes(lu, nodes):
325
  """Returns list of checked and expanded node names.
326

327
  Args:
328
    nodes: List of nodes (strings) or None for all
329

330
  """
331
  if not isinstance(nodes, list):
332
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
333

    
334
  if not nodes:
335
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
336
      " non-empty list of nodes whose name is to be expanded.")
337

    
338
  wanted = []
339
  for name in nodes:
340
    node = lu.cfg.ExpandNodeName(name)
341
    if node is None:
342
      raise errors.OpPrereqError("No such node name '%s'" % name)
343
    wanted.append(node)
344

    
345
  return utils.NiceSort(wanted)
346

    
347

    
348
def _GetWantedInstances(lu, instances):
349
  """Returns list of checked and expanded instance names.
350

351
  Args:
352
    instances: List of instances (strings) or None for all
353

354
  """
355
  if not isinstance(instances, list):
356
    raise errors.OpPrereqError("Invalid argument type 'instances'")
357

    
358
  if instances:
359
    wanted = []
360

    
361
    for name in instances:
362
      instance = lu.cfg.ExpandInstanceName(name)
363
      if instance is None:
364
        raise errors.OpPrereqError("No such instance name '%s'" % name)
365
      wanted.append(instance)
366

    
367
  else:
368
    wanted = lu.cfg.GetInstanceList()
369
  return utils.NiceSort(wanted)
370

    
371

    
372
def _CheckOutputFields(static, dynamic, selected):
373
  """Checks whether all selected fields are valid.
374

375
  Args:
376
    static: Static fields
377
    dynamic: Dynamic fields
378

379
  """
380
  static_fields = frozenset(static)
381
  dynamic_fields = frozenset(dynamic)
382

    
383
  all_fields = static_fields | dynamic_fields
384

    
385
  if not all_fields.issuperset(selected):
386
    raise errors.OpPrereqError("Unknown output fields selected: %s"
387
                               % ",".join(frozenset(selected).
388
                                          difference(all_fields)))
389

    
390

    
391
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
392
                          memory, vcpus, nics):
393
  """Builds instance related env variables for hooks from single variables.
394

395
  Args:
396
    secondary_nodes: List of secondary nodes as strings
397
  """
398
  env = {
399
    "OP_TARGET": name,
400
    "INSTANCE_NAME": name,
401
    "INSTANCE_PRIMARY": primary_node,
402
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
403
    "INSTANCE_OS_TYPE": os_type,
404
    "INSTANCE_STATUS": status,
405
    "INSTANCE_MEMORY": memory,
406
    "INSTANCE_VCPUS": vcpus,
407
  }
408

    
409
  if nics:
410
    nic_count = len(nics)
411
    for idx, (ip, bridge, mac) in enumerate(nics):
412
      if ip is None:
413
        ip = ""
414
      env["INSTANCE_NIC%d_IP" % idx] = ip
415
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
416
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
417
  else:
418
    nic_count = 0
419

    
420
  env["INSTANCE_NIC_COUNT"] = nic_count
421

    
422
  return env
423

    
424

    
425
def _BuildInstanceHookEnvByObject(instance, override=None):
426
  """Builds instance related env variables for hooks from an object.
427

428
  Args:
429
    instance: objects.Instance object of instance
430
    override: dict of values to override
431
  """
432
  args = {
433
    'name': instance.name,
434
    'primary_node': instance.primary_node,
435
    'secondary_nodes': instance.secondary_nodes,
436
    'os_type': instance.os,
437
    'status': instance.os,
438
    'memory': instance.memory,
439
    'vcpus': instance.vcpus,
440
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
441
  }
442
  if override:
443
    args.update(override)
444
  return _BuildInstanceHookEnv(**args)
445

    
446

    
447
def _CheckInstanceBridgesExist(instance):
448
  """Check that the brigdes needed by an instance exist.
449

450
  """
451
  # check bridges existance
452
  brlist = [nic.bridge for nic in instance.nics]
453
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
454
    raise errors.OpPrereqError("one or more target bridges %s does not"
455
                               " exist on destination node '%s'" %
456
                               (brlist, instance.primary_node))
457

    
458

    
459
class LUDestroyCluster(NoHooksLU):
460
  """Logical unit for destroying the cluster.
461

462
  """
463
  _OP_REQP = []
464

    
465
  def CheckPrereq(self):
466
    """Check prerequisites.
467

468
    This checks whether the cluster is empty.
469

470
    Any errors are signalled by raising errors.OpPrereqError.
471

472
    """
473
    master = self.sstore.GetMasterNode()
474

    
475
    nodelist = self.cfg.GetNodeList()
476
    if len(nodelist) != 1 or nodelist[0] != master:
477
      raise errors.OpPrereqError("There are still %d node(s) in"
478
                                 " this cluster." % (len(nodelist) - 1))
479
    instancelist = self.cfg.GetInstanceList()
480
    if instancelist:
481
      raise errors.OpPrereqError("There are still %d instance(s) in"
482
                                 " this cluster." % len(instancelist))
483

    
484
  def Exec(self, feedback_fn):
485
    """Destroys the cluster.
486

487
    """
488
    master = self.sstore.GetMasterNode()
489
    if not rpc.call_node_stop_master(master, False):
490
      raise errors.OpExecError("Could not disable the master role")
491
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
492
    utils.CreateBackup(priv_key)
493
    utils.CreateBackup(pub_key)
494
    return master
495

    
496

    
497
class LUVerifyCluster(LogicalUnit):
498
  """Verifies the cluster status.
499

500
  """
501
  HPATH = "cluster-verify"
502
  HTYPE = constants.HTYPE_CLUSTER
503
  _OP_REQP = ["skip_checks"]
504
  REQ_BGL = False
505

    
506
  def ExpandNames(self):
507
    self.needed_locks = {
508
      locking.LEVEL_NODE: locking.ALL_SET,
509
      locking.LEVEL_INSTANCE: locking.ALL_SET,
510
    }
511
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512

    
513
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514
                  remote_version, feedback_fn):
515
    """Run multiple tests against a node.
516

517
    Test list:
518
      - compares ganeti version
519
      - checks vg existance and size > 20G
520
      - checks config file checksum
521
      - checks ssh to other nodes
522

523
    Args:
524
      node: name of the node to check
525
      file_list: required list of files
526
      local_cksum: dictionary of local files and their checksums
527

528
    """
529
    # compares ganeti version
530
    local_version = constants.PROTOCOL_VERSION
531
    if not remote_version:
532
      feedback_fn("  - ERROR: connection to %s failed" % (node))
533
      return True
534

    
535
    if local_version != remote_version:
536
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537
                      (local_version, node, remote_version))
538
      return True
539

    
540
    # checks vg existance and size > 20G
541

    
542
    bad = False
543
    if not vglist:
544
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545
                      (node,))
546
      bad = True
547
    else:
548
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549
                                            constants.MIN_VG_SIZE)
550
      if vgstatus:
551
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552
        bad = True
553

    
554
    # checks config file checksum
555
    # checks ssh to any
556

    
557
    if 'filelist' not in node_result:
558
      bad = True
559
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
560
    else:
561
      remote_cksum = node_result['filelist']
562
      for file_name in file_list:
563
        if file_name not in remote_cksum:
564
          bad = True
565
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
566
        elif remote_cksum[file_name] != local_cksum[file_name]:
567
          bad = True
568
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
569

    
570
    if 'nodelist' not in node_result:
571
      bad = True
572
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
573
    else:
574
      if node_result['nodelist']:
575
        bad = True
576
        for node in node_result['nodelist']:
577
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
578
                          (node, node_result['nodelist'][node]))
579
    if 'node-net-test' not in node_result:
580
      bad = True
581
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
582
    else:
583
      if node_result['node-net-test']:
584
        bad = True
585
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
586
        for node in nlist:
587
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
588
                          (node, node_result['node-net-test'][node]))
589

    
590
    hyp_result = node_result.get('hypervisor', None)
591
    if hyp_result is not None:
592
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
593
    return bad
594

    
595
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
596
                      node_instance, feedback_fn):
597
    """Verify an instance.
598

599
    This function checks to see if the required block devices are
600
    available on the instance's node.
601

602
    """
603
    bad = False
604

    
605
    node_current = instanceconfig.primary_node
606

    
607
    node_vol_should = {}
608
    instanceconfig.MapLVsByNode(node_vol_should)
609

    
610
    for node in node_vol_should:
611
      for volume in node_vol_should[node]:
612
        if node not in node_vol_is or volume not in node_vol_is[node]:
613
          feedback_fn("  - ERROR: volume %s missing on node %s" %
614
                          (volume, node))
615
          bad = True
616

    
617
    if not instanceconfig.status == 'down':
618
      if (node_current not in node_instance or
619
          not instance in node_instance[node_current]):
620
        feedback_fn("  - ERROR: instance %s not running on node %s" %
621
                        (instance, node_current))
622
        bad = True
623

    
624
    for node in node_instance:
625
      if (not node == node_current):
626
        if instance in node_instance[node]:
627
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
628
                          (instance, node))
629
          bad = True
630

    
631
    return bad
632

    
633
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
634
    """Verify if there are any unknown volumes in the cluster.
635

636
    The .os, .swap and backup volumes are ignored. All other volumes are
637
    reported as unknown.
638

639
    """
640
    bad = False
641

    
642
    for node in node_vol_is:
643
      for volume in node_vol_is[node]:
644
        if node not in node_vol_should or volume not in node_vol_should[node]:
645
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
646
                      (volume, node))
647
          bad = True
648
    return bad
649

    
650
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
651
    """Verify the list of running instances.
652

653
    This checks what instances are running but unknown to the cluster.
654

655
    """
656
    bad = False
657
    for node in node_instance:
658
      for runninginstance in node_instance[node]:
659
        if runninginstance not in instancelist:
660
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
661
                          (runninginstance, node))
662
          bad = True
663
    return bad
664

    
665
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
666
    """Verify N+1 Memory Resilience.
667

668
    Check that if one single node dies we can still start all the instances it
669
    was primary for.
670

671
    """
672
    bad = False
673

    
674
    for node, nodeinfo in node_info.iteritems():
675
      # This code checks that every node which is now listed as secondary has
676
      # enough memory to host all instances it is supposed to should a single
677
      # other node in the cluster fail.
678
      # FIXME: not ready for failover to an arbitrary node
679
      # FIXME: does not support file-backed instances
680
      # WARNING: we currently take into account down instances as well as up
681
      # ones, considering that even if they're down someone might want to start
682
      # them even in the event of a node failure.
683
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
684
        needed_mem = 0
685
        for instance in instances:
686
          needed_mem += instance_cfg[instance].memory
687
        if nodeinfo['mfree'] < needed_mem:
688
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
689
                      " failovers should node %s fail" % (node, prinode))
690
          bad = True
691
    return bad
692

    
693
  def CheckPrereq(self):
694
    """Check prerequisites.
695

696
    Transform the list of checks we're going to skip into a set and check that
697
    all its members are valid.
698

699
    """
700
    self.skip_set = frozenset(self.op.skip_checks)
701
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
702
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
703

    
704
  def BuildHooksEnv(self):
705
    """Build hooks env.
706

707
    Cluster-Verify hooks just rone in the post phase and their failure makes
708
    the output be logged in the verify output and the verification to fail.
709

710
    """
711
    all_nodes = self.cfg.GetNodeList()
712
    # TODO: populate the environment with useful information for verify hooks
713
    env = {}
714
    return env, [], all_nodes
715

    
716
  def Exec(self, feedback_fn):
717
    """Verify integrity of cluster, performing various test on nodes.
718

719
    """
720
    bad = False
721
    feedback_fn("* Verifying global settings")
722
    for msg in self.cfg.VerifyConfig():
723
      feedback_fn("  - ERROR: %s" % msg)
724

    
725
    vg_name = self.cfg.GetVGName()
726
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
727
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
728
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
729
    i_non_redundant = [] # Non redundant instances
730
    node_volume = {}
731
    node_instance = {}
732
    node_info = {}
733
    instance_cfg = {}
734

    
735
    # FIXME: verify OS list
736
    # do local checksums
737
    file_names = list(self.sstore.GetFileList())
738
    file_names.append(constants.SSL_CERT_FILE)
739
    file_names.append(constants.CLUSTER_CONF_FILE)
740
    local_checksums = utils.FingerprintFiles(file_names)
741

    
742
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
743
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
744
    all_instanceinfo = rpc.call_instance_list(nodelist)
745
    all_vglist = rpc.call_vg_list(nodelist)
746
    node_verify_param = {
747
      'filelist': file_names,
748
      'nodelist': nodelist,
749
      'hypervisor': None,
750
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
751
                        for node in nodeinfo]
752
      }
753
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
754
    all_rversion = rpc.call_version(nodelist)
755
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
756

    
757
    for node in nodelist:
758
      feedback_fn("* Verifying node %s" % node)
759
      result = self._VerifyNode(node, file_names, local_checksums,
760
                                all_vglist[node], all_nvinfo[node],
761
                                all_rversion[node], feedback_fn)
762
      bad = bad or result
763

    
764
      # node_volume
765
      volumeinfo = all_volumeinfo[node]
766

    
767
      if isinstance(volumeinfo, basestring):
768
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
769
                    (node, volumeinfo[-400:].encode('string_escape')))
770
        bad = True
771
        node_volume[node] = {}
772
      elif not isinstance(volumeinfo, dict):
773
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
774
        bad = True
775
        continue
776
      else:
777
        node_volume[node] = volumeinfo
778

    
779
      # node_instance
780
      nodeinstance = all_instanceinfo[node]
781
      if type(nodeinstance) != list:
782
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
783
        bad = True
784
        continue
785

    
786
      node_instance[node] = nodeinstance
787

    
788
      # node_info
789
      nodeinfo = all_ninfo[node]
790
      if not isinstance(nodeinfo, dict):
791
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
792
        bad = True
793
        continue
794

    
795
      try:
796
        node_info[node] = {
797
          "mfree": int(nodeinfo['memory_free']),
798
          "dfree": int(nodeinfo['vg_free']),
799
          "pinst": [],
800
          "sinst": [],
801
          # dictionary holding all instances this node is secondary for,
802
          # grouped by their primary node. Each key is a cluster node, and each
803
          # value is a list of instances which have the key as primary and the
804
          # current node as secondary.  this is handy to calculate N+1 memory
805
          # availability if you can only failover from a primary to its
806
          # secondary.
807
          "sinst-by-pnode": {},
808
        }
809
      except ValueError:
810
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
811
        bad = True
812
        continue
813

    
814
    node_vol_should = {}
815

    
816
    for instance in instancelist:
817
      feedback_fn("* Verifying instance %s" % instance)
818
      inst_config = self.cfg.GetInstanceInfo(instance)
819
      result =  self._VerifyInstance(instance, inst_config, node_volume,
820
                                     node_instance, feedback_fn)
821
      bad = bad or result
822

    
823
      inst_config.MapLVsByNode(node_vol_should)
824

    
825
      instance_cfg[instance] = inst_config
826

    
827
      pnode = inst_config.primary_node
828
      if pnode in node_info:
829
        node_info[pnode]['pinst'].append(instance)
830
      else:
831
        feedback_fn("  - ERROR: instance %s, connection to primary node"
832
                    " %s failed" % (instance, pnode))
833
        bad = True
834

    
835
      # If the instance is non-redundant we cannot survive losing its primary
836
      # node, so we are not N+1 compliant. On the other hand we have no disk
837
      # templates with more than one secondary so that situation is not well
838
      # supported either.
839
      # FIXME: does not support file-backed instances
840
      if len(inst_config.secondary_nodes) == 0:
841
        i_non_redundant.append(instance)
842
      elif len(inst_config.secondary_nodes) > 1:
843
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
844
                    % instance)
845

    
846
      for snode in inst_config.secondary_nodes:
847
        if snode in node_info:
848
          node_info[snode]['sinst'].append(instance)
849
          if pnode not in node_info[snode]['sinst-by-pnode']:
850
            node_info[snode]['sinst-by-pnode'][pnode] = []
851
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
852
        else:
853
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
854
                      " %s failed" % (instance, snode))
855

    
856
    feedback_fn("* Verifying orphan volumes")
857
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
858
                                       feedback_fn)
859
    bad = bad or result
860

    
861
    feedback_fn("* Verifying remaining instances")
862
    result = self._VerifyOrphanInstances(instancelist, node_instance,
863
                                         feedback_fn)
864
    bad = bad or result
865

    
866
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
867
      feedback_fn("* Verifying N+1 Memory redundancy")
868
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
869
      bad = bad or result
870

    
871
    feedback_fn("* Other Notes")
872
    if i_non_redundant:
873
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
874
                  % len(i_non_redundant))
875

    
876
    return not bad
877

    
878
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
879
    """Analize the post-hooks' result, handle it, and send some
880
    nicely-formatted feedback back to the user.
881

882
    Args:
883
      phase: the hooks phase that has just been run
884
      hooks_results: the results of the multi-node hooks rpc call
885
      feedback_fn: function to send feedback back to the caller
886
      lu_result: previous Exec result
887

888
    """
889
    # We only really run POST phase hooks, and are only interested in
890
    # their results
891
    if phase == constants.HOOKS_PHASE_POST:
892
      # Used to change hooks' output to proper indentation
893
      indent_re = re.compile('^', re.M)
894
      feedback_fn("* Hooks Results")
895
      if not hooks_results:
896
        feedback_fn("  - ERROR: general communication failure")
897
        lu_result = 1
898
      else:
899
        for node_name in hooks_results:
900
          show_node_header = True
901
          res = hooks_results[node_name]
902
          if res is False or not isinstance(res, list):
903
            feedback_fn("    Communication failure")
904
            lu_result = 1
905
            continue
906
          for script, hkr, output in res:
907
            if hkr == constants.HKR_FAIL:
908
              # The node header is only shown once, if there are
909
              # failing hooks on that node
910
              if show_node_header:
911
                feedback_fn("  Node %s:" % node_name)
912
                show_node_header = False
913
              feedback_fn("    ERROR: Script %s failed, output:" % script)
914
              output = indent_re.sub('      ', output)
915
              feedback_fn("%s" % output)
916
              lu_result = 1
917

    
918
      return lu_result
919

    
920

    
921
class LUVerifyDisks(NoHooksLU):
922
  """Verifies the cluster disks status.
923

924
  """
925
  _OP_REQP = []
926
  REQ_BGL = False
927

    
928
  def ExpandNames(self):
929
    self.needed_locks = {
930
      locking.LEVEL_NODE: locking.ALL_SET,
931
      locking.LEVEL_INSTANCE: locking.ALL_SET,
932
    }
933
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
934

    
935
  def CheckPrereq(self):
936
    """Check prerequisites.
937

938
    This has no prerequisites.
939

940
    """
941
    pass
942

    
943
  def Exec(self, feedback_fn):
944
    """Verify integrity of cluster disks.
945

946
    """
947
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
948

    
949
    vg_name = self.cfg.GetVGName()
950
    nodes = utils.NiceSort(self.cfg.GetNodeList())
951
    instances = [self.cfg.GetInstanceInfo(name)
952
                 for name in self.cfg.GetInstanceList()]
953

    
954
    nv_dict = {}
955
    for inst in instances:
956
      inst_lvs = {}
957
      if (inst.status != "up" or
958
          inst.disk_template not in constants.DTS_NET_MIRROR):
959
        continue
960
      inst.MapLVsByNode(inst_lvs)
961
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
962
      for node, vol_list in inst_lvs.iteritems():
963
        for vol in vol_list:
964
          nv_dict[(node, vol)] = inst
965

    
966
    if not nv_dict:
967
      return result
968

    
969
    node_lvs = rpc.call_volume_list(nodes, vg_name)
970

    
971
    to_act = set()
972
    for node in nodes:
973
      # node_volume
974
      lvs = node_lvs[node]
975

    
976
      if isinstance(lvs, basestring):
977
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
978
        res_nlvm[node] = lvs
979
      elif not isinstance(lvs, dict):
980
        logger.Info("connection to node %s failed or invalid data returned" %
981
                    (node,))
982
        res_nodes.append(node)
983
        continue
984

    
985
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
986
        inst = nv_dict.pop((node, lv_name), None)
987
        if (not lv_online and inst is not None
988
            and inst.name not in res_instances):
989
          res_instances.append(inst.name)
990

    
991
    # any leftover items in nv_dict are missing LVs, let's arrange the
992
    # data better
993
    for key, inst in nv_dict.iteritems():
994
      if inst.name not in res_missing:
995
        res_missing[inst.name] = []
996
      res_missing[inst.name].append(key)
997

    
998
    return result
999

    
1000

    
1001
class LURenameCluster(LogicalUnit):
1002
  """Rename the cluster.
1003

1004
  """
1005
  HPATH = "cluster-rename"
1006
  HTYPE = constants.HTYPE_CLUSTER
1007
  _OP_REQP = ["name"]
1008
  REQ_WSSTORE = True
1009

    
1010
  def BuildHooksEnv(self):
1011
    """Build hooks env.
1012

1013
    """
1014
    env = {
1015
      "OP_TARGET": self.sstore.GetClusterName(),
1016
      "NEW_NAME": self.op.name,
1017
      }
1018
    mn = self.sstore.GetMasterNode()
1019
    return env, [mn], [mn]
1020

    
1021
  def CheckPrereq(self):
1022
    """Verify that the passed name is a valid one.
1023

1024
    """
1025
    hostname = utils.HostInfo(self.op.name)
1026

    
1027
    new_name = hostname.name
1028
    self.ip = new_ip = hostname.ip
1029
    old_name = self.sstore.GetClusterName()
1030
    old_ip = self.sstore.GetMasterIP()
1031
    if new_name == old_name and new_ip == old_ip:
1032
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1033
                                 " cluster has changed")
1034
    if new_ip != old_ip:
1035
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1036
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1037
                                   " reachable on the network. Aborting." %
1038
                                   new_ip)
1039

    
1040
    self.op.name = new_name
1041

    
1042
  def Exec(self, feedback_fn):
1043
    """Rename the cluster.
1044

1045
    """
1046
    clustername = self.op.name
1047
    ip = self.ip
1048
    ss = self.sstore
1049

    
1050
    # shutdown the master IP
1051
    master = ss.GetMasterNode()
1052
    if not rpc.call_node_stop_master(master, False):
1053
      raise errors.OpExecError("Could not disable the master role")
1054

    
1055
    try:
1056
      # modify the sstore
1057
      ss.SetKey(ss.SS_MASTER_IP, ip)
1058
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1059

    
1060
      # Distribute updated ss config to all nodes
1061
      myself = self.cfg.GetNodeInfo(master)
1062
      dist_nodes = self.cfg.GetNodeList()
1063
      if myself.name in dist_nodes:
1064
        dist_nodes.remove(myself.name)
1065

    
1066
      logger.Debug("Copying updated ssconf data to all nodes")
1067
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1068
        fname = ss.KeyToFilename(keyname)
1069
        result = rpc.call_upload_file(dist_nodes, fname)
1070
        for to_node in dist_nodes:
1071
          if not result[to_node]:
1072
            logger.Error("copy of file %s to node %s failed" %
1073
                         (fname, to_node))
1074
    finally:
1075
      if not rpc.call_node_start_master(master, False):
1076
        logger.Error("Could not re-enable the master role on the master,"
1077
                     " please restart manually.")
1078

    
1079

    
1080
def _RecursiveCheckIfLVMBased(disk):
1081
  """Check if the given disk or its children are lvm-based.
1082

1083
  Args:
1084
    disk: ganeti.objects.Disk object
1085

1086
  Returns:
1087
    boolean indicating whether a LD_LV dev_type was found or not
1088

1089
  """
1090
  if disk.children:
1091
    for chdisk in disk.children:
1092
      if _RecursiveCheckIfLVMBased(chdisk):
1093
        return True
1094
  return disk.dev_type == constants.LD_LV
1095

    
1096

    
1097
class LUSetClusterParams(LogicalUnit):
1098
  """Change the parameters of the cluster.
1099

1100
  """
1101
  HPATH = "cluster-modify"
1102
  HTYPE = constants.HTYPE_CLUSTER
1103
  _OP_REQP = []
1104

    
1105
  def BuildHooksEnv(self):
1106
    """Build hooks env.
1107

1108
    """
1109
    env = {
1110
      "OP_TARGET": self.sstore.GetClusterName(),
1111
      "NEW_VG_NAME": self.op.vg_name,
1112
      }
1113
    mn = self.sstore.GetMasterNode()
1114
    return env, [mn], [mn]
1115

    
1116
  def CheckPrereq(self):
1117
    """Check prerequisites.
1118

1119
    This checks whether the given params don't conflict and
1120
    if the given volume group is valid.
1121

1122
    """
1123
    if not self.op.vg_name:
1124
      instances = [self.cfg.GetInstanceInfo(name)
1125
                   for name in self.cfg.GetInstanceList()]
1126
      for inst in instances:
1127
        for disk in inst.disks:
1128
          if _RecursiveCheckIfLVMBased(disk):
1129
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1130
                                       " lvm-based instances exist")
1131

    
1132
    # if vg_name not None, checks given volume group on all nodes
1133
    if self.op.vg_name:
1134
      node_list = self.cfg.GetNodeList()
1135
      vglist = rpc.call_vg_list(node_list)
1136
      for node in node_list:
1137
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1138
                                              constants.MIN_VG_SIZE)
1139
        if vgstatus:
1140
          raise errors.OpPrereqError("Error on node '%s': %s" %
1141
                                     (node, vgstatus))
1142

    
1143
  def Exec(self, feedback_fn):
1144
    """Change the parameters of the cluster.
1145

1146
    """
1147
    if self.op.vg_name != self.cfg.GetVGName():
1148
      self.cfg.SetVGName(self.op.vg_name)
1149
    else:
1150
      feedback_fn("Cluster LVM configuration already in desired"
1151
                  " state, not changing")
1152

    
1153

    
1154
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1155
  """Sleep and poll for an instance's disk to sync.
1156

1157
  """
1158
  if not instance.disks:
1159
    return True
1160

    
1161
  if not oneshot:
1162
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1163

    
1164
  node = instance.primary_node
1165

    
1166
  for dev in instance.disks:
1167
    cfgw.SetDiskID(dev, node)
1168

    
1169
  retries = 0
1170
  while True:
1171
    max_time = 0
1172
    done = True
1173
    cumul_degraded = False
1174
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1175
    if not rstats:
1176
      proc.LogWarning("Can't get any data from node %s" % node)
1177
      retries += 1
1178
      if retries >= 10:
1179
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1180
                                 " aborting." % node)
1181
      time.sleep(6)
1182
      continue
1183
    retries = 0
1184
    for i in range(len(rstats)):
1185
      mstat = rstats[i]
1186
      if mstat is None:
1187
        proc.LogWarning("Can't compute data for node %s/%s" %
1188
                        (node, instance.disks[i].iv_name))
1189
        continue
1190
      # we ignore the ldisk parameter
1191
      perc_done, est_time, is_degraded, _ = mstat
1192
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1193
      if perc_done is not None:
1194
        done = False
1195
        if est_time is not None:
1196
          rem_time = "%d estimated seconds remaining" % est_time
1197
          max_time = est_time
1198
        else:
1199
          rem_time = "no time estimate"
1200
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1201
                     (instance.disks[i].iv_name, perc_done, rem_time))
1202
    if done or oneshot:
1203
      break
1204

    
1205
    time.sleep(min(60, max_time))
1206

    
1207
  if done:
1208
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1209
  return not cumul_degraded
1210

    
1211

    
1212
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1213
  """Check that mirrors are not degraded.
1214

1215
  The ldisk parameter, if True, will change the test from the
1216
  is_degraded attribute (which represents overall non-ok status for
1217
  the device(s)) to the ldisk (representing the local storage status).
1218

1219
  """
1220
  cfgw.SetDiskID(dev, node)
1221
  if ldisk:
1222
    idx = 6
1223
  else:
1224
    idx = 5
1225

    
1226
  result = True
1227
  if on_primary or dev.AssembleOnSecondary():
1228
    rstats = rpc.call_blockdev_find(node, dev)
1229
    if not rstats:
1230
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1231
      result = False
1232
    else:
1233
      result = result and (not rstats[idx])
1234
  if dev.children:
1235
    for child in dev.children:
1236
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1237

    
1238
  return result
1239

    
1240

    
1241
class LUDiagnoseOS(NoHooksLU):
1242
  """Logical unit for OS diagnose/query.
1243

1244
  """
1245
  _OP_REQP = ["output_fields", "names"]
1246
  REQ_BGL = False
1247

    
1248
  def ExpandNames(self):
1249
    if self.op.names:
1250
      raise errors.OpPrereqError("Selective OS query not supported")
1251

    
1252
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1253
    _CheckOutputFields(static=[],
1254
                       dynamic=self.dynamic_fields,
1255
                       selected=self.op.output_fields)
1256

    
1257
    # Lock all nodes, in shared mode
1258
    self.needed_locks = {}
1259
    self.share_locks[locking.LEVEL_NODE] = 1
1260
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1261

    
1262
  def CheckPrereq(self):
1263
    """Check prerequisites.
1264

1265
    """
1266

    
1267
  @staticmethod
1268
  def _DiagnoseByOS(node_list, rlist):
1269
    """Remaps a per-node return list into an a per-os per-node dictionary
1270

1271
      Args:
1272
        node_list: a list with the names of all nodes
1273
        rlist: a map with node names as keys and OS objects as values
1274

1275
      Returns:
1276
        map: a map with osnames as keys and as value another map, with
1277
             nodes as
1278
             keys and list of OS objects as values
1279
             e.g. {"debian-etch": {"node1": [<object>,...],
1280
                                   "node2": [<object>,]}
1281
                  }
1282

1283
    """
1284
    all_os = {}
1285
    for node_name, nr in rlist.iteritems():
1286
      if not nr:
1287
        continue
1288
      for os_obj in nr:
1289
        if os_obj.name not in all_os:
1290
          # build a list of nodes for this os containing empty lists
1291
          # for each node in node_list
1292
          all_os[os_obj.name] = {}
1293
          for nname in node_list:
1294
            all_os[os_obj.name][nname] = []
1295
        all_os[os_obj.name][node_name].append(os_obj)
1296
    return all_os
1297

    
1298
  def Exec(self, feedback_fn):
1299
    """Compute the list of OSes.
1300

1301
    """
1302
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1303
    node_data = rpc.call_os_diagnose(node_list)
1304
    if node_data == False:
1305
      raise errors.OpExecError("Can't gather the list of OSes")
1306
    pol = self._DiagnoseByOS(node_list, node_data)
1307
    output = []
1308
    for os_name, os_data in pol.iteritems():
1309
      row = []
1310
      for field in self.op.output_fields:
1311
        if field == "name":
1312
          val = os_name
1313
        elif field == "valid":
1314
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1315
        elif field == "node_status":
1316
          val = {}
1317
          for node_name, nos_list in os_data.iteritems():
1318
            val[node_name] = [(v.status, v.path) for v in nos_list]
1319
        else:
1320
          raise errors.ParameterError(field)
1321
        row.append(val)
1322
      output.append(row)
1323

    
1324
    return output
1325

    
1326

    
1327
class LURemoveNode(LogicalUnit):
1328
  """Logical unit for removing a node.
1329

1330
  """
1331
  HPATH = "node-remove"
1332
  HTYPE = constants.HTYPE_NODE
1333
  _OP_REQP = ["node_name"]
1334

    
1335
  def BuildHooksEnv(self):
1336
    """Build hooks env.
1337

1338
    This doesn't run on the target node in the pre phase as a failed
1339
    node would then be impossible to remove.
1340

1341
    """
1342
    env = {
1343
      "OP_TARGET": self.op.node_name,
1344
      "NODE_NAME": self.op.node_name,
1345
      }
1346
    all_nodes = self.cfg.GetNodeList()
1347
    all_nodes.remove(self.op.node_name)
1348
    return env, all_nodes, all_nodes
1349

    
1350
  def CheckPrereq(self):
1351
    """Check prerequisites.
1352

1353
    This checks:
1354
     - the node exists in the configuration
1355
     - it does not have primary or secondary instances
1356
     - it's not the master
1357

1358
    Any errors are signalled by raising errors.OpPrereqError.
1359

1360
    """
1361
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1362
    if node is None:
1363
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1364

    
1365
    instance_list = self.cfg.GetInstanceList()
1366

    
1367
    masternode = self.sstore.GetMasterNode()
1368
    if node.name == masternode:
1369
      raise errors.OpPrereqError("Node is the master node,"
1370
                                 " you need to failover first.")
1371

    
1372
    for instance_name in instance_list:
1373
      instance = self.cfg.GetInstanceInfo(instance_name)
1374
      if node.name == instance.primary_node:
1375
        raise errors.OpPrereqError("Instance %s still running on the node,"
1376
                                   " please remove first." % instance_name)
1377
      if node.name in instance.secondary_nodes:
1378
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1379
                                   " please remove first." % instance_name)
1380
    self.op.node_name = node.name
1381
    self.node = node
1382

    
1383
  def Exec(self, feedback_fn):
1384
    """Removes the node from the cluster.
1385

1386
    """
1387
    node = self.node
1388
    logger.Info("stopping the node daemon and removing configs from node %s" %
1389
                node.name)
1390

    
1391
    self.context.RemoveNode(node.name)
1392

    
1393
    rpc.call_node_leave_cluster(node.name)
1394

    
1395

    
1396
class LUQueryNodes(NoHooksLU):
1397
  """Logical unit for querying nodes.
1398

1399
  """
1400
  _OP_REQP = ["output_fields", "names"]
1401
  REQ_BGL = False
1402

    
1403
  def ExpandNames(self):
1404
    self.dynamic_fields = frozenset([
1405
      "dtotal", "dfree",
1406
      "mtotal", "mnode", "mfree",
1407
      "bootid",
1408
      "ctotal",
1409
      ])
1410

    
1411
    self.static_fields = frozenset([
1412
      "name", "pinst_cnt", "sinst_cnt",
1413
      "pinst_list", "sinst_list",
1414
      "pip", "sip", "tags",
1415
      ])
1416

    
1417
    _CheckOutputFields(static=self.static_fields,
1418
                       dynamic=self.dynamic_fields,
1419
                       selected=self.op.output_fields)
1420

    
1421
    self.needed_locks = {}
1422
    self.share_locks[locking.LEVEL_NODE] = 1
1423

    
1424
    if self.op.names:
1425
      self.wanted = _GetWantedNodes(self, self.op.names)
1426
    else:
1427
      self.wanted = locking.ALL_SET
1428

    
1429
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1430
    if self.do_locking:
1431
      # if we don't request only static fields, we need to lock the nodes
1432
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1433

    
1434

    
1435
  def CheckPrereq(self):
1436
    """Check prerequisites.
1437

1438
    """
1439
    # The validation of the node list is done in the _GetWantedNodes,
1440
    # if non empty, and if empty, there's no validation to do
1441
    pass
1442

    
1443
  def Exec(self, feedback_fn):
1444
    """Computes the list of nodes and their attributes.
1445

1446
    """
1447
    all_info = self.cfg.GetAllNodesInfo()
1448
    if self.do_locking:
1449
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1450
    else:
1451
      nodenames = all_info.keys()
1452
    nodelist = [all_info[name] for name in nodenames]
1453

    
1454
    # begin data gathering
1455

    
1456
    if self.dynamic_fields.intersection(self.op.output_fields):
1457
      live_data = {}
1458
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1459
      for name in nodenames:
1460
        nodeinfo = node_data.get(name, None)
1461
        if nodeinfo:
1462
          live_data[name] = {
1463
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1464
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1465
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1466
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1467
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1468
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1469
            "bootid": nodeinfo['bootid'],
1470
            }
1471
        else:
1472
          live_data[name] = {}
1473
    else:
1474
      live_data = dict.fromkeys(nodenames, {})
1475

    
1476
    node_to_primary = dict([(name, set()) for name in nodenames])
1477
    node_to_secondary = dict([(name, set()) for name in nodenames])
1478

    
1479
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1480
                             "sinst_cnt", "sinst_list"))
1481
    if inst_fields & frozenset(self.op.output_fields):
1482
      instancelist = self.cfg.GetInstanceList()
1483

    
1484
      for instance_name in instancelist:
1485
        inst = self.cfg.GetInstanceInfo(instance_name)
1486
        if inst.primary_node in node_to_primary:
1487
          node_to_primary[inst.primary_node].add(inst.name)
1488
        for secnode in inst.secondary_nodes:
1489
          if secnode in node_to_secondary:
1490
            node_to_secondary[secnode].add(inst.name)
1491

    
1492
    # end data gathering
1493

    
1494
    output = []
1495
    for node in nodelist:
1496
      node_output = []
1497
      for field in self.op.output_fields:
1498
        if field == "name":
1499
          val = node.name
1500
        elif field == "pinst_list":
1501
          val = list(node_to_primary[node.name])
1502
        elif field == "sinst_list":
1503
          val = list(node_to_secondary[node.name])
1504
        elif field == "pinst_cnt":
1505
          val = len(node_to_primary[node.name])
1506
        elif field == "sinst_cnt":
1507
          val = len(node_to_secondary[node.name])
1508
        elif field == "pip":
1509
          val = node.primary_ip
1510
        elif field == "sip":
1511
          val = node.secondary_ip
1512
        elif field == "tags":
1513
          val = list(node.GetTags())
1514
        elif field in self.dynamic_fields:
1515
          val = live_data[node.name].get(field, None)
1516
        else:
1517
          raise errors.ParameterError(field)
1518
        node_output.append(val)
1519
      output.append(node_output)
1520

    
1521
    return output
1522

    
1523

    
1524
class LUQueryNodeVolumes(NoHooksLU):
1525
  """Logical unit for getting volumes on node(s).
1526

1527
  """
1528
  _OP_REQP = ["nodes", "output_fields"]
1529
  REQ_BGL = False
1530

    
1531
  def ExpandNames(self):
1532
    _CheckOutputFields(static=["node"],
1533
                       dynamic=["phys", "vg", "name", "size", "instance"],
1534
                       selected=self.op.output_fields)
1535

    
1536
    self.needed_locks = {}
1537
    self.share_locks[locking.LEVEL_NODE] = 1
1538
    if not self.op.nodes:
1539
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1540
    else:
1541
      self.needed_locks[locking.LEVEL_NODE] = \
1542
        _GetWantedNodes(self, self.op.nodes)
1543

    
1544
  def CheckPrereq(self):
1545
    """Check prerequisites.
1546

1547
    This checks that the fields required are valid output fields.
1548

1549
    """
1550
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1551

    
1552
  def Exec(self, feedback_fn):
1553
    """Computes the list of nodes and their attributes.
1554

1555
    """
1556
    nodenames = self.nodes
1557
    volumes = rpc.call_node_volumes(nodenames)
1558

    
1559
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1560
             in self.cfg.GetInstanceList()]
1561

    
1562
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1563

    
1564
    output = []
1565
    for node in nodenames:
1566
      if node not in volumes or not volumes[node]:
1567
        continue
1568

    
1569
      node_vols = volumes[node][:]
1570
      node_vols.sort(key=lambda vol: vol['dev'])
1571

    
1572
      for vol in node_vols:
1573
        node_output = []
1574
        for field in self.op.output_fields:
1575
          if field == "node":
1576
            val = node
1577
          elif field == "phys":
1578
            val = vol['dev']
1579
          elif field == "vg":
1580
            val = vol['vg']
1581
          elif field == "name":
1582
            val = vol['name']
1583
          elif field == "size":
1584
            val = int(float(vol['size']))
1585
          elif field == "instance":
1586
            for inst in ilist:
1587
              if node not in lv_by_node[inst]:
1588
                continue
1589
              if vol['name'] in lv_by_node[inst][node]:
1590
                val = inst.name
1591
                break
1592
            else:
1593
              val = '-'
1594
          else:
1595
            raise errors.ParameterError(field)
1596
          node_output.append(str(val))
1597

    
1598
        output.append(node_output)
1599

    
1600
    return output
1601

    
1602

    
1603
class LUAddNode(LogicalUnit):
1604
  """Logical unit for adding node to the cluster.
1605

1606
  """
1607
  HPATH = "node-add"
1608
  HTYPE = constants.HTYPE_NODE
1609
  _OP_REQP = ["node_name"]
1610

    
1611
  def BuildHooksEnv(self):
1612
    """Build hooks env.
1613

1614
    This will run on all nodes before, and on all nodes + the new node after.
1615

1616
    """
1617
    env = {
1618
      "OP_TARGET": self.op.node_name,
1619
      "NODE_NAME": self.op.node_name,
1620
      "NODE_PIP": self.op.primary_ip,
1621
      "NODE_SIP": self.op.secondary_ip,
1622
      }
1623
    nodes_0 = self.cfg.GetNodeList()
1624
    nodes_1 = nodes_0 + [self.op.node_name, ]
1625
    return env, nodes_0, nodes_1
1626

    
1627
  def CheckPrereq(self):
1628
    """Check prerequisites.
1629

1630
    This checks:
1631
     - the new node is not already in the config
1632
     - it is resolvable
1633
     - its parameters (single/dual homed) matches the cluster
1634

1635
    Any errors are signalled by raising errors.OpPrereqError.
1636

1637
    """
1638
    node_name = self.op.node_name
1639
    cfg = self.cfg
1640

    
1641
    dns_data = utils.HostInfo(node_name)
1642

    
1643
    node = dns_data.name
1644
    primary_ip = self.op.primary_ip = dns_data.ip
1645
    secondary_ip = getattr(self.op, "secondary_ip", None)
1646
    if secondary_ip is None:
1647
      secondary_ip = primary_ip
1648
    if not utils.IsValidIP(secondary_ip):
1649
      raise errors.OpPrereqError("Invalid secondary IP given")
1650
    self.op.secondary_ip = secondary_ip
1651

    
1652
    node_list = cfg.GetNodeList()
1653
    if not self.op.readd and node in node_list:
1654
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1655
                                 node)
1656
    elif self.op.readd and node not in node_list:
1657
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1658

    
1659
    for existing_node_name in node_list:
1660
      existing_node = cfg.GetNodeInfo(existing_node_name)
1661

    
1662
      if self.op.readd and node == existing_node_name:
1663
        if (existing_node.primary_ip != primary_ip or
1664
            existing_node.secondary_ip != secondary_ip):
1665
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1666
                                     " address configuration as before")
1667
        continue
1668

    
1669
      if (existing_node.primary_ip == primary_ip or
1670
          existing_node.secondary_ip == primary_ip or
1671
          existing_node.primary_ip == secondary_ip or
1672
          existing_node.secondary_ip == secondary_ip):
1673
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1674
                                   " existing node %s" % existing_node.name)
1675

    
1676
    # check that the type of the node (single versus dual homed) is the
1677
    # same as for the master
1678
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1679
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1680
    newbie_singlehomed = secondary_ip == primary_ip
1681
    if master_singlehomed != newbie_singlehomed:
1682
      if master_singlehomed:
1683
        raise errors.OpPrereqError("The master has no private ip but the"
1684
                                   " new node has one")
1685
      else:
1686
        raise errors.OpPrereqError("The master has a private ip but the"
1687
                                   " new node doesn't have one")
1688

    
1689
    # checks reachablity
1690
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1691
      raise errors.OpPrereqError("Node not reachable by ping")
1692

    
1693
    if not newbie_singlehomed:
1694
      # check reachability from my secondary ip to newbie's secondary ip
1695
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1696
                           source=myself.secondary_ip):
1697
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1698
                                   " based ping to noded port")
1699

    
1700
    self.new_node = objects.Node(name=node,
1701
                                 primary_ip=primary_ip,
1702
                                 secondary_ip=secondary_ip)
1703

    
1704
  def Exec(self, feedback_fn):
1705
    """Adds the new node to the cluster.
1706

1707
    """
1708
    new_node = self.new_node
1709
    node = new_node.name
1710

    
1711
    # check connectivity
1712
    result = rpc.call_version([node])[node]
1713
    if result:
1714
      if constants.PROTOCOL_VERSION == result:
1715
        logger.Info("communication to node %s fine, sw version %s match" %
1716
                    (node, result))
1717
      else:
1718
        raise errors.OpExecError("Version mismatch master version %s,"
1719
                                 " node version %s" %
1720
                                 (constants.PROTOCOL_VERSION, result))
1721
    else:
1722
      raise errors.OpExecError("Cannot get version from the new node")
1723

    
1724
    # setup ssh on node
1725
    logger.Info("copy ssh key to node %s" % node)
1726
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1727
    keyarray = []
1728
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1729
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1730
                priv_key, pub_key]
1731

    
1732
    for i in keyfiles:
1733
      f = open(i, 'r')
1734
      try:
1735
        keyarray.append(f.read())
1736
      finally:
1737
        f.close()
1738

    
1739
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1740
                               keyarray[3], keyarray[4], keyarray[5])
1741

    
1742
    if not result:
1743
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1744

    
1745
    # Add node to our /etc/hosts, and add key to known_hosts
1746
    utils.AddHostToEtcHosts(new_node.name)
1747

    
1748
    if new_node.secondary_ip != new_node.primary_ip:
1749
      if not rpc.call_node_tcp_ping(new_node.name,
1750
                                    constants.LOCALHOST_IP_ADDRESS,
1751
                                    new_node.secondary_ip,
1752
                                    constants.DEFAULT_NODED_PORT,
1753
                                    10, False):
1754
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1755
                                 " you gave (%s). Please fix and re-run this"
1756
                                 " command." % new_node.secondary_ip)
1757

    
1758
    node_verify_list = [self.sstore.GetMasterNode()]
1759
    node_verify_param = {
1760
      'nodelist': [node],
1761
      # TODO: do a node-net-test as well?
1762
    }
1763

    
1764
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1765
    for verifier in node_verify_list:
1766
      if not result[verifier]:
1767
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1768
                                 " for remote verification" % verifier)
1769
      if result[verifier]['nodelist']:
1770
        for failed in result[verifier]['nodelist']:
1771
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1772
                      (verifier, result[verifier]['nodelist'][failed]))
1773
        raise errors.OpExecError("ssh/hostname verification failed.")
1774

    
1775
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1776
    # including the node just added
1777
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1778
    dist_nodes = self.cfg.GetNodeList()
1779
    if not self.op.readd:
1780
      dist_nodes.append(node)
1781
    if myself.name in dist_nodes:
1782
      dist_nodes.remove(myself.name)
1783

    
1784
    logger.Debug("Copying hosts and known_hosts to all nodes")
1785
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1786
      result = rpc.call_upload_file(dist_nodes, fname)
1787
      for to_node in dist_nodes:
1788
        if not result[to_node]:
1789
          logger.Error("copy of file %s to node %s failed" %
1790
                       (fname, to_node))
1791

    
1792
    to_copy = self.sstore.GetFileList()
1793
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1794
      to_copy.append(constants.VNC_PASSWORD_FILE)
1795
    for fname in to_copy:
1796
      result = rpc.call_upload_file([node], fname)
1797
      if not result[node]:
1798
        logger.Error("could not copy file %s to node %s" % (fname, node))
1799

    
1800
    if self.op.readd:
1801
      self.context.ReaddNode(new_node)
1802
    else:
1803
      self.context.AddNode(new_node)
1804

    
1805

    
1806
class LUQueryClusterInfo(NoHooksLU):
1807
  """Query cluster configuration.
1808

1809
  """
1810
  _OP_REQP = []
1811
  REQ_MASTER = False
1812
  REQ_BGL = False
1813

    
1814
  def ExpandNames(self):
1815
    self.needed_locks = {}
1816

    
1817
  def CheckPrereq(self):
1818
    """No prerequsites needed for this LU.
1819

1820
    """
1821
    pass
1822

    
1823
  def Exec(self, feedback_fn):
1824
    """Return cluster config.
1825

1826
    """
1827
    result = {
1828
      "name": self.sstore.GetClusterName(),
1829
      "software_version": constants.RELEASE_VERSION,
1830
      "protocol_version": constants.PROTOCOL_VERSION,
1831
      "config_version": constants.CONFIG_VERSION,
1832
      "os_api_version": constants.OS_API_VERSION,
1833
      "export_version": constants.EXPORT_VERSION,
1834
      "master": self.sstore.GetMasterNode(),
1835
      "architecture": (platform.architecture()[0], platform.machine()),
1836
      "hypervisor_type": self.sstore.GetHypervisorType(),
1837
      }
1838

    
1839
    return result
1840

    
1841

    
1842
class LUDumpClusterConfig(NoHooksLU):
1843
  """Return a text-representation of the cluster-config.
1844

1845
  """
1846
  _OP_REQP = []
1847
  REQ_BGL = False
1848

    
1849
  def ExpandNames(self):
1850
    self.needed_locks = {}
1851

    
1852
  def CheckPrereq(self):
1853
    """No prerequisites.
1854

1855
    """
1856
    pass
1857

    
1858
  def Exec(self, feedback_fn):
1859
    """Dump a representation of the cluster config to the standard output.
1860

1861
    """
1862
    return self.cfg.DumpConfig()
1863

    
1864

    
1865
class LUActivateInstanceDisks(NoHooksLU):
1866
  """Bring up an instance's disks.
1867

1868
  """
1869
  _OP_REQP = ["instance_name"]
1870
  REQ_BGL = False
1871

    
1872
  def ExpandNames(self):
1873
    self._ExpandAndLockInstance()
1874
    self.needed_locks[locking.LEVEL_NODE] = []
1875
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1876

    
1877
  def DeclareLocks(self, level):
1878
    if level == locking.LEVEL_NODE:
1879
      self._LockInstancesNodes()
1880

    
1881
  def CheckPrereq(self):
1882
    """Check prerequisites.
1883

1884
    This checks that the instance is in the cluster.
1885

1886
    """
1887
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1888
    assert self.instance is not None, \
1889
      "Cannot retrieve locked instance %s" % self.op.instance_name
1890

    
1891
  def Exec(self, feedback_fn):
1892
    """Activate the disks.
1893

1894
    """
1895
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1896
    if not disks_ok:
1897
      raise errors.OpExecError("Cannot activate block devices")
1898

    
1899
    return disks_info
1900

    
1901

    
1902
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1903
  """Prepare the block devices for an instance.
1904

1905
  This sets up the block devices on all nodes.
1906

1907
  Args:
1908
    instance: a ganeti.objects.Instance object
1909
    ignore_secondaries: if true, errors on secondary nodes won't result
1910
                        in an error return from the function
1911

1912
  Returns:
1913
    false if the operation failed
1914
    list of (host, instance_visible_name, node_visible_name) if the operation
1915
         suceeded with the mapping from node devices to instance devices
1916
  """
1917
  device_info = []
1918
  disks_ok = True
1919
  iname = instance.name
1920
  # With the two passes mechanism we try to reduce the window of
1921
  # opportunity for the race condition of switching DRBD to primary
1922
  # before handshaking occured, but we do not eliminate it
1923

    
1924
  # The proper fix would be to wait (with some limits) until the
1925
  # connection has been made and drbd transitions from WFConnection
1926
  # into any other network-connected state (Connected, SyncTarget,
1927
  # SyncSource, etc.)
1928

    
1929
  # 1st pass, assemble on all nodes in secondary mode
1930
  for inst_disk in instance.disks:
1931
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1932
      cfg.SetDiskID(node_disk, node)
1933
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1934
      if not result:
1935
        logger.Error("could not prepare block device %s on node %s"
1936
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1937
        if not ignore_secondaries:
1938
          disks_ok = False
1939

    
1940
  # FIXME: race condition on drbd migration to primary
1941

    
1942
  # 2nd pass, do only the primary node
1943
  for inst_disk in instance.disks:
1944
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1945
      if node != instance.primary_node:
1946
        continue
1947
      cfg.SetDiskID(node_disk, node)
1948
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1949
      if not result:
1950
        logger.Error("could not prepare block device %s on node %s"
1951
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1952
        disks_ok = False
1953
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1954

    
1955
  # leave the disks configured for the primary node
1956
  # this is a workaround that would be fixed better by
1957
  # improving the logical/physical id handling
1958
  for disk in instance.disks:
1959
    cfg.SetDiskID(disk, instance.primary_node)
1960

    
1961
  return disks_ok, device_info
1962

    
1963

    
1964
def _StartInstanceDisks(cfg, instance, force):
1965
  """Start the disks of an instance.
1966

1967
  """
1968
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1969
                                           ignore_secondaries=force)
1970
  if not disks_ok:
1971
    _ShutdownInstanceDisks(instance, cfg)
1972
    if force is not None and not force:
1973
      logger.Error("If the message above refers to a secondary node,"
1974
                   " you can retry the operation using '--force'.")
1975
    raise errors.OpExecError("Disk consistency error")
1976

    
1977

    
1978
class LUDeactivateInstanceDisks(NoHooksLU):
1979
  """Shutdown an instance's disks.
1980

1981
  """
1982
  _OP_REQP = ["instance_name"]
1983
  REQ_BGL = False
1984

    
1985
  def ExpandNames(self):
1986
    self._ExpandAndLockInstance()
1987
    self.needed_locks[locking.LEVEL_NODE] = []
1988
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1989

    
1990
  def DeclareLocks(self, level):
1991
    if level == locking.LEVEL_NODE:
1992
      self._LockInstancesNodes()
1993

    
1994
  def CheckPrereq(self):
1995
    """Check prerequisites.
1996

1997
    This checks that the instance is in the cluster.
1998

1999
    """
2000
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2001
    assert self.instance is not None, \
2002
      "Cannot retrieve locked instance %s" % self.op.instance_name
2003

    
2004
  def Exec(self, feedback_fn):
2005
    """Deactivate the disks
2006

2007
    """
2008
    instance = self.instance
2009
    _SafeShutdownInstanceDisks(instance, self.cfg)
2010

    
2011

    
2012
def _SafeShutdownInstanceDisks(instance, cfg):
2013
  """Shutdown block devices of an instance.
2014

2015
  This function checks if an instance is running, before calling
2016
  _ShutdownInstanceDisks.
2017

2018
  """
2019
  ins_l = rpc.call_instance_list([instance.primary_node])
2020
  ins_l = ins_l[instance.primary_node]
2021
  if not type(ins_l) is list:
2022
    raise errors.OpExecError("Can't contact node '%s'" %
2023
                             instance.primary_node)
2024

    
2025
  if instance.name in ins_l:
2026
    raise errors.OpExecError("Instance is running, can't shutdown"
2027
                             " block devices.")
2028

    
2029
  _ShutdownInstanceDisks(instance, cfg)
2030

    
2031

    
2032
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2033
  """Shutdown block devices of an instance.
2034

2035
  This does the shutdown on all nodes of the instance.
2036

2037
  If the ignore_primary is false, errors on the primary node are
2038
  ignored.
2039

2040
  """
2041
  result = True
2042
  for disk in instance.disks:
2043
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2044
      cfg.SetDiskID(top_disk, node)
2045
      if not rpc.call_blockdev_shutdown(node, top_disk):
2046
        logger.Error("could not shutdown block device %s on node %s" %
2047
                     (disk.iv_name, node))
2048
        if not ignore_primary or node != instance.primary_node:
2049
          result = False
2050
  return result
2051

    
2052

    
2053
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2054
  """Checks if a node has enough free memory.
2055

2056
  This function check if a given node has the needed amount of free
2057
  memory. In case the node has less memory or we cannot get the
2058
  information from the node, this function raise an OpPrereqError
2059
  exception.
2060

2061
  Args:
2062
    - cfg: a ConfigWriter instance
2063
    - node: the node name
2064
    - reason: string to use in the error message
2065
    - requested: the amount of memory in MiB
2066

2067
  """
2068
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2069
  if not nodeinfo or not isinstance(nodeinfo, dict):
2070
    raise errors.OpPrereqError("Could not contact node %s for resource"
2071
                             " information" % (node,))
2072

    
2073
  free_mem = nodeinfo[node].get('memory_free')
2074
  if not isinstance(free_mem, int):
2075
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2076
                             " was '%s'" % (node, free_mem))
2077
  if requested > free_mem:
2078
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2079
                             " needed %s MiB, available %s MiB" %
2080
                             (node, reason, requested, free_mem))
2081

    
2082

    
2083
class LUStartupInstance(LogicalUnit):
2084
  """Starts an instance.
2085

2086
  """
2087
  HPATH = "instance-start"
2088
  HTYPE = constants.HTYPE_INSTANCE
2089
  _OP_REQP = ["instance_name", "force"]
2090
  REQ_BGL = False
2091

    
2092
  def ExpandNames(self):
2093
    self._ExpandAndLockInstance()
2094
    self.needed_locks[locking.LEVEL_NODE] = []
2095
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2096

    
2097
  def DeclareLocks(self, level):
2098
    if level == locking.LEVEL_NODE:
2099
      self._LockInstancesNodes()
2100

    
2101
  def BuildHooksEnv(self):
2102
    """Build hooks env.
2103

2104
    This runs on master, primary and secondary nodes of the instance.
2105

2106
    """
2107
    env = {
2108
      "FORCE": self.op.force,
2109
      }
2110
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2111
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2112
          list(self.instance.secondary_nodes))
2113
    return env, nl, nl
2114

    
2115
  def CheckPrereq(self):
2116
    """Check prerequisites.
2117

2118
    This checks that the instance is in the cluster.
2119

2120
    """
2121
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2122
    assert self.instance is not None, \
2123
      "Cannot retrieve locked instance %s" % self.op.instance_name
2124

    
2125
    # check bridges existance
2126
    _CheckInstanceBridgesExist(instance)
2127

    
2128
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2129
                         "starting instance %s" % instance.name,
2130
                         instance.memory)
2131

    
2132
  def Exec(self, feedback_fn):
2133
    """Start the instance.
2134

2135
    """
2136
    instance = self.instance
2137
    force = self.op.force
2138
    extra_args = getattr(self.op, "extra_args", "")
2139

    
2140
    self.cfg.MarkInstanceUp(instance.name)
2141

    
2142
    node_current = instance.primary_node
2143

    
2144
    _StartInstanceDisks(self.cfg, instance, force)
2145

    
2146
    if not rpc.call_instance_start(node_current, instance, extra_args):
2147
      _ShutdownInstanceDisks(instance, self.cfg)
2148
      raise errors.OpExecError("Could not start instance")
2149

    
2150

    
2151
class LURebootInstance(LogicalUnit):
2152
  """Reboot an instance.
2153

2154
  """
2155
  HPATH = "instance-reboot"
2156
  HTYPE = constants.HTYPE_INSTANCE
2157
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2158
  REQ_BGL = False
2159

    
2160
  def ExpandNames(self):
2161
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2162
                                   constants.INSTANCE_REBOOT_HARD,
2163
                                   constants.INSTANCE_REBOOT_FULL]:
2164
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2165
                                  (constants.INSTANCE_REBOOT_SOFT,
2166
                                   constants.INSTANCE_REBOOT_HARD,
2167
                                   constants.INSTANCE_REBOOT_FULL))
2168
    self._ExpandAndLockInstance()
2169
    self.needed_locks[locking.LEVEL_NODE] = []
2170
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2171

    
2172
  def DeclareLocks(self, level):
2173
    if level == locking.LEVEL_NODE:
2174
      primary_only = not constants.INSTANCE_REBOOT_FULL
2175
      self._LockInstancesNodes(primary_only=primary_only)
2176

    
2177
  def BuildHooksEnv(self):
2178
    """Build hooks env.
2179

2180
    This runs on master, primary and secondary nodes of the instance.
2181

2182
    """
2183
    env = {
2184
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2185
      }
2186
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2187
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2188
          list(self.instance.secondary_nodes))
2189
    return env, nl, nl
2190

    
2191
  def CheckPrereq(self):
2192
    """Check prerequisites.
2193

2194
    This checks that the instance is in the cluster.
2195

2196
    """
2197
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2198
    assert self.instance is not None, \
2199
      "Cannot retrieve locked instance %s" % self.op.instance_name
2200

    
2201
    # check bridges existance
2202
    _CheckInstanceBridgesExist(instance)
2203

    
2204
  def Exec(self, feedback_fn):
2205
    """Reboot the instance.
2206

2207
    """
2208
    instance = self.instance
2209
    ignore_secondaries = self.op.ignore_secondaries
2210
    reboot_type = self.op.reboot_type
2211
    extra_args = getattr(self.op, "extra_args", "")
2212

    
2213
    node_current = instance.primary_node
2214

    
2215
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2216
                       constants.INSTANCE_REBOOT_HARD]:
2217
      if not rpc.call_instance_reboot(node_current, instance,
2218
                                      reboot_type, extra_args):
2219
        raise errors.OpExecError("Could not reboot instance")
2220
    else:
2221
      if not rpc.call_instance_shutdown(node_current, instance):
2222
        raise errors.OpExecError("could not shutdown instance for full reboot")
2223
      _ShutdownInstanceDisks(instance, self.cfg)
2224
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2225
      if not rpc.call_instance_start(node_current, instance, extra_args):
2226
        _ShutdownInstanceDisks(instance, self.cfg)
2227
        raise errors.OpExecError("Could not start instance for full reboot")
2228

    
2229
    self.cfg.MarkInstanceUp(instance.name)
2230

    
2231

    
2232
class LUShutdownInstance(LogicalUnit):
2233
  """Shutdown an instance.
2234

2235
  """
2236
  HPATH = "instance-stop"
2237
  HTYPE = constants.HTYPE_INSTANCE
2238
  _OP_REQP = ["instance_name"]
2239
  REQ_BGL = False
2240

    
2241
  def ExpandNames(self):
2242
    self._ExpandAndLockInstance()
2243
    self.needed_locks[locking.LEVEL_NODE] = []
2244
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2245

    
2246
  def DeclareLocks(self, level):
2247
    if level == locking.LEVEL_NODE:
2248
      self._LockInstancesNodes()
2249

    
2250
  def BuildHooksEnv(self):
2251
    """Build hooks env.
2252

2253
    This runs on master, primary and secondary nodes of the instance.
2254

2255
    """
2256
    env = _BuildInstanceHookEnvByObject(self.instance)
2257
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2258
          list(self.instance.secondary_nodes))
2259
    return env, nl, nl
2260

    
2261
  def CheckPrereq(self):
2262
    """Check prerequisites.
2263

2264
    This checks that the instance is in the cluster.
2265

2266
    """
2267
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2268
    assert self.instance is not None, \
2269
      "Cannot retrieve locked instance %s" % self.op.instance_name
2270

    
2271
  def Exec(self, feedback_fn):
2272
    """Shutdown the instance.
2273

2274
    """
2275
    instance = self.instance
2276
    node_current = instance.primary_node
2277
    self.cfg.MarkInstanceDown(instance.name)
2278
    if not rpc.call_instance_shutdown(node_current, instance):
2279
      logger.Error("could not shutdown instance")
2280

    
2281
    _ShutdownInstanceDisks(instance, self.cfg)
2282

    
2283

    
2284
class LUReinstallInstance(LogicalUnit):
2285
  """Reinstall an instance.
2286

2287
  """
2288
  HPATH = "instance-reinstall"
2289
  HTYPE = constants.HTYPE_INSTANCE
2290
  _OP_REQP = ["instance_name"]
2291
  REQ_BGL = False
2292

    
2293
  def ExpandNames(self):
2294
    self._ExpandAndLockInstance()
2295
    self.needed_locks[locking.LEVEL_NODE] = []
2296
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2297

    
2298
  def DeclareLocks(self, level):
2299
    if level == locking.LEVEL_NODE:
2300
      self._LockInstancesNodes()
2301

    
2302
  def BuildHooksEnv(self):
2303
    """Build hooks env.
2304

2305
    This runs on master, primary and secondary nodes of the instance.
2306

2307
    """
2308
    env = _BuildInstanceHookEnvByObject(self.instance)
2309
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2310
          list(self.instance.secondary_nodes))
2311
    return env, nl, nl
2312

    
2313
  def CheckPrereq(self):
2314
    """Check prerequisites.
2315

2316
    This checks that the instance is in the cluster and is not running.
2317

2318
    """
2319
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2320
    assert instance is not None, \
2321
      "Cannot retrieve locked instance %s" % self.op.instance_name
2322

    
2323
    if instance.disk_template == constants.DT_DISKLESS:
2324
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2325
                                 self.op.instance_name)
2326
    if instance.status != "down":
2327
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2328
                                 self.op.instance_name)
2329
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2330
    if remote_info:
2331
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2332
                                 (self.op.instance_name,
2333
                                  instance.primary_node))
2334

    
2335
    self.op.os_type = getattr(self.op, "os_type", None)
2336
    if self.op.os_type is not None:
2337
      # OS verification
2338
      pnode = self.cfg.GetNodeInfo(
2339
        self.cfg.ExpandNodeName(instance.primary_node))
2340
      if pnode is None:
2341
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2342
                                   self.op.pnode)
2343
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2344
      if not os_obj:
2345
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2346
                                   " primary node"  % self.op.os_type)
2347

    
2348
    self.instance = instance
2349

    
2350
  def Exec(self, feedback_fn):
2351
    """Reinstall the instance.
2352

2353
    """
2354
    inst = self.instance
2355

    
2356
    if self.op.os_type is not None:
2357
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2358
      inst.os = self.op.os_type
2359
      self.cfg.AddInstance(inst)
2360

    
2361
    _StartInstanceDisks(self.cfg, inst, None)
2362
    try:
2363
      feedback_fn("Running the instance OS create scripts...")
2364
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2365
        raise errors.OpExecError("Could not install OS for instance %s"
2366
                                 " on node %s" %
2367
                                 (inst.name, inst.primary_node))
2368
    finally:
2369
      _ShutdownInstanceDisks(inst, self.cfg)
2370

    
2371

    
2372
class LURenameInstance(LogicalUnit):
2373
  """Rename an instance.
2374

2375
  """
2376
  HPATH = "instance-rename"
2377
  HTYPE = constants.HTYPE_INSTANCE
2378
  _OP_REQP = ["instance_name", "new_name"]
2379

    
2380
  def BuildHooksEnv(self):
2381
    """Build hooks env.
2382

2383
    This runs on master, primary and secondary nodes of the instance.
2384

2385
    """
2386
    env = _BuildInstanceHookEnvByObject(self.instance)
2387
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2388
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2389
          list(self.instance.secondary_nodes))
2390
    return env, nl, nl
2391

    
2392
  def CheckPrereq(self):
2393
    """Check prerequisites.
2394

2395
    This checks that the instance is in the cluster and is not running.
2396

2397
    """
2398
    instance = self.cfg.GetInstanceInfo(
2399
      self.cfg.ExpandInstanceName(self.op.instance_name))
2400
    if instance is None:
2401
      raise errors.OpPrereqError("Instance '%s' not known" %
2402
                                 self.op.instance_name)
2403
    if instance.status != "down":
2404
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2405
                                 self.op.instance_name)
2406
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2407
    if remote_info:
2408
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2409
                                 (self.op.instance_name,
2410
                                  instance.primary_node))
2411
    self.instance = instance
2412

    
2413
    # new name verification
2414
    name_info = utils.HostInfo(self.op.new_name)
2415

    
2416
    self.op.new_name = new_name = name_info.name
2417
    instance_list = self.cfg.GetInstanceList()
2418
    if new_name in instance_list:
2419
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2420
                                 new_name)
2421

    
2422
    if not getattr(self.op, "ignore_ip", False):
2423
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2424
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2425
                                   (name_info.ip, new_name))
2426

    
2427

    
2428
  def Exec(self, feedback_fn):
2429
    """Reinstall the instance.
2430

2431
    """
2432
    inst = self.instance
2433
    old_name = inst.name
2434

    
2435
    if inst.disk_template == constants.DT_FILE:
2436
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2437

    
2438
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2439
    # Change the instance lock. This is definitely safe while we hold the BGL
2440
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2441
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2442

    
2443
    # re-read the instance from the configuration after rename
2444
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2445

    
2446
    if inst.disk_template == constants.DT_FILE:
2447
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2448
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2449
                                                old_file_storage_dir,
2450
                                                new_file_storage_dir)
2451

    
2452
      if not result:
2453
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2454
                                 " directory '%s' to '%s' (but the instance"
2455
                                 " has been renamed in Ganeti)" % (
2456
                                 inst.primary_node, old_file_storage_dir,
2457
                                 new_file_storage_dir))
2458

    
2459
      if not result[0]:
2460
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2461
                                 " (but the instance has been renamed in"
2462
                                 " Ganeti)" % (old_file_storage_dir,
2463
                                               new_file_storage_dir))
2464

    
2465
    _StartInstanceDisks(self.cfg, inst, None)
2466
    try:
2467
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2468
                                          "sda", "sdb"):
2469
        msg = ("Could not run OS rename script for instance %s on node %s"
2470
               " (but the instance has been renamed in Ganeti)" %
2471
               (inst.name, inst.primary_node))
2472
        logger.Error(msg)
2473
    finally:
2474
      _ShutdownInstanceDisks(inst, self.cfg)
2475

    
2476

    
2477
class LURemoveInstance(LogicalUnit):
2478
  """Remove an instance.
2479

2480
  """
2481
  HPATH = "instance-remove"
2482
  HTYPE = constants.HTYPE_INSTANCE
2483
  _OP_REQP = ["instance_name", "ignore_failures"]
2484

    
2485
  def BuildHooksEnv(self):
2486
    """Build hooks env.
2487

2488
    This runs on master, primary and secondary nodes of the instance.
2489

2490
    """
2491
    env = _BuildInstanceHookEnvByObject(self.instance)
2492
    nl = [self.sstore.GetMasterNode()]
2493
    return env, nl, nl
2494

    
2495
  def CheckPrereq(self):
2496
    """Check prerequisites.
2497

2498
    This checks that the instance is in the cluster.
2499

2500
    """
2501
    instance = self.cfg.GetInstanceInfo(
2502
      self.cfg.ExpandInstanceName(self.op.instance_name))
2503
    if instance is None:
2504
      raise errors.OpPrereqError("Instance '%s' not known" %
2505
                                 self.op.instance_name)
2506
    self.instance = instance
2507

    
2508
  def Exec(self, feedback_fn):
2509
    """Remove the instance.
2510

2511
    """
2512
    instance = self.instance
2513
    logger.Info("shutting down instance %s on node %s" %
2514
                (instance.name, instance.primary_node))
2515

    
2516
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2517
      if self.op.ignore_failures:
2518
        feedback_fn("Warning: can't shutdown instance")
2519
      else:
2520
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2521
                                 (instance.name, instance.primary_node))
2522

    
2523
    logger.Info("removing block devices for instance %s" % instance.name)
2524

    
2525
    if not _RemoveDisks(instance, self.cfg):
2526
      if self.op.ignore_failures:
2527
        feedback_fn("Warning: can't remove instance's disks")
2528
      else:
2529
        raise errors.OpExecError("Can't remove instance's disks")
2530

    
2531
    logger.Info("removing instance %s out of cluster config" % instance.name)
2532

    
2533
    self.cfg.RemoveInstance(instance.name)
2534
    # Remove the new instance from the Ganeti Lock Manager
2535
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2536

    
2537

    
2538
class LUQueryInstances(NoHooksLU):
2539
  """Logical unit for querying instances.
2540

2541
  """
2542
  _OP_REQP = ["output_fields", "names"]
2543
  REQ_BGL = False
2544

    
2545
  def ExpandNames(self):
2546
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2547
    self.static_fields = frozenset([
2548
      "name", "os", "pnode", "snodes",
2549
      "admin_state", "admin_ram",
2550
      "disk_template", "ip", "mac", "bridge",
2551
      "sda_size", "sdb_size", "vcpus", "tags",
2552
      "auto_balance",
2553
      "network_port", "kernel_path", "initrd_path",
2554
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
2555
      "hvm_cdrom_image_path", "hvm_nic_type",
2556
      "hvm_disk_type", "vnc_bind_address",
2557
      ])
2558
    _CheckOutputFields(static=self.static_fields,
2559
                       dynamic=self.dynamic_fields,
2560
                       selected=self.op.output_fields)
2561

    
2562
    self.needed_locks = {}
2563
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2564
    self.share_locks[locking.LEVEL_NODE] = 1
2565

    
2566
    if self.op.names:
2567
      self.wanted = _GetWantedInstances(self, self.op.names)
2568
    else:
2569
      self.wanted = locking.ALL_SET
2570

    
2571
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2572
    if self.do_locking:
2573
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2574
      self.needed_locks[locking.LEVEL_NODE] = []
2575
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2576

    
2577
  def DeclareLocks(self, level):
2578
    if level == locking.LEVEL_NODE and self.do_locking:
2579
      self._LockInstancesNodes()
2580

    
2581
  def CheckPrereq(self):
2582
    """Check prerequisites.
2583

2584
    """
2585
    pass
2586

    
2587
  def Exec(self, feedback_fn):
2588
    """Computes the list of nodes and their attributes.
2589

2590
    """
2591
    all_info = self.cfg.GetAllInstancesInfo()
2592
    if self.do_locking:
2593
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2594
    else:
2595
      instance_names = all_info.keys()
2596
    instance_list = [all_info[iname] for iname in instance_names]
2597

    
2598
    # begin data gathering
2599

    
2600
    nodes = frozenset([inst.primary_node for inst in instance_list])
2601

    
2602
    bad_nodes = []
2603
    if self.dynamic_fields.intersection(self.op.output_fields):
2604
      live_data = {}
2605
      node_data = rpc.call_all_instances_info(nodes)
2606
      for name in nodes:
2607
        result = node_data[name]
2608
        if result:
2609
          live_data.update(result)
2610
        elif result == False:
2611
          bad_nodes.append(name)
2612
        # else no instance is alive
2613
    else:
2614
      live_data = dict([(name, {}) for name in instance_names])
2615

    
2616
    # end data gathering
2617

    
2618
    output = []
2619
    for instance in instance_list:
2620
      iout = []
2621
      for field in self.op.output_fields:
2622
        if field == "name":
2623
          val = instance.name
2624
        elif field == "os":
2625
          val = instance.os
2626
        elif field == "pnode":
2627
          val = instance.primary_node
2628
        elif field == "snodes":
2629
          val = list(instance.secondary_nodes)
2630
        elif field == "admin_state":
2631
          val = (instance.status != "down")
2632
        elif field == "oper_state":
2633
          if instance.primary_node in bad_nodes:
2634
            val = None
2635
          else:
2636
            val = bool(live_data.get(instance.name))
2637
        elif field == "status":
2638
          if instance.primary_node in bad_nodes:
2639
            val = "ERROR_nodedown"
2640
          else:
2641
            running = bool(live_data.get(instance.name))
2642
            if running:
2643
              if instance.status != "down":
2644
                val = "running"
2645
              else:
2646
                val = "ERROR_up"
2647
            else:
2648
              if instance.status != "down":
2649
                val = "ERROR_down"
2650
              else:
2651
                val = "ADMIN_down"
2652
        elif field == "admin_ram":
2653
          val = instance.memory
2654
        elif field == "oper_ram":
2655
          if instance.primary_node in bad_nodes:
2656
            val = None
2657
          elif instance.name in live_data:
2658
            val = live_data[instance.name].get("memory", "?")
2659
          else:
2660
            val = "-"
2661
        elif field == "disk_template":
2662
          val = instance.disk_template
2663
        elif field == "ip":
2664
          val = instance.nics[0].ip
2665
        elif field == "bridge":
2666
          val = instance.nics[0].bridge
2667
        elif field == "mac":
2668
          val = instance.nics[0].mac
2669
        elif field == "sda_size" or field == "sdb_size":
2670
          disk = instance.FindDisk(field[:3])
2671
          if disk is None:
2672
            val = None
2673
          else:
2674
            val = disk.size
2675
        elif field == "vcpus":
2676
          val = instance.vcpus
2677
        elif field == "tags":
2678
          val = list(instance.GetTags())
2679
        elif field in ("network_port", "kernel_path", "initrd_path",
2680
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2681
                       "hvm_cdrom_image_path", "hvm_nic_type",
2682
                       "hvm_disk_type", "vnc_bind_address"):
2683
          val = getattr(instance, field, None)
2684
          if val is not None:
2685
            pass
2686
          elif field in ("hvm_nic_type", "hvm_disk_type",
2687
                         "kernel_path", "initrd_path"):
2688
            val = "default"
2689
          else:
2690
            val = "-"
2691
        else:
2692
          raise errors.ParameterError(field)
2693
        iout.append(val)
2694
      output.append(iout)
2695

    
2696
    return output
2697

    
2698

    
2699
class LUFailoverInstance(LogicalUnit):
2700
  """Failover an instance.
2701

2702
  """
2703
  HPATH = "instance-failover"
2704
  HTYPE = constants.HTYPE_INSTANCE
2705
  _OP_REQP = ["instance_name", "ignore_consistency"]
2706
  REQ_BGL = False
2707

    
2708
  def ExpandNames(self):
2709
    self._ExpandAndLockInstance()
2710
    self.needed_locks[locking.LEVEL_NODE] = []
2711
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2712

    
2713
  def DeclareLocks(self, level):
2714
    if level == locking.LEVEL_NODE:
2715
      self._LockInstancesNodes()
2716

    
2717
  def BuildHooksEnv(self):
2718
    """Build hooks env.
2719

2720
    This runs on master, primary and secondary nodes of the instance.
2721

2722
    """
2723
    env = {
2724
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2725
      }
2726
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2727
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2728
    return env, nl, nl
2729

    
2730
  def CheckPrereq(self):
2731
    """Check prerequisites.
2732

2733
    This checks that the instance is in the cluster.
2734

2735
    """
2736
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2737
    assert self.instance is not None, \
2738
      "Cannot retrieve locked instance %s" % self.op.instance_name
2739

    
2740
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2741
      raise errors.OpPrereqError("Instance's disk layout is not"
2742
                                 " network mirrored, cannot failover.")
2743

    
2744
    secondary_nodes = instance.secondary_nodes
2745
    if not secondary_nodes:
2746
      raise errors.ProgrammerError("no secondary node but using "
2747
                                   "a mirrored disk template")
2748

    
2749
    target_node = secondary_nodes[0]
2750
    # check memory requirements on the secondary node
2751
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2752
                         instance.name, instance.memory)
2753

    
2754
    # check bridge existance
2755
    brlist = [nic.bridge for nic in instance.nics]
2756
    if not rpc.call_bridges_exist(target_node, brlist):
2757
      raise errors.OpPrereqError("One or more target bridges %s does not"
2758
                                 " exist on destination node '%s'" %
2759
                                 (brlist, target_node))
2760

    
2761
  def Exec(self, feedback_fn):
2762
    """Failover an instance.
2763

2764
    The failover is done by shutting it down on its present node and
2765
    starting it on the secondary.
2766

2767
    """
2768
    instance = self.instance
2769

    
2770
    source_node = instance.primary_node
2771
    target_node = instance.secondary_nodes[0]
2772

    
2773
    feedback_fn("* checking disk consistency between source and target")
2774
    for dev in instance.disks:
2775
      # for drbd, these are drbd over lvm
2776
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2777
        if instance.status == "up" and not self.op.ignore_consistency:
2778
          raise errors.OpExecError("Disk %s is degraded on target node,"
2779
                                   " aborting failover." % dev.iv_name)
2780

    
2781
    feedback_fn("* shutting down instance on source node")
2782
    logger.Info("Shutting down instance %s on node %s" %
2783
                (instance.name, source_node))
2784

    
2785
    if not rpc.call_instance_shutdown(source_node, instance):
2786
      if self.op.ignore_consistency:
2787
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2788
                     " anyway. Please make sure node %s is down"  %
2789
                     (instance.name, source_node, source_node))
2790
      else:
2791
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2792
                                 (instance.name, source_node))
2793

    
2794
    feedback_fn("* deactivating the instance's disks on source node")
2795
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2796
      raise errors.OpExecError("Can't shut down the instance's disks.")
2797

    
2798
    instance.primary_node = target_node
2799
    # distribute new instance config to the other nodes
2800
    self.cfg.Update(instance)
2801

    
2802
    # Only start the instance if it's marked as up
2803
    if instance.status == "up":
2804
      feedback_fn("* activating the instance's disks on target node")
2805
      logger.Info("Starting instance %s on node %s" %
2806
                  (instance.name, target_node))
2807

    
2808
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2809
                                               ignore_secondaries=True)
2810
      if not disks_ok:
2811
        _ShutdownInstanceDisks(instance, self.cfg)
2812
        raise errors.OpExecError("Can't activate the instance's disks")
2813

    
2814
      feedback_fn("* starting the instance on the target node")
2815
      if not rpc.call_instance_start(target_node, instance, None):
2816
        _ShutdownInstanceDisks(instance, self.cfg)
2817
        raise errors.OpExecError("Could not start instance %s on node %s." %
2818
                                 (instance.name, target_node))
2819

    
2820

    
2821
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2822
  """Create a tree of block devices on the primary node.
2823

2824
  This always creates all devices.
2825

2826
  """
2827
  if device.children:
2828
    for child in device.children:
2829
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2830
        return False
2831

    
2832
  cfg.SetDiskID(device, node)
2833
  new_id = rpc.call_blockdev_create(node, device, device.size,
2834
                                    instance.name, True, info)
2835
  if not new_id:
2836
    return False
2837
  if device.physical_id is None:
2838
    device.physical_id = new_id
2839
  return True
2840

    
2841

    
2842
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2843
  """Create a tree of block devices on a secondary node.
2844

2845
  If this device type has to be created on secondaries, create it and
2846
  all its children.
2847

2848
  If not, just recurse to children keeping the same 'force' value.
2849

2850
  """
2851
  if device.CreateOnSecondary():
2852
    force = True
2853
  if device.children:
2854
    for child in device.children:
2855
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2856
                                        child, force, info):
2857
        return False
2858

    
2859
  if not force:
2860
    return True
2861
  cfg.SetDiskID(device, node)
2862
  new_id = rpc.call_blockdev_create(node, device, device.size,
2863
                                    instance.name, False, info)
2864
  if not new_id:
2865
    return False
2866
  if device.physical_id is None:
2867
    device.physical_id = new_id
2868
  return True
2869

    
2870

    
2871
def _GenerateUniqueNames(cfg, exts):
2872
  """Generate a suitable LV name.
2873

2874
  This will generate a logical volume name for the given instance.
2875

2876
  """
2877
  results = []
2878
  for val in exts:
2879
    new_id = cfg.GenerateUniqueID()
2880
    results.append("%s%s" % (new_id, val))
2881
  return results
2882

    
2883

    
2884
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2885
  """Generate a drbd8 device complete with its children.
2886

2887
  """
2888
  port = cfg.AllocatePort()
2889
  vgname = cfg.GetVGName()
2890
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2891
                          logical_id=(vgname, names[0]))
2892
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2893
                          logical_id=(vgname, names[1]))
2894
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2895
                          logical_id = (primary, secondary, port),
2896
                          children = [dev_data, dev_meta],
2897
                          iv_name=iv_name)
2898
  return drbd_dev
2899

    
2900

    
2901
def _GenerateDiskTemplate(cfg, template_name,
2902
                          instance_name, primary_node,
2903
                          secondary_nodes, disk_sz, swap_sz,
2904
                          file_storage_dir, file_driver):
2905
  """Generate the entire disk layout for a given template type.
2906

2907
  """
2908
  #TODO: compute space requirements
2909

    
2910
  vgname = cfg.GetVGName()
2911
  if template_name == constants.DT_DISKLESS:
2912
    disks = []
2913
  elif template_name == constants.DT_PLAIN:
2914
    if len(secondary_nodes) != 0:
2915
      raise errors.ProgrammerError("Wrong template configuration")
2916

    
2917
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2918
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2919
                           logical_id=(vgname, names[0]),
2920
                           iv_name = "sda")
2921
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2922
                           logical_id=(vgname, names[1]),
2923
                           iv_name = "sdb")
2924
    disks = [sda_dev, sdb_dev]
2925
  elif template_name == constants.DT_DRBD8:
2926
    if len(secondary_nodes) != 1:
2927
      raise errors.ProgrammerError("Wrong template configuration")
2928
    remote_node = secondary_nodes[0]
2929
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2930
                                       ".sdb_data", ".sdb_meta"])
2931
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2932
                                         disk_sz, names[0:2], "sda")
2933
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2934
                                         swap_sz, names[2:4], "sdb")
2935
    disks = [drbd_sda_dev, drbd_sdb_dev]
2936
  elif template_name == constants.DT_FILE:
2937
    if len(secondary_nodes) != 0:
2938
      raise errors.ProgrammerError("Wrong template configuration")
2939

    
2940
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2941
                                iv_name="sda", logical_id=(file_driver,
2942
                                "%s/sda" % file_storage_dir))
2943
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2944
                                iv_name="sdb", logical_id=(file_driver,
2945
                                "%s/sdb" % file_storage_dir))
2946
    disks = [file_sda_dev, file_sdb_dev]
2947
  else:
2948
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2949
  return disks
2950

    
2951

    
2952
def _GetInstanceInfoText(instance):
2953
  """Compute that text that should be added to the disk's metadata.
2954

2955
  """
2956
  return "originstname+%s" % instance.name
2957

    
2958

    
2959
def _CreateDisks(cfg, instance):
2960
  """Create all disks for an instance.
2961

2962
  This abstracts away some work from AddInstance.
2963

2964
  Args:
2965
    instance: the instance object
2966

2967
  Returns:
2968
    True or False showing the success of the creation process
2969

2970
  """
2971
  info = _GetInstanceInfoText(instance)
2972

    
2973
  if instance.disk_template == constants.DT_FILE:
2974
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2975
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2976
                                              file_storage_dir)
2977

    
2978
    if not result:
2979
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2980
      return False
2981

    
2982
    if not result[0]:
2983
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2984
      return False
2985

    
2986
  for device in instance.disks:
2987
    logger.Info("creating volume %s for instance %s" %
2988
                (device.iv_name, instance.name))
2989
    #HARDCODE
2990
    for secondary_node in instance.secondary_nodes:
2991
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2992
                                        device, False, info):
2993
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2994
                     (device.iv_name, device, secondary_node))
2995
        return False
2996
    #HARDCODE
2997
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2998
                                    instance, device, info):
2999
      logger.Error("failed to create volume %s on primary!" %
3000
                   device.iv_name)
3001
      return False
3002

    
3003
  return True
3004

    
3005

    
3006
def _RemoveDisks(instance, cfg):
3007
  """Remove all disks for an instance.
3008

3009
  This abstracts away some work from `AddInstance()` and
3010
  `RemoveInstance()`. Note that in case some of the devices couldn't
3011
  be removed, the removal will continue with the other ones (compare
3012
  with `_CreateDisks()`).
3013

3014
  Args:
3015
    instance: the instance object
3016

3017
  Returns:
3018
    True or False showing the success of the removal proces
3019

3020
  """
3021
  logger.Info("removing block devices for instance %s" % instance.name)
3022

    
3023
  result = True
3024
  for device in instance.disks:
3025
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3026
      cfg.SetDiskID(disk, node)
3027
      if not rpc.call_blockdev_remove(node, disk):
3028
        logger.Error("could not remove block device %s on node %s,"
3029
                     " continuing anyway" %
3030
                     (device.iv_name, node))
3031
        result = False
3032

    
3033
  if instance.disk_template == constants.DT_FILE:
3034
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3035
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3036
                                            file_storage_dir):
3037
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3038
      result = False
3039

    
3040
  return result
3041

    
3042

    
3043
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3044
  """Compute disk size requirements in the volume group
3045

3046
  This is currently hard-coded for the two-drive layout.
3047

3048
  """
3049
  # Required free disk space as a function of disk and swap space
3050
  req_size_dict = {
3051
    constants.DT_DISKLESS: None,
3052
    constants.DT_PLAIN: disk_size + swap_size,
3053
    # 256 MB are added for drbd metadata, 128MB for each drbd device
3054
    constants.DT_DRBD8: disk_size + swap_size + 256,
3055
    constants.DT_FILE: None,
3056
  }
3057

    
3058
  if disk_template not in req_size_dict:
3059
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3060
                                 " is unknown" %  disk_template)
3061

    
3062
  return req_size_dict[disk_template]
3063

    
3064

    
3065
class LUCreateInstance(LogicalUnit):
3066
  """Create an instance.
3067

3068
  """
3069
  HPATH = "instance-add"
3070
  HTYPE = constants.HTYPE_INSTANCE
3071
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3072
              "disk_template", "swap_size", "mode", "start", "vcpus",
3073
              "wait_for_sync", "ip_check", "mac"]
3074
  REQ_BGL = False
3075

    
3076
  def _ExpandNode(self, node):
3077
    """Expands and checks one node name.
3078

3079
    """
3080
    node_full = self.cfg.ExpandNodeName(node)
3081
    if node_full is None:
3082
      raise errors.OpPrereqError("Unknown node %s" % node)
3083
    return node_full
3084

    
3085
  def ExpandNames(self):
3086
    """ExpandNames for CreateInstance.
3087

3088
    Figure out the right locks for instance creation.
3089

3090
    """
3091
    self.needed_locks = {}
3092

    
3093
    # set optional parameters to none if they don't exist
3094
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3095
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3096
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3097
                 "vnc_bind_address"]:
3098
      if not hasattr(self.op, attr):
3099
        setattr(self.op, attr, None)
3100

    
3101
    # verify creation mode
3102
    if self.op.mode not in (constants.INSTANCE_CREATE,
3103
                            constants.INSTANCE_IMPORT):
3104
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3105
                                 self.op.mode)
3106
    # disk template and mirror node verification
3107
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3108
      raise errors.OpPrereqError("Invalid disk template name")
3109

    
3110
    #### instance parameters check
3111

    
3112
    # instance name verification
3113
    hostname1 = utils.HostInfo(self.op.instance_name)
3114
    self.op.instance_name = instance_name = hostname1.name
3115

    
3116
    # this is just a preventive check, but someone might still add this
3117
    # instance in the meantime, and creation will fail at lock-add time
3118
    if instance_name in self.cfg.GetInstanceList():
3119
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3120
                                 instance_name)
3121

    
3122
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3123

    
3124
    # ip validity checks
3125
    ip = getattr(self.op, "ip", None)
3126
    if ip is None or ip.lower() == "none":
3127
      inst_ip = None
3128
    elif ip.lower() == "auto":
3129
      inst_ip = hostname1.ip
3130
    else:
3131
      if not utils.IsValidIP(ip):
3132
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3133
                                   " like a valid IP" % ip)
3134
      inst_ip = ip
3135
    self.inst_ip = self.op.ip = inst_ip
3136
    # used in CheckPrereq for ip ping check
3137
    self.check_ip = hostname1.ip
3138

    
3139
    # MAC address verification
3140
    if self.op.mac != "auto":
3141
      if not utils.IsValidMac(self.op.mac.lower()):
3142
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3143
                                   self.op.mac)
3144

    
3145
    # boot order verification
3146
    if self.op.hvm_boot_order is not None:
3147
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3148
        raise errors.OpPrereqError("invalid boot order specified,"
3149
                                   " must be one or more of [acdn]")
3150
    # file storage checks
3151
    if (self.op.file_driver and
3152
        not self.op.file_driver in constants.FILE_DRIVER):
3153
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3154
                                 self.op.file_driver)
3155

    
3156
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3157
      raise errors.OpPrereqError("File storage directory path not absolute")
3158

    
3159
    ### Node/iallocator related checks
3160
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3161
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3162
                                 " node must be given")
3163

    
3164
    if self.op.iallocator:
3165
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3166
    else:
3167
      self.op.pnode = self._ExpandNode(self.op.pnode)
3168
      nodelist = [self.op.pnode]
3169
      if self.op.snode is not None:
3170
        self.op.snode = self._ExpandNode(self.op.snode)
3171
        nodelist.append(self.op.snode)
3172
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3173

    
3174
    # in case of import lock the source node too
3175
    if self.op.mode == constants.INSTANCE_IMPORT:
3176
      src_node = getattr(self.op, "src_node", None)
3177
      src_path = getattr(self.op, "src_path", None)
3178

    
3179
      if src_node is None or src_path is None:
3180
        raise errors.OpPrereqError("Importing an instance requires source"
3181
                                   " node and path options")
3182

    
3183
      if not os.path.isabs(src_path):
3184
        raise errors.OpPrereqError("The source path must be absolute")
3185

    
3186
      self.op.src_node = src_node = self._ExpandNode(src_node)
3187
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3188
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3189

    
3190
    else: # INSTANCE_CREATE
3191
      if getattr(self.op, "os_type", None) is None:
3192
        raise errors.OpPrereqError("No guest OS specified")
3193

    
3194
  def _RunAllocator(self):
3195
    """Run the allocator based on input opcode.
3196

3197
    """
3198
    disks = [{"size": self.op.disk_size, "mode": "w"},
3199
             {"size": self.op.swap_size, "mode": "w"}]
3200
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3201
             "bridge": self.op.bridge}]
3202
    ial = IAllocator(self.cfg, self.sstore,
3203
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3204
                     name=self.op.instance_name,
3205
                     disk_template=self.op.disk_template,
3206
                     tags=[],
3207
                     os=self.op.os_type,
3208
                     vcpus=self.op.vcpus,
3209
                     mem_size=self.op.mem_size,
3210
                     disks=disks,
3211
                     nics=nics,
3212
                     )
3213

    
3214
    ial.Run(self.op.iallocator)
3215

    
3216
    if not ial.success:
3217
      raise errors.OpPrereqError("Can't compute nodes using"
3218
                                 " iallocator '%s': %s" % (self.op.iallocator,
3219
                                                           ial.info))
3220
    if len(ial.nodes) != ial.required_nodes:
3221
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3222
                                 " of nodes (%s), required %s" %
3223
                                 (len(ial.nodes), ial.required_nodes))
3224
    self.op.pnode = ial.nodes[0]
3225
    logger.ToStdout("Selected nodes for the instance: %s" %
3226
                    (", ".join(ial.nodes),))
3227
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3228
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3229
    if ial.required_nodes == 2:
3230
      self.op.snode = ial.nodes[1]
3231

    
3232
  def BuildHooksEnv(self):
3233
    """Build hooks env.
3234

3235
    This runs on master, primary and secondary nodes of the instance.
3236

3237
    """
3238
    env = {
3239
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3240
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3241
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3242
      "INSTANCE_ADD_MODE": self.op.mode,
3243
      }
3244
    if self.op.mode == constants.INSTANCE_IMPORT:
3245
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3246
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3247
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3248

    
3249
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3250
      primary_node=self.op.pnode,
3251
      secondary_nodes=self.secondaries,
3252
      status=self.instance_status,
3253
      os_type=self.op.os_type,
3254
      memory=self.op.mem_size,
3255
      vcpus=self.op.vcpus,
3256
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3257
    ))
3258

    
3259
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3260
          self.secondaries)
3261
    return env, nl, nl
3262

    
3263

    
3264
  def CheckPrereq(self):
3265
    """Check prerequisites.
3266

3267
    """
3268
    if (not self.cfg.GetVGName() and
3269
        self.op.disk_template not in constants.DTS_NOT_LVM):
3270
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3271
                                 " instances")
3272

    
3273
    if self.op.mode == constants.INSTANCE_IMPORT:
3274
      src_node = self.op.src_node
3275
      src_path = self.op.src_path
3276

    
3277
      export_info = rpc.call_export_info(src_node, src_path)
3278

    
3279
      if not export_info:
3280
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3281

    
3282
      if not export_info.has_section(constants.INISECT_EXP):
3283
        raise errors.ProgrammerError("Corrupted export config")
3284

    
3285
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3286
      if (int(ei_version) != constants.EXPORT_VERSION):
3287
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3288
                                   (ei_version, constants.EXPORT_VERSION))
3289

    
3290
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3291
        raise errors.OpPrereqError("Can't import instance with more than"
3292
                                   " one data disk")
3293

    
3294
      # FIXME: are the old os-es, disk sizes, etc. useful?
3295
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3296
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3297
                                                         'disk0_dump'))
3298
      self.src_image = diskimage
3299

    
3300
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3301

    
3302
    if self.op.start and not self.op.ip_check:
3303
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3304
                                 " adding an instance in start mode")
3305

    
3306
    if self.op.ip_check:
3307
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3308
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3309
                                   (self.check_ip, instance_name))
3310

    
3311
    # bridge verification
3312
    bridge = getattr(self.op, "bridge", None)
3313
    if bridge is None:
3314
      self.op.bridge = self.cfg.GetDefBridge()
3315
    else:
3316
      self.op.bridge = bridge
3317

    
3318
    #### allocator run
3319

    
3320
    if self.op.iallocator is not None:
3321
      self._RunAllocator()
3322

    
3323
    #### node related checks
3324

    
3325
    # check primary node
3326
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3327
    assert self.pnode is not None, \
3328
      "Cannot retrieve locked node %s" % self.op.pnode
3329
    self.secondaries = []
3330

    
3331
    # mirror node verification
3332
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3333
      if self.op.snode is None:
3334
        raise errors.OpPrereqError("The networked disk templates need"
3335
                                   " a mirror node")
3336
      if self.op.snode == pnode.name:
3337
        raise errors.OpPrereqError("The secondary node cannot be"
3338
                                   " the primary node.")
3339
      self.secondaries.append(self.op.snode)
3340

    
3341
    req_size = _ComputeDiskSize(self.op.disk_template,
3342
                                self.op.disk_size, self.op.swap_size)
3343

    
3344
    # Check lv size requirements
3345
    if req_size is not None:
3346
      nodenames = [pnode.name] + self.secondaries
3347
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3348
      for node in nodenames:
3349
        info = nodeinfo.get(node, None)
3350
        if not info:
3351
          raise errors.OpPrereqError("Cannot get current information"
3352
                                     " from node '%s'" % node)
3353
        vg_free = info.get('vg_free', None)
3354
        if not isinstance(vg_free, int):
3355
          raise errors.OpPrereqError("Can't compute free disk space on"
3356
                                     " node %s" % node)
3357
        if req_size > info['vg_free']:
3358
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3359
                                     " %d MB available, %d MB required" %
3360
                                     (node, info['vg_free'], req_size))
3361

    
3362
    # os verification
3363
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3364
    if not os_obj:
3365
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3366
                                 " primary node"  % self.op.os_type)
3367

    
3368
    if self.op.kernel_path == constants.VALUE_NONE:
3369
      raise errors.OpPrereqError("Can't set instance kernel to none")
3370

    
3371
    # bridge check on primary node
3372
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3373
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3374
                                 " destination node '%s'" %
3375
                                 (self.op.bridge, pnode.name))
3376

    
3377
    # memory check on primary node
3378
    if self.op.start:
3379
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3380
                           "creating instance %s" % self.op.instance_name,
3381
                           self.op.mem_size)
3382

    
3383
    # hvm_cdrom_image_path verification
3384
    if self.op.hvm_cdrom_image_path is not None:
3385
      # FIXME (als): shouldn't these checks happen on the destination node?
3386
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3387
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3388
                                   " be an absolute path or None, not %s" %
3389
                                   self.op.hvm_cdrom_image_path)
3390
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3391
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3392
                                   " regular file or a symlink pointing to"
3393
                                   " an existing regular file, not %s" %
3394
                                   self.op.hvm_cdrom_image_path)
3395

    
3396
    # vnc_bind_address verification
3397
    if self.op.vnc_bind_address is not None:
3398
      if not utils.IsValidIP(self.op.vnc_bind_address):
3399
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3400
                                   " like a valid IP address" %
3401
                                   self.op.vnc_bind_address)
3402

    
3403
    # Xen HVM device type checks
3404
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3405
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3406
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3407
                                   " hypervisor" % self.op.hvm_nic_type)
3408
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3409
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3410
                                   " hypervisor" % self.op.hvm_disk_type)
3411

    
3412
    if self.op.start:
3413
      self.instance_status = 'up'
3414
    else:
3415
      self.instance_status = 'down'
3416

    
3417
  def Exec(self, feedback_fn):
3418
    """Create and add the instance to the cluster.
3419

3420
    """
3421
    instance = self.op.instance_name
3422
    pnode_name = self.pnode.name
3423

    
3424
    if self.op.mac == "auto":
3425
      mac_address = self.cfg.GenerateMAC()
3426
    else:
3427
      mac_address = self.op.mac
3428

    
3429
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3430
    if self.inst_ip is not None:
3431
      nic.ip = self.inst_ip
3432

    
3433
    ht_kind = self.sstore.GetHypervisorType()
3434
    if ht_kind in constants.HTS_REQ_PORT:
3435
      network_port = self.cfg.AllocatePort()
3436
    else:
3437
      network_port = None
3438

    
3439
    if self.op.vnc_bind_address is None:
3440
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3441

    
3442
    # this is needed because os.path.join does not accept None arguments
3443
    if self.op.file_storage_dir is None:
3444
      string_file_storage_dir = ""
3445
    else:
3446
      string_file_storage_dir = self.op.file_storage_dir
3447

    
3448
    # build the full file storage dir path
3449
    file_storage_dir = os.path.normpath(os.path.join(
3450
                                        self.sstore.GetFileStorageDir(),
3451
                                        string_file_storage_dir, instance))
3452

    
3453

    
3454
    disks = _GenerateDiskTemplate(self.cfg,
3455
                                  self.op.disk_template,
3456
                                  instance, pnode_name,
3457
                                  self.secondaries, self.op.disk_size,
3458
                                  self.op.swap_size,
3459
                                  file_storage_dir,
3460
                                  self.op.file_driver)
3461

    
3462
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3463
                            primary_node=pnode_name,
3464
                            memory=self.op.mem_size,
3465
                            vcpus=self.op.vcpus,
3466
                            nics=[nic], disks=disks,
3467
                            disk_template=self.op.disk_template,
3468
                            status=self.instance_status,
3469
                            network_port=network_port,
3470
                            kernel_path=self.op.kernel_path,
3471
                            initrd_path=self.op.initrd_path,
3472
                            hvm_boot_order=self.op.hvm_boot_order,
3473
                            hvm_acpi=self.op.hvm_acpi,
3474
                            hvm_pae=self.op.hvm_pae,
3475
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3476
                            vnc_bind_address=self.op.vnc_bind_address,
3477
                            hvm_nic_type=self.op.hvm_nic_type,
3478
                            hvm_disk_type=self.op.hvm_disk_type,
3479
                            )
3480

    
3481
    feedback_fn("* creating instance disks...")
3482
    if not _CreateDisks(self.cfg, iobj):
3483
      _RemoveDisks(iobj, self.cfg)
3484
      raise errors.OpExecError("Device creation failed, reverting...")
3485

    
3486
    feedback_fn("adding instance %s to cluster config" % instance)
3487

    
3488
    self.cfg.AddInstance(iobj)
3489
    # Declare that we don't want to remove the instance lock anymore, as we've
3490
    # added the instance to the config
3491
    del self.remove_locks[locking.LEVEL_INSTANCE]
3492

    
3493
    if self.op.wait_for_sync:
3494
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3495
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3496
      # make sure the disks are not degraded (still sync-ing is ok)
3497
      time.sleep(15)
3498
      feedback_fn("* checking mirrors status")
3499
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3500
    else:
3501
      disk_abort = False
3502

    
3503
    if disk_abort:
3504
      _RemoveDisks(iobj, self.cfg)
3505
      self.cfg.RemoveInstance(iobj.name)
3506
      # Make sure the instance lock gets removed
3507
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3508
      raise errors.OpExecError("There are some degraded disks for"
3509
                               " this instance")
3510

    
3511
    feedback_fn("creating os for instance %s on node %s" %
3512
                (instance, pnode_name))
3513

    
3514
    if iobj.disk_template != constants.DT_DISKLESS:
3515
      if self.op.mode == constants.INSTANCE_CREATE:
3516
        feedback_fn("* running the instance OS create scripts...")
3517
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3518
          raise errors.OpExecError("could not add os for instance %s"
3519
                                   " on node %s" %
3520
                                   (instance, pnode_name))
3521

    
3522
      elif self.op.mode == constants.INSTANCE_IMPORT:
3523
        feedback_fn("* running the instance OS import scripts...")
3524
        src_node = self.op.src_node
3525
        src_image = self.src_image
3526
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3527
                                                src_node, src_image):
3528
          raise errors.OpExecError("Could not import os for instance"
3529
                                   " %s on node %s" %
3530
                                   (instance, pnode_name))
3531
      else:
3532
        # also checked in the prereq part
3533
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3534
                                     % self.op.mode)
3535

    
3536
    if self.op.start:
3537
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3538
      feedback_fn("* starting instance...")
3539
      if not rpc.call_instance_start(pnode_name, iobj, None):
3540
        raise errors.OpExecError("Could not start instance")
3541

    
3542

    
3543
class LUConnectConsole(NoHooksLU):
3544
  """Connect to an instance's console.
3545

3546
  This is somewhat special in that it returns the command line that
3547
  you need to run on the master node in order to connect to the
3548
  console.
3549

3550
  """
3551
  _OP_REQP = ["instance_name"]
3552
  REQ_BGL = False
3553

    
3554
  def ExpandNames(self):
3555
    self._ExpandAndLockInstance()
3556

    
3557
  def CheckPrereq(self):
3558
    """Check prerequisites.
3559

3560
    This checks that the instance is in the cluster.
3561

3562
    """
3563
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3564
    assert self.instance is not None, \
3565
      "Cannot retrieve locked instance %s" % self.op.instance_name
3566

    
3567
  def Exec(self, feedback_fn):
3568
    """Connect to the console of an instance
3569

3570
    """
3571
    instance = self.instance
3572
    node = instance.primary_node
3573

    
3574
    node_insts = rpc.call_instance_list([node])[node]
3575
    if node_insts is False:
3576
      raise errors.OpExecError("Can't connect to node %s." % node)
3577

    
3578
    if instance.name not in node_insts:
3579
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3580

    
3581
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3582

    
3583
    hyper = hypervisor.GetHypervisor()
3584
    console_cmd = hyper.GetShellCommandForConsole(instance)
3585

    
3586
    # build ssh cmdline
3587
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3588

    
3589

    
3590
class LUReplaceDisks(LogicalUnit):
3591
  """Replace the disks of an instance.
3592

3593
  """
3594
  HPATH = "mirrors-replace"
3595
  HTYPE = constants.HTYPE_INSTANCE
3596
  _OP_REQP = ["instance_name", "mode", "disks"]
3597
  REQ_BGL = False
3598

    
3599
  def ExpandNames(self):
3600
    self._ExpandAndLockInstance()
3601

    
3602
    if not hasattr(self.op, "remote_node"):
3603
      self.op.remote_node = None
3604

    
3605
    ia_name = getattr(self.op, "iallocator", None)
3606
    if ia_name is not None:
3607
      if self.op.remote_node is not None:
3608
        raise errors.OpPrereqError("Give either the iallocator or the new"
3609
                                   " secondary, not both")
3610
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3611
    elif self.op.remote_node is not None:
3612
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3613
      if remote_node is None:
3614
        raise errors.OpPrereqError("Node '%s' not known" %
3615
                                   self.op.remote_node)
3616
      self.op.remote_node = remote_node
3617
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3618
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3619
    else:
3620
      self.needed_locks[locking.LEVEL_NODE] = []
3621
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3622

    
3623
  def DeclareLocks(self, level):
3624
    # If we're not already locking all nodes in the set we have to declare the
3625
    # instance's primary/secondary nodes.
3626
    if (level == locking.LEVEL_NODE and
3627
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3628
      self._LockInstancesNodes()
3629

    
3630
  def _RunAllocator(self):
3631
    """Compute a new secondary node using an IAllocator.
3632

3633
    """
3634
    ial = IAllocator(self.cfg, self.sstore,
3635
                     mode=constants.IALLOCATOR_MODE_RELOC,
3636
                     name=self.op.instance_name,
3637
                     relocate_from=[self.sec_node])
3638

    
3639
    ial.Run(self.op.iallocator)
3640

    
3641
    if not ial.success:
3642
      raise errors.OpPrereqError("Can't compute nodes using"
3643
                                 " iallocator '%s': %s" % (self.op.iallocator,
3644
                                                           ial.info))
3645
    if len(ial.nodes) != ial.required_nodes:
3646
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3647
                                 " of nodes (%s), required %s" %
3648
                                 (len(ial.nodes), ial.required_nodes))
3649
    self.op.remote_node = ial.nodes[0]
3650
    logger.ToStdout("Selected new secondary for the instance: %s" %
3651
                    self.op.remote_node)
3652

    
3653
  def BuildHooksEnv(self):
3654
    """Build hooks env.
3655

3656
    This runs on the master, the primary and all the secondaries.
3657

3658
    """
3659
    env = {
3660
      "MODE": self.op.mode,
3661
      "NEW_SECONDARY": self.op.remote_node,
3662
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3663
      }
3664
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3665
    nl = [
3666
      self.sstore.GetMasterNode(),
3667
      self.instance.primary_node,
3668
      ]
3669
    if self.op.remote_node is not None:
3670
      nl.append(self.op.remote_node)
3671
    return env, nl, nl
3672

    
3673
  def CheckPrereq(self):
3674
    """Check prerequisites.
3675

3676
    This checks that the instance is in the cluster.
3677

3678
    """
3679
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3680
    assert instance is not None, \
3681
      "Cannot retrieve locked instance %s" % self.op.instance_name
3682
    self.instance = instance
3683

    
3684
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3685
      raise errors.OpPrereqError("Instance's disk layout is not"
3686
                                 " network mirrored.")
3687

    
3688
    if len(instance.secondary_nodes) != 1:
3689
      raise errors.OpPrereqError("The instance has a strange layout,"
3690
                                 " expected one secondary but found %d" %
3691
                                 len(instance.secondary_nodes))
3692

    
3693
    self.sec_node = instance.secondary_nodes[0]
3694

    
3695
    ia_name = getattr(self.op, "iallocator", None)
3696
    if ia_name is not None:
3697
      self._RunAllocator()
3698

    
3699
    remote_node = self.op.remote_node
3700
    if remote_node is not None:
3701
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3702
      assert self.remote_node_info is not None, \
3703
        "Cannot retrieve locked node %s" % remote_node
3704
    else:
3705
      self.remote_node_info = None
3706
    if remote_node == instance.primary_node:
3707
      raise errors.OpPrereqError("The specified node is the primary node of"
3708
                                 " the instance.")
3709
    elif remote_node == self.sec_node:
3710
      if self.op.mode == constants.REPLACE_DISK_SEC:
3711
        # this is for DRBD8, where we can't execute the same mode of
3712
        # replacement as for drbd7 (no different port allocated)
3713
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3714
                                   " replacement")
3715
    if instance.disk_template == constants.DT_DRBD8:
3716
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3717
          remote_node is not None):
3718
        # switch to replace secondary mode
3719
        self.op.mode = constants.REPLACE_DISK_SEC
3720

    
3721
      if self.op.mode == constants.REPLACE_DISK_ALL:
3722
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3723
                                   " secondary disk replacement, not"
3724
                                   " both at once")
3725
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3726
        if remote_node is not None:
3727
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3728
                                     " the secondary while doing a primary"
3729
                                     " node disk replacement")
3730
        self.tgt_node = instance.primary_node
3731
        self.oth_node = instance.secondary_nodes[0]
3732
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3733
        self.new_node = remote_node # this can be None, in which case
3734
                                    # we don't change the secondary
3735
        self.tgt_node = instance.secondary_nodes[0]
3736
        self.oth_node = instance.primary_node
3737
      else:
3738
        raise errors.ProgrammerError("Unhandled disk replace mode")
3739

    
3740
    for name in self.op.disks:
3741
      if instance.FindDisk(name) is None:
3742
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3743
                                   (name, instance.name))
3744

    
3745
  def _ExecD8DiskOnly(self, feedback_fn):
3746
    """Replace a disk on the primary or secondary for dbrd8.
3747

3748
    The algorithm for replace is quite complicated:
3749
      - for each disk to be replaced:
3750
        - create new LVs on the target node with unique names
3751
        - detach old LVs from the drbd device
3752
        - rename old LVs to name_replaced.<time_t>
3753
        - rename new LVs to old LVs
3754
        - attach the new LVs (with the old names now) to the drbd device
3755
      - wait for sync across all devices
3756
      - for each modified disk:
3757
        - remove old LVs (which have the name name_replaces.<time_t>)
3758

3759
    Failures are not very well handled.
3760

3761
    """
3762
    steps_total = 6
3763
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3764
    instance = self.instance
3765
    iv_names = {}
3766
    vgname = self.cfg.GetVGName()
3767
    # start of work
3768
    cfg = self.cfg
3769
    tgt_node = self.tgt_node
3770
    oth_node = self.oth_node
3771

    
3772
    # Step: check device activation
3773
    self.proc.LogStep(1, steps_total, "check device existence")
3774
    info("checking volume groups")
3775
    my_vg = cfg.GetVGName()
3776
    results = rpc.call_vg_list([oth_node, tgt_node])
3777
    if not results:
3778
      raise errors.OpExecError("Can't list volume groups on the nodes")
3779
    for node in oth_node, tgt_node:
3780
      res = results.get(node, False)
3781
      if not res or my_vg not in res:
3782
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3783
                                 (my_vg, node))
3784
    for dev in instance.disks:
3785
      if not dev.iv_name in self.op.disks:
3786
        continue
3787
      for node in tgt_node, oth_node:
3788
        info("checking %s on %s" % (dev.iv_name, node))
3789
        cfg.SetDiskID(dev, node)
3790
        if not rpc.call_blockdev_find(node, dev):
3791
          raise errors.OpExecError("Can't find device %s on node %s" %
3792
                                   (dev.iv_name, node))
3793

    
3794
    # Step: check other node consistency
3795
    self.proc.LogStep(2, steps_total, "check peer consistency")
3796
    for dev in instance.disks:
3797
      if not dev.iv_name in self.op.disks:
3798
        continue
3799
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3800
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3801
                                   oth_node==instance.primary_node):
3802
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3803
                                 " to replace disks on this node (%s)" %
3804
                                 (oth_node, tgt_node))
3805

    
3806
    # Step: create new storage
3807
    self.proc.LogStep(3, steps_total, "allocate new storage")
3808
    for dev in instance.disks:
3809
      if not dev.iv_name in self.op.disks:
3810
        continue
3811
      size = dev.size
3812
      cfg.SetDiskID(dev, tgt_node)
3813
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3814
      names = _GenerateUniqueNames(cfg, lv_names)
3815
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3816
                             logical_id=(vgname, names[0]))
3817
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3818
                             logical_id=(vgname, names[1]))
3819
      new_lvs = [lv_data, lv_meta]
3820
      old_lvs = dev.children
3821
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3822
      info("creating new local storage on %s for %s" %
3823
           (tgt_node, dev.iv_name))
3824
      # since we *always* want to create this LV, we use the
3825
      # _Create...OnPrimary (which forces the creation), even if we
3826
      # are talking about the secondary node
3827
      for new_lv in new_lvs:
3828
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3829
                                        _GetInstanceInfoText(instance)):
3830
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3831
                                   " node '%s'" %
3832
                                   (new_lv.logical_id[1], tgt_node))
3833

    
3834
    # Step: for each lv, detach+rename*2+attach
3835
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3836
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3837
      info("detaching %s drbd from local storage" % dev.iv_name)
3838
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3839
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3840
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3841
      #dev.children = []
3842
      #cfg.Update(instance)
3843

    
3844
      # ok, we created the new LVs, so now we know we have the needed
3845
      # storage; as such, we proceed on the target node to rename
3846
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3847
      # using the assumption that logical_id == physical_id (which in
3848
      # turn is the unique_id on that node)
3849

    
3850
      # FIXME(iustin): use a better name for the replaced LVs
3851
      temp_suffix = int(time.time())
3852
      ren_fn = lambda d, suff: (d.physical_id[0],
3853
                                d.physical_id[1] + "_replaced-%s" % suff)
3854
      # build the rename list based on what LVs exist on the node
3855
      rlist = []
3856
      for to_ren in old_lvs:
3857
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3858
        if find_res is not None: # device exists
3859
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3860

    
3861
      info("renaming the old LVs on the target node")
3862
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3863
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3864
      # now we rename the new LVs to the old LVs
3865
      info("renaming the new LVs on the target node")
3866
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3867
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3868
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3869

    
3870
      for old, new in zip(old_lvs, new_lvs):
3871
        new.logical_id = old.logical_id
3872
        cfg.SetDiskID(new, tgt_node)
3873

    
3874
      for disk in old_lvs:
3875
        disk.logical_id = ren_fn(disk, temp_suffix)
3876
        cfg.SetDiskID(disk, tgt_node)
3877

    
3878
      # now that the new lvs have the old name, we can add them to the device
3879
      info("adding new mirror component on %s" % tgt_node)
3880
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3881
        for new_lv in new_lvs:
3882
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3883
            warning("Can't rollback device %s", hint="manually cleanup unused"
3884
                    " logical volumes")
3885
        raise errors.OpExecError("Can't add local storage to drbd")
3886

    
3887
      dev.children = new_lvs
3888
      cfg.Update(instance)
3889

    
3890
    # Step: wait for sync
3891

    
3892
    # this can fail as the old devices are degraded and _WaitForSync
3893
    # does a combined result over all disks, so we don't check its
3894
    # return value
3895
    self.proc.LogStep(5, steps_total, "sync devices")
3896
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3897

    
3898
    # so check manually all the devices
3899
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3900
      cfg.SetDiskID(dev, instance.primary_node)
3901
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3902
      if is_degr:
3903
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3904

    
3905
    # Step: remove old storage
3906
    self.proc.LogStep(6, steps_total, "removing old storage")
3907
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3908
      info("remove logical volumes for %s" % name)
3909
      for lv in old_lvs:
3910
        cfg.SetDiskID(lv, tgt_node)
3911
        if not rpc.call_blockdev_remove(tgt_node, lv):
3912
          warning("Can't remove old LV", hint="manually remove unused LVs")
3913
          continue
3914

    
3915
  def _ExecD8Secondary(self, feedback_fn):
3916
    """Replace the secondary node for drbd8.
3917

3918
    The algorithm for replace is quite complicated:
3919
      - for all disks of the instance:
3920
        - create new LVs on the new node with same names
3921
        - shutdown the drbd device on the old secondary
3922
        - disconnect the drbd network on the primary
3923
        - create the drbd device on the new secondary
3924
        - network attach the drbd on the primary, using an artifice:
3925
          the drbd code for Attach() will connect to the network if it
3926
          finds a device which is connected to the good local disks but
3927
          not network enabled
3928
      - wait for sync across all devices
3929
      - remove all disks from the old secondary
3930

3931
    Failures are not very well handled.
3932

3933
    """
3934
    steps_total = 6
3935
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3936
    instance = self.instance
3937
    iv_names = {}
3938
    vgname = self.cfg.GetVGName()
3939
    # start of work
3940
    cfg = self.cfg
3941
    old_node = self.tgt_node
3942
    new_node = self.new_node
3943
    pri_node = instance.primary_node
3944

    
3945
    # Step: check device activation
3946
    self.proc.LogStep(1, steps_total, "check device existence")
3947
    info("checking volume groups")
3948
    my_vg = cfg.GetVGName()
3949
    results = rpc.call_vg_list([pri_node, new_node])
3950
    if not results:
3951
      raise errors.OpExecError("Can't list volume groups on the nodes")
3952
    for node in pri_node, new_node:
3953
      res = results.get(node, False)
3954
      if not res or my_vg not in res:
3955
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3956
                                 (my_vg, node))
3957
    for dev in instance.disks:
3958
      if not dev.iv_name in self.op.disks:
3959
        continue
3960
      info("checking %s on %s" % (dev.iv_name, pri_node))
3961
      cfg.SetDiskID(dev, pri_node)
3962
      if not rpc.call_blockdev_find(pri_node, dev):
3963
        raise errors.OpExecError("Can't find device %s on node %s" %
3964
                                 (dev.iv_name, pri_node))
3965

    
3966
    # Step: check other node consistency
3967
    self.proc.LogStep(2, steps_total, "check peer consistency")
3968
    for dev in instance.disks:
3969
      if not dev.iv_name in self.op.disks:
3970
        continue
3971
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3972
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3973
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3974
                                 " unsafe to replace the secondary" %
3975
                                 pri_node)
3976

    
3977
    # Step: create new storage
3978
    self.proc.LogStep(3, steps_total, "allocate new storage")
3979
    for dev in instance.disks:
3980
      size = dev.size
3981
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3982
      # since we *always* want to create this LV, we use the
3983
      # _Create...OnPrimary (which forces the creation), even if we
3984
      # are talking about the secondary node
3985
      for new_lv in dev.children:
3986
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3987
                                        _GetInstanceInfoText(instance)):
3988
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3989
                                   " node '%s'" %
3990
                                   (new_lv.logical_id[1], new_node))
3991

    
3992
      iv_names[dev.iv_name] = (dev, dev.children)
3993

    
3994
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3995
    for dev in instance.disks:
3996
      size = dev.size
3997
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3998
      # create new devices on new_node
3999
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4000
                              logical_id=(pri_node, new_node,
4001
                                          dev.logical_id[2]),
4002
                              children=dev.children)
4003
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4004
                                        new_drbd, False,
4005
                                      _GetInstanceInfoText(instance)):
4006
        raise errors.OpExecError("Failed to create new DRBD on"
4007
                                 " node '%s'" % new_node)
4008

    
4009
    for dev in instance.disks:
4010
      # we have new devices, shutdown the drbd on the old secondary
4011
      info("shutting down drbd for %s on old node" % dev.iv_name)
4012
      cfg.SetDiskID(dev, old_node)
4013
      if not rpc.call_blockdev_shutdown(old_node, dev):
4014
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4015
                hint="Please cleanup this device manually as soon as possible")
4016

    
4017
    info("detaching primary drbds from the network (=> standalone)")
4018
    done = 0
4019
    for dev in instance.disks:
4020
      cfg.SetDiskID(dev, pri_node)
4021
      # set the physical (unique in bdev terms) id to None, meaning
4022
      # detach from network
4023
      dev.physical_id = (None,) * len(dev.physical_id)
4024
      # and 'find' the device, which will 'fix' it to match the
4025
      # standalone state
4026
      if rpc.call_blockdev_find(pri_node, dev):
4027
        done += 1
4028
      else:
4029
        warning("Failed to detach drbd %s from network, unusual case" %
4030
                dev.iv_name)
4031

    
4032
    if not done:
4033
      # no detaches succeeded (very unlikely)
4034
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4035

    
4036
    # if we managed to detach at least one, we update all the disks of
4037
    # the instance to point to the new secondary
4038
    info("updating instance configuration")
4039
    for dev in instance.disks:
4040
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4041
      cfg.SetDiskID(dev, pri_node)
4042
    cfg.Update(instance)
4043

    
4044
    # and now perform the drbd attach
4045
    info("attaching primary drbds to new secondary (standalone => connected)")
4046
    failures = []
4047
    for dev in instance.disks:
4048
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4049
      # since the attach is smart, it's enough to 'find' the device,
4050
      # it will automatically activate the network, if the physical_id
4051
      # is correct
4052
      cfg.SetDiskID(dev, pri_node)
4053
      if not rpc.call_blockdev_find(pri_node, dev):
4054
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4055
                "please do a gnt-instance info to see the status of disks")
4056

    
4057
    # this can fail as the old devices are degraded and _WaitForSync
4058
    # does a combined result over all disks, so we don't check its
4059
    # return value
4060
    self.proc.LogStep(5, steps_total, "sync devices")
4061
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4062

    
4063
    # so check manually all the devices
4064
    for name, (dev, old_lvs) in iv_names.iteritems():
4065
      cfg.SetDiskID(dev, pri_node)
4066
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4067
      if is_degr:
4068
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4069

    
4070
    self.proc.LogStep(6, steps_total, "removing old storage")
4071
    for name, (dev, old_lvs) in iv_names.iteritems():
4072
      info("remove logical volumes for %s" % name)
4073
      for lv in old_lvs:
4074
        cfg.SetDiskID(lv, old_node)
4075
        if not rpc.call_blockdev_remove(old_node, lv):
4076
          warning("Can't remove LV on old secondary",
4077
                  hint="Cleanup stale volumes by hand")
4078

    
4079
  def Exec(self, feedback_fn):
4080
    """Execute disk replacement.
4081

4082
    This dispatches the disk replacement to the appropriate handler.
4083

4084
    """
4085
    instance = self.instance
4086

    
4087
    # Activate the instance disks if we're replacing them on a down instance
4088
    if instance.status == "down":
4089
      _StartInstanceDisks(self.cfg, instance, True)
4090

    
4091
    if instance.disk_template == constants.DT_DRBD8:
4092
      if self.op.remote_node is None:
4093
        fn = self._ExecD8DiskOnly
4094
      else:
4095
        fn = self._ExecD8Secondary
4096
    else:
4097
      raise errors.ProgrammerError("Unhandled disk replacement case")
4098

    
4099
    ret = fn(feedback_fn)
4100

    
4101
    # Deactivate the instance disks if we're replacing them on a down instance
4102
    if instance.status == "down":
4103
      _SafeShutdownInstanceDisks(instance, self.cfg)
4104

    
4105
    return ret
4106

    
4107

    
4108
class LUGrowDisk(LogicalUnit):
4109
  """Grow a disk of an instance.
4110

4111
  """
4112
  HPATH = "disk-grow"
4113
  HTYPE = constants.HTYPE_INSTANCE
4114
  _OP_REQP = ["instance_name", "disk", "amount"]
4115
  REQ_BGL = False
4116

    
4117
  def ExpandNames(self):
4118
    self._ExpandAndLockInstance()
4119
    self.needed_locks[locking.LEVEL_NODE] = []
4120
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4121

    
4122
  def DeclareLocks(self, level):
4123
    if level == locking.LEVEL_NODE:
4124
      self._LockInstancesNodes()
4125

    
4126
  def BuildHooksEnv(self):
4127
    """Build hooks env.
4128

4129
    This runs on the master, the primary and all the secondaries.
4130

4131
    """
4132
    env = {
4133
      "DISK": self.op.disk,
4134
      "AMOUNT": self.op.amount,
4135
      }
4136
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4137
    nl = [
4138
      self.sstore.GetMasterNode(),
4139
      self.instance.primary_node,
4140
      ]
4141
    return env, nl, nl
4142

    
4143
  def CheckPrereq(self):
4144
    """Check prerequisites.
4145

4146
    This checks that the instance is in the cluster.
4147

4148
    """
4149
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4150
    assert instance is not None, \
4151
      "Cannot retrieve locked instance %s" % self.op.instance_name
4152

    
4153
    self.instance = instance
4154

    
4155
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4156
      raise errors.OpPrereqError("Instance's disk layout does not support"
4157
                                 " growing.")
4158

    
4159
    if instance.FindDisk(self.op.disk) is None:
4160
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4161
                                 (self.op.disk, instance.name))
4162

    
4163
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4164
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4165
    for node in nodenames:
4166
      info = nodeinfo.get(node, None)
4167
      if not info:
4168
        raise errors.OpPrereqError("Cannot get current information"
4169
                                   " from node '%s'" % node)
4170
      vg_free = info.get('vg_free', None)
4171
      if not isinstance(vg_free, int):
4172
        raise errors.OpPrereqError("Can't compute free disk space on"
4173
                                   " node %s" % node)
4174
      if self.op.amount > info['vg_free']:
4175
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4176
                                   " %d MiB available, %d MiB required" %
4177
                                   (node, info['vg_free'], self.op.amount))
4178

    
4179
  def Exec(self, feedback_fn):
4180
    """Execute disk grow.
4181

4182
    """
4183
    instance = self.instance
4184
    disk = instance.FindDisk(self.op.disk)
4185
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4186
      self.cfg.SetDiskID(disk, node)
4187
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4188
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4189
        raise errors.OpExecError("grow request failed to node %s" % node)
4190
      elif not result[0]:
4191
        raise errors.OpExecError("grow request failed to node %s: %s" %
4192
                                 (node, result[1]))
4193
    disk.RecordGrow(self.op.amount)
4194
    self.cfg.Update(instance)
4195
    return
4196

    
4197

    
4198
class LUQueryInstanceData(NoHooksLU):
4199
  """Query runtime instance data.
4200

4201
  """
4202
  _OP_REQP = ["instances"]
4203
  REQ_BGL = False
4204
  def ExpandNames(self):
4205
    self.needed_locks = {}
4206
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4207

    
4208
    if not isinstance(self.op.instances, list):
4209
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4210

    
4211
    if self.op.instances:
4212
      self.wanted_names = []
4213
      for name in self.op.instances:
4214
        full_name = self.cfg.ExpandInstanceName(name)
4215
        if full_name is None:
4216
          raise errors.OpPrereqError("Instance '%s' not known" %
4217
                                     self.op.instance_name)
4218
        self.wanted_names.append(full_name)
4219
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4220
    else:
4221
      self.wanted_names = None
4222
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4223

    
4224
    self.needed_locks[locking.LEVEL_NODE] = []
4225
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4226

    
4227
  def DeclareLocks(self, level):
4228
    if level == locking.LEVEL_NODE:
4229
      self._LockInstancesNodes()
4230

    
4231
  def CheckPrereq(self):
4232
    """Check prerequisites.
4233

4234
    This only checks the optional instance list against the existing names.
4235

4236
    """
4237
    if self.wanted_names is None:
4238
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4239

    
4240
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4241
                             in self.wanted_names]
4242
    return
4243

    
4244
  def _ComputeDiskStatus(self, instance, snode, dev):
4245
    """Compute block device status.
4246

4247
    """
4248
    self.cfg.SetDiskID(dev, instance.primary_node)
4249
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4250
    if dev.dev_type in constants.LDS_DRBD:
4251
      # we change the snode then (otherwise we use the one passed in)
4252
      if dev.logical_id[0] == instance.primary_node:
4253
        snode = dev.logical_id[1]
4254
      else:
4255
        snode = dev.logical_id[0]
4256

    
4257
    if snode:
4258
      self.cfg.SetDiskID(dev, snode)
4259
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4260
    else:
4261
      dev_sstatus = None
4262

    
4263
    if dev.children:
4264
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4265
                      for child in dev.children]
4266
    else:
4267
      dev_children = []
4268

    
4269
    data = {
4270
      "iv_name": dev.iv_name,
4271
      "dev_type": dev.dev_type,
4272
      "logical_id": dev.logical_id,
4273
      "physical_id": dev.physical_id,
4274
      "pstatus": dev_pstatus,
4275
      "sstatus": dev_sstatus,
4276
      "children": dev_children,
4277
      }
4278

    
4279
    return data
4280

    
4281
  def Exec(self, feedback_fn):
4282
    """Gather and return data"""
4283
    result = {}
4284
    for instance in self.wanted_instances:
4285
      remote_info = rpc.call_instance_info(instance.primary_node,
4286
                                                instance.name)
4287
      if remote_info and "state" in remote_info:
4288
        remote_state = "up"
4289
      else:
4290
        remote_state = "down"
4291
      if instance.status == "down":
4292
        config_state = "down"
4293
      else:
4294
        config_state = "up"
4295

    
4296
      disks = [self._ComputeDiskStatus(instance, None, device)
4297
               for device in instance.disks]
4298

    
4299
      idict = {
4300
        "name": instance.name,
4301
        "config_state": config_state,
4302
        "run_state": remote_state,
4303
        "pnode": instance.primary_node,
4304
        "snodes": instance.secondary_nodes,
4305
        "os": instance.os,
4306
        "memory": instance.memory,
4307
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4308
        "disks": disks,
4309
        "vcpus": instance.vcpus,
4310
        }
4311

    
4312
      htkind = self.sstore.GetHypervisorType()
4313
      if htkind == constants.HT_XEN_PVM30:
4314
        idict["kernel_path"] = instance.kernel_path
4315
        idict["initrd_path"] = instance.initrd_path
4316

    
4317
      if htkind == constants.HT_XEN_HVM31:
4318
        idict["hvm_boot_order"] = instance.hvm_boot_order
4319
        idict["hvm_acpi"] = instance.hvm_acpi
4320
        idict["hvm_pae"] = instance.hvm_pae
4321
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4322
        idict["hvm_nic_type"] = instance.hvm_nic_type
4323
        idict["hvm_disk_type"] = instance.hvm_disk_type
4324

    
4325
      if htkind in constants.HTS_REQ_PORT:
4326
        if instance.vnc_bind_address is None:
4327
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4328
        else:
4329
          vnc_bind_address = instance.vnc_bind_address
4330
        if instance.network_port is None:
4331
          vnc_console_port = None
4332
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4333
          vnc_console_port = "%s:%s" % (instance.primary_node,
4334
                                       instance.network_port)
4335
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4336
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4337
                                                   instance.network_port,
4338
                                                   instance.primary_node)
4339
        else:
4340
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4341
                                        instance.network_port)
4342
        idict["vnc_console_port"] = vnc_console_port
4343
        idict["vnc_bind_address"] = vnc_bind_address
4344
        idict["network_port"] = instance.network_port
4345

    
4346
      result[instance.name] = idict
4347

    
4348
    return result
4349

    
4350

    
4351
class LUSetInstanceParams(LogicalUnit):
4352
  """Modifies an instances's parameters.
4353

4354
  """
4355
  HPATH = "instance-modify"
4356
  HTYPE = constants.HTYPE_INSTANCE
4357
  _OP_REQP = ["instance_name"]
4358
  REQ_BGL = False
4359

    
4360
  def ExpandNames(self):
4361
    self._ExpandAndLockInstance()
4362

    
4363
  def BuildHooksEnv(self):
4364
    """Build hooks env.
4365

4366
    This runs on the master, primary and secondaries.
4367

4368
    """
4369
    args = dict()
4370
    if self.mem:
4371
      args['memory'] = self.mem
4372
    if self.vcpus:
4373
      args['vcpus'] = self.vcpus
4374
    if self.do_ip or self.do_bridge or self.mac:
4375
      if self.do_ip:
4376
        ip = self.ip
4377
      else:
4378
        ip = self.instance.nics[0].ip
4379
      if self.bridge:
4380
        bridge = self.bridge
4381
      else:
4382
        bridge = self.instance.nics[0].bridge
4383
      if self.mac:
4384
        mac = self.mac
4385
      else:
4386
        mac = self.instance.nics[0].mac
4387
      args['nics'] = [(ip, bridge, mac)]
4388
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4389
    nl = [self.sstore.GetMasterNode(),
4390
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4391
    return env, nl, nl
4392

    
4393
  def CheckPrereq(self):
4394
    """Check prerequisites.
4395

4396
    This only checks the instance list against the existing names.
4397

4398
    """
4399
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4400
    # a separate CheckArguments function, if we implement one, so the operation
4401
    # can be aborted without waiting for any lock, should it have an error...
4402
    self.mem = getattr(self.op, "mem", None)
4403
    self.vcpus = getattr(self.op, "vcpus", None)
4404
    self.ip = getattr(self.op, "ip", None)
4405
    self.mac = getattr(self.op, "mac", None)
4406
    self.bridge = getattr(self.op, "bridge", None)
4407
    self.kernel_path = getattr(self.op, "kernel_path", None)
4408
    self.initrd_path = getattr(self.op, "initrd_path", None)
4409
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4410
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4411
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
4412
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4413
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4414
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4415
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4416
    self.force = getattr(self.op, "force", None)
4417
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4418
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4419
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4420
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4421
    if all_parms.count(None) == len(all_parms):
4422
      raise errors.OpPrereqError("No changes submitted")
4423
    if self.mem is not None:
4424
      try:
4425
        self.mem = int(self.mem)
4426
      except ValueError, err:
4427
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4428
    if self.vcpus is not None:
4429
      try:
4430
        self.vcpus = int(self.vcpus)
4431
      except ValueError, err:
4432
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4433
    if self.ip is not None:
4434
      self.do_ip = True
4435
      if self.ip.lower() == "none":
4436
        self.ip = None
4437
      else:
4438
        if not utils.IsValidIP(self.ip):
4439
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4440
    else:
4441
      self.do_ip = False
4442
    self.do_bridge = (self.bridge is not None)
4443
    if self.mac is not None:
4444
      if self.cfg.IsMacInUse(self.mac):
4445
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4446
                                   self.mac)
4447
      if not utils.IsValidMac(self.mac):
4448
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4449

    
4450
    if self.kernel_path is not None:
4451
      self.do_kernel_path = True
4452
      if self.kernel_path == constants.VALUE_NONE:
4453
        raise errors.OpPrereqError("Can't set instance to no kernel")
4454

    
4455
      if self.kernel_path != constants.VALUE_DEFAULT:
4456
        if not os.path.isabs(self.kernel_path):
4457
          raise errors.OpPrereqError("The kernel path must be an absolute"
4458
                                    " filename")
4459
    else:
4460
      self.do_kernel_path = False
4461

    
4462
    if self.initrd_path is not None:
4463
      self.do_initrd_path = True
4464
      if self.initrd_path not in (constants.VALUE_NONE,
4465
                                  constants.VALUE_DEFAULT):
4466
        if not os.path.isabs(self.initrd_path):
4467
          raise errors.OpPrereqError("The initrd path must be an absolute"
4468
                                    " filename")
4469
    else:
4470
      self.do_initrd_path = False
4471

    
4472
    # boot order verification
4473
    if self.hvm_boot_order is not None:
4474
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4475
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4476
          raise errors.OpPrereqError("invalid boot order specified,"
4477
                                     " must be one or more of [acdn]"
4478
                                     " or 'default'")
4479

    
4480
    # hvm_cdrom_image_path verification
4481
    if self.op.hvm_cdrom_image_path is not None:
4482
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4483
              self.op.hvm_cdrom_image_path.lower() == "none"):
4484
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
4485
                                   " be an absolute path or None, not %s" %
4486
                                   self.op.hvm_cdrom_image_path)
4487
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4488
              self.op.hvm_cdrom_image_path.lower() == "none"):
4489
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
4490
                                   " regular file or a symlink pointing to"
4491
                                   " an existing regular file, not %s" %
4492
                                   self.op.hvm_cdrom_image_path)
4493

    
4494
    # vnc_bind_address verification
4495
    if self.op.vnc_bind_address is not None:
4496
      if not utils.IsValidIP(self.op.vnc_bind_address):
4497
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4498
                                   " like a valid IP address" %
4499
                                   self.op.vnc_bind_address)
4500

    
4501
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4502
    assert self.instance is not None, \
4503
      "Cannot retrieve locked instance %s" % self.op.instance_name
4504
    self.warn = []
4505
    if self.mem is not None and not self.force:
4506
      pnode = self.instance.primary_node
4507
      nodelist = [pnode]
4508
      nodelist.extend(instance.secondary_nodes)
4509
      instance_info = rpc.call_instance_info(pnode, instance.name)
4510
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4511

    
4512
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4513
        # Assume the primary node is unreachable and go ahead
4514
        self.warn.append("Can't get info from primary node %s" % pnode)
4515
      else:
4516
        if instance_info:
4517
          current_mem = instance_info['memory']
4518
        else:
4519
          # Assume instance not running
4520
          # (there is a slight race condition here, but it's not very probable,
4521
          # and we have no other way to check)
4522
          current_mem = 0
4523
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4524
        if miss_mem > 0:
4525
          raise errors.OpPrereqError("This change will prevent the instance"
4526
                                     " from starting, due to %d MB of memory"
4527
                                     " missing on its primary node" % miss_mem)
4528

    
4529
      for node in instance.secondary_nodes:
4530
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4531
          self.warn.append("Can't get info from secondary node %s" % node)
4532
        elif self.mem > nodeinfo[node]['memory_free']:
4533
          self.warn.append("Not enough memory to failover instance to secondary"
4534
                           " node %s" % node)
4535

    
4536
    # Xen HVM device type checks
4537
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4538
      if self.op.hvm_nic_type is not None:
4539
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4540
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4541
                                     " HVM  hypervisor" % self.op.hvm_nic_type)
4542
      if self.op.hvm_disk_type is not None:
4543
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4544
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4545
                                     " HVM hypervisor" % self.op.hvm_disk_type)
4546

    
4547
    return
4548

    
4549
  def Exec(self, feedback_fn):
4550
    """Modifies an instance.
4551

4552
    All parameters take effect only at the next restart of the instance.
4553
    """
4554
    # Process here the warnings from CheckPrereq, as we don't have a
4555
    # feedback_fn there.
4556
    for warn in self.warn:
4557
      feedback_fn("WARNING: %s" % warn)
4558

    
4559
    result = []
4560
    instance = self.instance
4561
    if self.mem:
4562
      instance.memory = self.mem
4563
      result.append(("mem", self.mem))
4564
    if self.vcpus:
4565
      instance.vcpus = self.vcpus
4566
      result.append(("vcpus",  self.vcpus))
4567
    if self.do_ip:
4568
      instance.nics[0].ip = self.ip
4569
      result.append(("ip", self.ip))
4570
    if self.bridge:
4571
      instance.nics[0].bridge = self.bridge
4572
      result.append(("bridge", self.bridge))
4573
    if self.mac:
4574
      instance.nics[0].mac = self.mac
4575
      result.append(("mac", self.mac))
4576
    if self.do_kernel_path:
4577
      instance.kernel_path = self.kernel_path
4578
      result.append(("kernel_path", self.kernel_path))
4579
    if self.do_initrd_path:
4580
      instance.initrd_path = self.initrd_path
4581
      result.append(("initrd_path", self.initrd_path))
4582
    if self.hvm_boot_order:
4583
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4584
        instance.hvm_boot_order = None
4585
      else:
4586
        instance.hvm_boot_order = self.hvm_boot_order
4587
      result.append(("hvm_boot_order", self.hvm_boot_order))
4588
    if self.hvm_acpi is not None:
4589
      instance.hvm_acpi = self.hvm_acpi
4590
      result.append(("hvm_acpi", self.hvm_acpi))
4591
    if self.hvm_pae is not None:
4592
      instance.hvm_pae = self.hvm_pae
4593
      result.append(("hvm_pae", self.hvm_pae))
4594
    if self.hvm_nic_type is not None:
4595
      instance.hvm_nic_type = self.hvm_nic_type
4596
      result.append(("hvm_nic_type", self.hvm_nic_type))
4597
    if self.hvm_disk_type is not None:
4598
      instance.hvm_disk_type = self.hvm_disk_type
4599
      result.append(("hvm_disk_type", self.hvm_disk_type))
4600
    if self.hvm_cdrom_image_path:
4601
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4602
        instance.hvm_cdrom_image_path = None
4603
      else:
4604
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4605
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4606
    if self.vnc_bind_address:
4607
      instance.vnc_bind_address = self.vnc_bind_address
4608
      result.append(("vnc_bind_address", self.vnc_bind_address))
4609

    
4610
    self.cfg.Update(instance)
4611

    
4612
    return result
4613

    
4614

    
4615
class LUQueryExports(NoHooksLU):
4616
  """Query the exports list
4617

4618
  """
4619
  _OP_REQP = ['nodes']
4620
  REQ_BGL = False
4621

    
4622
  def ExpandNames(self):
4623
    self.needed_locks = {}
4624
    self.share_locks[locking.LEVEL_NODE] = 1
4625
    if not self.op.nodes:
4626
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4627
    else:
4628
      self.needed_locks[locking.LEVEL_NODE] = \
4629
        _GetWantedNodes(self, self.op.nodes)
4630

    
4631
  def CheckPrereq(self):
4632
    """Check prerequisites.
4633

4634
    """
4635
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4636

    
4637
  def Exec(self, feedback_fn):
4638
    """Compute the list of all the exported system images.
4639

4640
    Returns:
4641
      a dictionary with the structure node->(export-list)
4642
      where export-list is a list of the instances exported on
4643
      that node.
4644

4645
    """
4646
    return rpc.call_export_list(self.nodes)
4647

    
4648

    
4649
class LUExportInstance(LogicalUnit):
4650
  """Export an instance to an image in the cluster.
4651

4652
  """
4653
  HPATH = "instance-export"
4654
  HTYPE = constants.HTYPE_INSTANCE
4655
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4656
  REQ_BGL = False
4657

    
4658
  def ExpandNames(self):
4659
    self._ExpandAndLockInstance()
4660
    # FIXME: lock only instance primary and destination node
4661
    #
4662
    # Sad but true, for now we have do lock all nodes, as we don't know where
4663
    # the previous export might be, and and in this LU we search for it and
4664
    # remove it from its current node. In the future we could fix this by:
4665
    #  - making a tasklet to search (share-lock all), then create the new one,
4666
    #    then one to remove, after
4667
    #  - removing the removal operation altoghether
4668
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4669

    
4670
  def DeclareLocks(self, level):
4671
    """Last minute lock declaration."""
4672
    # All nodes are locked anyway, so nothing to do here.
4673

    
4674
  def BuildHooksEnv(self):
4675
    """Build hooks env.
4676

4677
    This will run on the master, primary node and target node.
4678

4679
    """
4680
    env = {
4681
      "EXPORT_NODE": self.op.target_node,
4682
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4683
      }
4684
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4685
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4686
          self.op.target_node]
4687
    return env, nl, nl
4688

    
4689
  def CheckPrereq(self):
4690
    """Check prerequisites.
4691

4692
    This checks that the instance and node names are valid.
4693

4694
    """
4695
    instance_name = self.op.instance_name
4696
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4697
    assert self.instance is not None, \
4698
          "Cannot retrieve locked instance %s" % self.op.instance_name
4699

    
4700
    self.dst_node = self.cfg.GetNodeInfo(
4701
      self.cfg.ExpandNodeName(self.op.target_node))
4702

    
4703
    assert self.dst_node is not None, \
4704
          "Cannot retrieve locked node %s" % self.op.target_node
4705

    
4706
    # instance disk type verification
4707
    for disk in self.instance.disks:
4708
      if disk.dev_type == constants.LD_FILE:
4709
        raise errors.OpPrereqError("Export not supported for instances with"
4710
                                   " file-based disks")
4711

    
4712
  def Exec(self, feedback_fn):
4713
    """Export an instance to an image in the cluster.
4714

4715
    """
4716
    instance = self.instance
4717
    dst_node = self.dst_node
4718
    src_node = instance.primary_node
4719
    if self.op.shutdown:
4720
      # shutdown the instance, but not the disks
4721
      if not rpc.call_instance_shutdown(src_node, instance):
4722
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4723
                                 (instance.name, src_node))
4724

    
4725
    vgname = self.cfg.GetVGName()
4726

    
4727
    snap_disks = []
4728

    
4729
    try:
4730
      for disk in instance.disks:
4731
        if disk.iv_name == "sda":
4732
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4733
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4734

    
4735
          if not new_dev_name:
4736
            logger.Error("could not snapshot block device %s on node %s" %
4737
                         (disk.logical_id[1], src_node))
4738
          else:
4739
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4740
                                      logical_id=(vgname, new_dev_name),
4741
                                      physical_id=(vgname, new_dev_name),
4742
                                      iv_name=disk.iv_name)
4743
            snap_disks.append(new_dev)
4744

    
4745
    finally:
4746
      if self.op.shutdown and instance.status == "up":
4747
        if not rpc.call_instance_start(src_node, instance, None):
4748
          _ShutdownInstanceDisks(instance, self.cfg)
4749
          raise errors.OpExecError("Could not start instance")
4750

    
4751
    # TODO: check for size
4752

    
4753
    for dev in snap_disks:
4754
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4755
        logger.Error("could not export block device %s from node %s to node %s"
4756
                     % (dev.logical_id[1], src_node, dst_node.name))
4757
      if not rpc.call_blockdev_remove(src_node, dev):
4758
        logger.Error("could not remove snapshot block device %s from node %s" %
4759
                     (dev.logical_id[1], src_node))
4760

    
4761
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4762
      logger.Error("could not finalize export for instance %s on node %s" %
4763
                   (instance.name, dst_node.name))
4764

    
4765
    nodelist = self.cfg.GetNodeList()
4766
    nodelist.remove(dst_node.name)
4767

    
4768
    # on one-node clusters nodelist will be empty after the removal
4769
    # if we proceed the backup would be removed because OpQueryExports
4770
    # substitutes an empty list with the full cluster node list.
4771
    if nodelist:
4772
      exportlist = rpc.call_export_list(nodelist)
4773
      for node in exportlist:
4774
        if instance.name in exportlist[node]:
4775
          if not rpc.call_export_remove(node, instance.name):
4776
            logger.Error("could not remove older export for instance %s"
4777
                         " on node %s" % (instance.name, node))
4778

    
4779

    
4780
class LURemoveExport(NoHooksLU):
4781
  """Remove exports related to the named instance.
4782

4783
  """
4784
  _OP_REQP = ["instance_name"]
4785

    
4786
  def CheckPrereq(self):
4787
    """Check prerequisites.
4788
    """
4789
    pass
4790

    
4791
  def Exec(self, feedback_fn):
4792
    """Remove any export.
4793

4794
    """
4795
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4796
    # If the instance was not found we'll try with the name that was passed in.
4797
    # This will only work if it was an FQDN, though.
4798
    fqdn_warn = False
4799
    if not instance_name:
4800
      fqdn_warn = True
4801
      instance_name = self.op.instance_name
4802

    
4803
    exportlist = rpc.call_export_list(self.cfg.GetNodeList())
4804
    found = False
4805
    for node in exportlist:
4806
      if instance_name in exportlist[node]:
4807
        found = True
4808
        if not rpc.call_export_remove(node, instance_name):
4809
          logger.Error("could not remove export for instance %s"
4810
                       " on node %s" % (instance_name, node))
4811

    
4812
    if fqdn_warn and not found:
4813
      feedback_fn("Export not found. If trying to remove an export belonging"
4814
                  " to a deleted instance please use its Fully Qualified"
4815
                  " Domain Name.")
4816

    
4817

    
4818
class TagsLU(NoHooksLU):
4819
  """Generic tags LU.
4820

4821
  This is an abstract class which is the parent of all the other tags LUs.
4822

4823
  """
4824
  def CheckPrereq(self):
4825
    """Check prerequisites.
4826

4827
    """
4828
    if self.op.kind == constants.TAG_CLUSTER:
4829
      self.target = self.cfg.GetClusterInfo()
4830
    elif self.op.kind == constants.TAG_NODE:
4831
      name = self.cfg.ExpandNodeName(self.op.name)
4832
      if name is None:
4833
        raise errors.OpPrereqError("Invalid node name (%s)" %
4834
                                   (self.op.name,))
4835
      self.op.name = name
4836
      self.target = self.cfg.GetNodeInfo(name)
4837
    elif self.op.kind == constants.TAG_INSTANCE:
4838
      name = self.cfg.ExpandInstanceName(self.op.name)
4839
      if name is None:
4840
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4841
                                   (self.op.name,))
4842
      self.op.name = name
4843
      self.target = self.cfg.GetInstanceInfo(name)
4844
    else:
4845
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4846
                                 str(self.op.kind))
4847

    
4848

    
4849
class LUGetTags(TagsLU):
4850
  """Returns the tags of a given object.
4851

4852
  """
4853
  _OP_REQP = ["kind", "name"]
4854

    
4855
  def Exec(self, feedback_fn):
4856
    """Returns the tag list.
4857

4858
    """
4859
    return list(self.target.GetTags())
4860

    
4861

    
4862
class LUSearchTags(NoHooksLU):
4863
  """Searches the tags for a given pattern.
4864

4865
  """
4866
  _OP_REQP = ["pattern"]
4867

    
4868
  def CheckPrereq(self):
4869
    """Check prerequisites.
4870

4871
    This checks the pattern passed for validity by compiling it.
4872

4873
    """
4874
    try:
4875
      self.re = re.compile(self.op.pattern)
4876
    except re.error, err:
4877
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4878
                                 (self.op.pattern, err))
4879

    
4880
  def Exec(self, feedback_fn):
4881
    """Returns the tag list.
4882

4883
    """
4884
    cfg = self.cfg
4885
    tgts = [("/cluster", cfg.GetClusterInfo())]
4886
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4887
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4888
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4889
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4890
    results = []
4891
    for path, target in tgts:
4892
      for tag in target.GetTags():
4893
        if self.re.search(tag):
4894
          results.append((path, tag))
4895
    return results
4896

    
4897

    
4898
class LUAddTags(TagsLU):
4899
  """Sets a tag on a given object.
4900

4901
  """
4902
  _OP_REQP = ["kind", "name", "tags"]
4903

    
4904
  def CheckPrereq(self):
4905
    """Check prerequisites.
4906

4907
    This checks the type and length of the tag name and value.
4908

4909
    """
4910
    TagsLU.CheckPrereq(self)
4911
    for tag in self.op.tags:
4912
      objects.TaggableObject.ValidateTag(tag)
4913

    
4914
  def Exec(self, feedback_fn):
4915
    """Sets the tag.
4916

4917
    """
4918
    try:
4919
      for tag in self.op.tags:
4920
        self.target.AddTag(tag)
4921
    except errors.TagError, err:
4922
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4923
    try:
4924
      self.cfg.Update(self.target)
4925
    except errors.ConfigurationError:
4926
      raise errors.OpRetryError("There has been a modification to the"
4927
                                " config file and the operation has been"
4928
                                " aborted. Please retry.")
4929

    
4930

    
4931
class LUDelTags(TagsLU):
4932
  """Delete a list of tags from a given object.
4933

4934
  """
4935
  _OP_REQP = ["kind", "name", "tags"]
4936

    
4937
  def CheckPrereq(self):
4938
    """Check prerequisites.
4939

4940
    This checks that we have the given tag.
4941

4942
    """
4943
    TagsLU.CheckPrereq(self)
4944
    for tag in self.op.tags:
4945
      objects.TaggableObject.ValidateTag(tag)
4946
    del_tags = frozenset(self.op.tags)
4947
    cur_tags = self.target.GetTags()
4948
    if not del_tags <= cur_tags:
4949
      diff_tags = del_tags - cur_tags
4950
      diff_names = ["'%s'" % tag for tag in diff_tags]
4951
      diff_names.sort()
4952
      raise errors.OpPrereqError("Tag(s) %s not found" %
4953
                                 (",".join(diff_names)))
4954

    
4955
  def Exec(self, feedback_fn):
4956
    """Remove the tag from the object.
4957

4958
    """
4959
    for tag in self.op.tags:
4960
      self.target.RemoveTag(tag)
4961
    try:
4962
      self.cfg.Update(self.target)
4963
    except errors.ConfigurationError:
4964
      raise errors.OpRetryError("There has been a modification to the"
4965
                                " config file and the operation has been"
4966
                                " aborted. Please retry.")
4967

    
4968

    
4969
class LUTestDelay(NoHooksLU):
4970
  """Sleep for a specified amount of time.
4971

4972
  This LU sleeps on the master and/or nodes for a specified amount of
4973
  time.
4974

4975
  """
4976
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4977
  REQ_BGL = False
4978

    
4979
  def ExpandNames(self):
4980
    """Expand names and set required locks.
4981

4982
    This expands the node list, if any.
4983

4984
    """
4985
    self.needed_locks = {}
4986
    if self.op.on_nodes:
4987
      # _GetWantedNodes can be used here, but is not always appropriate to use
4988
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4989
      # more information.
4990
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4991
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4992

    
4993
  def CheckPrereq(self):
4994
    """Check prerequisites.
4995

4996
    """
4997

    
4998
  def Exec(self, feedback_fn):
4999
    """Do the actual sleep.
5000

5001
    """
5002
    if self.op.on_master:
5003
      if not utils.TestDelay(self.op.duration):
5004
        raise errors.OpExecError("Error during master delay test")
5005
    if self.op.on_nodes:
5006
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5007
      if not result:
5008
        raise errors.OpExecError("Complete failure from rpc call")
5009
      for node, node_result in result.items():
5010
        if not node_result:
5011
          raise errors.OpExecError("Failure during rpc call to node %s,"
5012
                                   " result: %s" % (node, node_result))
5013

    
5014

    
5015
class IAllocator(object):
5016
  """IAllocator framework.
5017

5018
  An IAllocator instance has three sets of attributes:
5019
    - cfg/sstore that are needed to query the cluster
5020
    - input data (all members of the _KEYS class attribute are required)
5021
    - four buffer attributes (in|out_data|text), that represent the
5022
      input (to the external script) in text and data structure format,
5023
      and the output from it, again in two formats
5024
    - the result variables from the script (success, info, nodes) for
5025
      easy usage
5026

5027
  """
5028
  _ALLO_KEYS = [
5029
    "mem_size", "disks", "disk_template",
5030
    "os", "tags", "nics", "vcpus",
5031
    ]
5032
  _RELO_KEYS = [
5033
    "relocate_from",
5034
    ]
5035

    
5036
  def __init__(self, cfg, sstore, mode, name, **kwargs):
5037
    self.cfg = cfg
5038
    self.sstore = sstore
5039
    # init buffer variables
5040
    self.in_text = self.out_text = self.in_data = self.out_data = None
5041
    # init all input fields so that pylint is happy
5042
    self.mode = mode
5043
    self.name = name
5044
    self.mem_size = self.disks = self.disk_template = None
5045
    self.os = self.tags = self.nics = self.vcpus = None
5046
    self.relocate_from = None
5047
    # computed fields
5048
    self.required_nodes = None
5049
    # init result fields
5050
    self.success = self.info = self.nodes = None
5051
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5052
      keyset = self._ALLO_KEYS
5053
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5054
      keyset = self._RELO_KEYS
5055
    else:
5056
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5057
                                   " IAllocator" % self.mode)
5058
    for key in kwargs:
5059
      if key not in keyset:
5060
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5061
                                     " IAllocator" % key)
5062
      setattr(self, key, kwargs[key])
5063
    for key in keyset:
5064
      if key not in kwargs:
5065
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5066
                                     " IAllocator" % key)
5067
    self._BuildInputData()
5068

    
5069
  def _ComputeClusterData(self):
5070
    """Compute the generic allocator input data.
5071

5072
    This is the data that is independent of the actual operation.
5073

5074
    """
5075
    cfg = self.cfg
5076
    # cluster data
5077
    data = {
5078
      "version": 1,
5079
      "cluster_name": self.sstore.GetClusterName(),
5080
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5081
      "hypervisor_type": self.sstore.GetHypervisorType(),
5082
      # we don't have job IDs
5083
      }
5084

    
5085
    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5086

    
5087
    # node data
5088
    node_results = {}
5089
    node_list = cfg.GetNodeList()
5090
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5091
    for nname in node_list:
5092
      ninfo = cfg.GetNodeInfo(nname)
5093
      if nname not in node_data or not isinstance(node_data[nname], dict):
5094
        raise errors.OpExecError("Can't get data for node %s" % nname)
5095
      remote_info = node_data[nname]
5096
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5097
                   'vg_size', 'vg_free', 'cpu_total']:
5098
        if attr not in remote_info:
5099
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5100
                                   (nname, attr))
5101
        try:
5102
          remote_info[attr] = int(remote_info[attr])
5103
        except ValueError, err:
5104
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5105
                                   " %s" % (nname, attr, str(err)))
5106
      # compute memory used by primary instances
5107
      i_p_mem = i_p_up_mem = 0
5108
      for iinfo in i_list:
5109
        if iinfo.primary_node == nname:
5110
          i_p_mem += iinfo.memory
5111
          if iinfo.status == "up":
5112
            i_p_up_mem += iinfo.memory
5113

    
5114
      # compute memory used by instances
5115
      pnr = {
5116
        "tags": list(ninfo.GetTags()),
5117
        "total_memory": remote_info['memory_total'],
5118
        "reserved_memory": remote_info['memory_dom0'],
5119
        "free_memory": remote_info['memory_free'],
5120
        "i_pri_memory": i_p_mem,
5121
        "i_pri_up_memory": i_p_up_mem,
5122
        "total_disk": remote_info['vg_size'],
5123
        "free_disk": remote_info['vg_free'],
5124
        "primary_ip": ninfo.primary_ip,
5125
        "secondary_ip": ninfo.secondary_ip,
5126
        "total_cpus": remote_info['cpu_total'],
5127
        }
5128
      node_results[nname] = pnr
5129
    data["nodes"] = node_results
5130

    
5131
    # instance data
5132
    instance_data = {}
5133
    for iinfo in i_list:
5134
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5135
                  for n in iinfo.nics]
5136
      pir = {
5137
        "tags": list(iinfo.GetTags()),
5138
        "should_run": iinfo.status == "up",
5139
        "vcpus": iinfo.vcpus,
5140
        "memory": iinfo.memory,
5141
        "os": iinfo.os,
5142
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5143
        "nics": nic_data,
5144
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5145
        "disk_template": iinfo.disk_template,
5146
        }
5147
      instance_data[iinfo.name] = pir
5148

    
5149
    data["instances"] = instance_data
5150

    
5151
    self.in_data = data
5152

    
5153
  def _AddNewInstance(self):
5154
    """Add new instance data to allocator structure.
5155

5156
    This in combination with _AllocatorGetClusterData will create the
5157
    correct structure needed as input for the allocator.
5158

5159
    The checks for the completeness of the opcode must have already been
5160
    done.
5161

5162
    """
5163
    data = self.in_data
5164
    if len(self.disks) != 2:
5165
      raise errors.OpExecError("Only two-disk configurations supported")
5166

    
5167
    disk_space = _ComputeDiskSize(self.disk_template,
5168
                                  self.disks[0]["size"], self.disks[1]["size"])
5169

    
5170
    if self.disk_template in constants.DTS_NET_MIRROR:
5171
      self.required_nodes = 2
5172
    else:
5173
      self.required_nodes = 1
5174
    request = {
5175
      "type": "allocate",
5176
      "name": self.name,
5177
      "disk_template": self.disk_template,
5178
      "tags": self.tags,
5179
      "os": self.os,
5180
      "vcpus": self.vcpus,
5181
      "memory": self.mem_size,
5182
      "disks": self.disks,
5183
      "disk_space_total": disk_space,
5184
      "nics": self.nics,
5185
      "required_nodes": self.required_nodes,
5186
      }
5187
    data["request"] = request
5188

    
5189
  def _AddRelocateInstance(self):
5190
    """Add relocate instance data to allocator structure.
5191

5192
    This in combination with _IAllocatorGetClusterData will create the
5193
    correct structure needed as input for the allocator.
5194

5195
    The checks for the completeness of the opcode must have already been
5196
    done.
5197

5198
    """
5199
    instance = self.cfg.GetInstanceInfo(self.name)
5200
    if instance is None:
5201
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5202
                                   " IAllocator" % self.name)
5203

    
5204
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5205
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5206

    
5207
    if len(instance.secondary_nodes) != 1:
5208
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5209

    
5210
    self.required_nodes = 1
5211

    
5212
    disk_space = _ComputeDiskSize(instance.disk_template,
5213
                                  instance.disks[0].size,
5214
                                  instance.disks[1].size)
5215

    
5216
    request = {
5217
      "type": "relocate",
5218
      "name": self.name,
5219
      "disk_space_total": disk_space,
5220
      "required_nodes": self.required_nodes,
5221
      "relocate_from": self.relocate_from,
5222
      }
5223
    self.in_data["request"] = request
5224

    
5225
  def _BuildInputData(self):
5226
    """Build input data structures.
5227

5228
    """
5229
    self._ComputeClusterData()
5230

    
5231
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5232
      self._AddNewInstance()
5233
    else:
5234
      self._AddRelocateInstance()
5235

    
5236
    self.in_text = serializer.Dump(self.in_data)
5237

    
5238
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5239
    """Run an instance allocator and return the results.
5240

5241
    """
5242
    data = self.in_text
5243

    
5244
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5245

    
5246
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5247
      raise errors.OpExecError("Invalid result from master iallocator runner")
5248

    
5249
    rcode, stdout, stderr, fail = result
5250

    
5251
    if rcode == constants.IARUN_NOTFOUND:
5252
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5253
    elif rcode == constants.IARUN_FAILURE:
5254
      raise errors.OpExecError("Instance allocator call failed: %s,"
5255
                               " output: %s" % (fail, stdout+stderr))
5256
    self.out_text = stdout
5257
    if validate:
5258
      self._ValidateResult()
5259

    
5260
  def _ValidateResult(self):
5261
    """Process the allocator results.
5262

5263
    This will process and if successful save the result in
5264
    self.out_data and the other parameters.
5265

5266
    """
5267
    try:
5268
      rdict = serializer.Load(self.out_text)
5269
    except Exception, err:
5270
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5271

    
5272
    if not isinstance(rdict, dict):
5273
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5274

    
5275
    for key in "success", "info", "nodes":
5276
      if key not in rdict:
5277
        raise errors.OpExecError("Can't parse iallocator results:"
5278
                                 " missing key '%s'" % key)
5279
      setattr(self, key, rdict[key])
5280

    
5281
    if not isinstance(rdict["nodes"], list):
5282
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5283
                               " is not a list")
5284
    self.out_data = rdict
5285

    
5286

    
5287
class LUTestAllocator(NoHooksLU):
5288
  """Run allocator tests.
5289

5290
  This LU runs the allocator tests
5291

5292
  """
5293
  _OP_REQP = ["direction", "mode", "name"]
5294

    
5295
  def CheckPrereq(self):
5296
    """Check prerequisites.
5297

5298
    This checks the opcode parameters depending on the director and mode test.
5299

5300
    """
5301
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5302
      for attr in ["name", "mem_size", "disks", "disk_template",
5303
                   "os", "tags", "nics", "vcpus"]:
5304
        if not hasattr(self.op, attr):
5305
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5306
                                     attr)
5307
      iname = self.cfg.ExpandInstanceName(self.op.name)
5308
      if iname is not None:
5309
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5310
                                   iname)
5311
      if not isinstance(self.op.nics, list):
5312
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5313
      for row in self.op.nics:
5314
        if (not isinstance(row, dict) or
5315
            "mac" not in row or
5316
            "ip" not in row or
5317
            "bridge" not in row):
5318
          raise errors.OpPrereqError("Invalid contents of the"
5319
                                     " 'nics' parameter")
5320
      if not isinstance(self.op.disks, list):
5321
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5322
      if len(self.op.disks) != 2:
5323
        raise errors.OpPrereqError("Only two-disk configurations supported")
5324
      for row in self.op.disks:
5325
        if (not isinstance(row, dict) or
5326
            "size" not in row or
5327
            not isinstance(row["size"], int) or
5328
            "mode" not in row or
5329
            row["mode"] not in ['r', 'w']):
5330
          raise errors.OpPrereqError("Invalid contents of the"
5331
                                     " 'disks' parameter")
5332
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5333
      if not hasattr(self.op, "name"):
5334
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5335
      fname = self.cfg.ExpandInstanceName(self.op.name)
5336
      if fname is None:
5337
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5338
                                   self.op.name)
5339
      self.op.name = fname
5340
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5341
    else:
5342
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5343
                                 self.op.mode)
5344

    
5345
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5346
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5347
        raise errors.OpPrereqError("Missing allocator name")
5348
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5349
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5350
                                 self.op.direction)
5351

    
5352
  def Exec(self, feedback_fn):
5353
    """Run the allocator test.
5354

5355
    """
5356
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5357
      ial = IAllocator(self.cfg, self.sstore,
5358
                       mode=self.op.mode,
5359
                       name=self.op.name,
5360
                       mem_size=self.op.mem_size,
5361
                       disks=self.op.disks,
5362
                       disk_template=self.op.disk_template,
5363
                       os=self.op.os,
5364
                       tags=self.op.tags,
5365
                       nics=self.op.nics,
5366
                       vcpus=self.op.vcpus,
5367
                       )
5368
    else:
5369
      ial = IAllocator(self.cfg, self.sstore,
5370
                       mode=self.op.mode,
5371
                       name=self.op.name,
5372
                       relocate_from=list(self.relocate_from),
5373
                       )
5374

    
5375
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5376
      result = ial.in_text
5377
    else:
5378
      ial.Run(self.op.allocator, validate=False)
5379
      result = ial.out_text
5380
    return result