#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this is
    handled in the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If no nodes are to be returned, use an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
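
  # Illustrative sketch (not part of the original file): a typical
  # BuildHooksEnv implementation returns the environment dict plus the
  # pre- and post-execution node lists, for example (the attribute names
  # used below are hypothetical):
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.instance_name}
  #     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
  #     return env, nl, nl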

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to lock only some instances' nodes,
    or to lock only primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
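

# Illustrative sketch (not part of the original file): how the base-class
# contract above is typically used by a concrete LU. The class name and
# opcode field below are hypothetical; real LUs follow the same pattern.
#
#   class LUExampleQueryInstance(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Instance %s runs on %s" %
#                   (self.instance.name, self.instance.primary_node))
#       return self.instance.primary_node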


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
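

# Illustrative sketch (not part of the original file): a query-style LU
# would typically validate its requested output fields in CheckPrereq,
# e.g. (the field names below are hypothetical):
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dfree", "mfree"),
#                      selected=self.op.output_fields)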


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_MAC" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
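
# Illustrative note (not part of the original file): for a single-NIC,
# single-disk instance the dict built above contains keys such as
# INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES, INSTANCE_STATUS,
# INSTANCE_NIC_COUNT, INSTANCE_NIC0_IP, INSTANCE_NIC0_BRIDGE,
# INSTANCE_NIC0_MAC, INSTANCE_DISK_COUNT, INSTANCE_DISK0_SIZE and
# INSTANCE_DISK0_MODE, plus one INSTANCE_BE_* / INSTANCE_HV_* entry per
# backend/hypervisor parameter; the GANETI_ prefix is handled by the hooks
# runner (see BuildHooksEnv above).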


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master

class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        the form of minor: (instance, must_exist), which correspond to
        instances and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure makes
    the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result

class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
1530
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1531
          hv_class.CheckParameterSyntax(hv_params)
1532
          _CheckHVParams(self, node_list, hv_name, hv_params)
1533

    
1534
  def Exec(self, feedback_fn):
1535
    """Change the parameters of the cluster.
1536

1537
    """
1538
    if self.op.vg_name is not None:
1539
      new_volume = self.op.vg_name
1540
      if not new_volume:
1541
        new_volume = None
1542
      if new_volume != self.cfg.GetVGName():
1543
        self.cfg.SetVGName(new_volume)
1544
      else:
1545
        feedback_fn("Cluster LVM configuration already in desired"
1546
                    " state, not changing")
1547
    if self.op.hvparams:
1548
      self.cluster.hvparams = self.new_hvparams
1549
    if self.op.enabled_hypervisors is not None:
1550
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1551
    if self.op.beparams:
1552
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1553
    if self.op.candidate_pool_size is not None:
1554
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1555
      # we need to update the pool size here, otherwise the save will fail
1556
      _AdjustCandidatePool(self)
1557

    
1558
    self.cfg.Update(self.cluster)
1559

    
1560

    
1561
class LURedistributeConfig(NoHooksLU):
1562
  """Force the redistribution of cluster configuration.
1563

1564
  This is a very simple LU.
1565

1566
  """
1567
  _OP_REQP = []
1568
  REQ_BGL = False
1569

    
1570
  def ExpandNames(self):
1571
    self.needed_locks = {
1572
      locking.LEVEL_NODE: locking.ALL_SET,
1573
    }
1574
    self.share_locks[locking.LEVEL_NODE] = 1
1575

    
1576
  def CheckPrereq(self):
1577
    """Check prerequisites.
1578

1579
    """
1580

    
1581
  def Exec(self, feedback_fn):
1582
    """Redistribute the configuration.
1583

1584
    """
1585
    self.cfg.Update(self.cfg.GetClusterInfo())
1586

    
1587

    
1588
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1589
  """Sleep and poll for an instance's disk to sync.
1590

1591
  """
1592
  if not instance.disks:
1593
    return True
1594

    
1595
  if not oneshot:
1596
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1597

    
1598
  node = instance.primary_node
1599

    
1600
  for dev in instance.disks:
1601
    lu.cfg.SetDiskID(dev, node)
1602

    
1603
  retries = 0
1604
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1605
  while True:
1606
    max_time = 0
1607
    done = True
1608
    cumul_degraded = False
1609
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1610
    if rstats.failed or not rstats.data:
1611
      lu.LogWarning("Can't get any data from node %s", node)
1612
      retries += 1
1613
      if retries >= 10:
1614
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1615
                                 " aborting." % node)
1616
      time.sleep(6)
1617
      continue
1618
    rstats = rstats.data
1619
    retries = 0
1620
    for i, mstat in enumerate(rstats):
1621
      if mstat is None:
1622
        lu.LogWarning("Can't compute data for node %s/%s",
1623
                           node, instance.disks[i].iv_name)
1624
        continue
1625
      # we ignore the ldisk parameter
1626
      perc_done, est_time, is_degraded, _ = mstat
1627
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1628
      if perc_done is not None:
1629
        done = False
1630
        if est_time is not None:
1631
          rem_time = "%d estimated seconds remaining" % est_time
1632
          max_time = est_time
1633
        else:
1634
          rem_time = "no time estimate"
1635
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1636
                        (instance.disks[i].iv_name, perc_done, rem_time))
1637

    
1638
    # if we're done but degraded, let's do a few small retries, to
1639
    # make sure we see a stable and not transient situation; therefore
1640
    # we force restart of the loop
1641
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1642
      logging.info("Degraded disks found, %d retries left", degr_retries)
1643
      degr_retries -= 1
1644
      time.sleep(1)
1645
      continue
1646

    
1647
    if done or oneshot:
1648
      break
1649

    
1650
    time.sleep(min(60, max_time))
1651

    
1652
  if done:
1653
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1654
  return not cumul_degraded
1655

    
1656

    
1657
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1658
  """Check that mirrors are not degraded.
1659

1660
  The ldisk parameter, if True, will change the test from the
1661
  is_degraded attribute (which represents overall non-ok status for
1662
  the device(s)) to the ldisk (representing the local storage status).
1663

1664
  """
1665
  lu.cfg.SetDiskID(dev, node)
1666
  if ldisk:
1667
    idx = 6
1668
  else:
1669
    idx = 5
1670

    
1671
  result = True
1672
  if on_primary or dev.AssembleOnSecondary():
1673
    rstats = lu.rpc.call_blockdev_find(node, dev)
1674
    msg = rstats.RemoteFailMsg()
1675
    if msg:
1676
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1677
      result = False
1678
    elif not rstats.payload:
1679
      lu.LogWarning("Can't find disk on node %s", node)
1680
      result = False
1681
    else:
1682
      result = result and (not rstats.payload[idx])
1683
  if dev.children:
1684
    for child in dev.children:
1685
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1686

    
1687
  return result
1688

    
1689

    
1690
class LUDiagnoseOS(NoHooksLU):
1691
  """Logical unit for OS diagnose/query.
1692

1693
  """
1694
  _OP_REQP = ["output_fields", "names"]
1695
  REQ_BGL = False
1696
  _FIELDS_STATIC = utils.FieldSet()
1697
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1698

    
1699
  def ExpandNames(self):
1700
    if self.op.names:
1701
      raise errors.OpPrereqError("Selective OS query not supported")
1702

    
1703
    _CheckOutputFields(static=self._FIELDS_STATIC,
1704
                       dynamic=self._FIELDS_DYNAMIC,
1705
                       selected=self.op.output_fields)
1706

    
1707
    # Lock all nodes, in shared mode
1708
    # Temporary removal of locks, should be reverted later
1709
    # TODO: reintroduce locks when they are lighter-weight
1710
    self.needed_locks = {}
1711
    #self.share_locks[locking.LEVEL_NODE] = 1
1712
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1713

    
1714
  def CheckPrereq(self):
1715
    """Check prerequisites.
1716

1717
    """
1718

    
1719
  @staticmethod
1720
  def _DiagnoseByOS(node_list, rlist):
1721
    """Remaps a per-node return list into an a per-os per-node dictionary
1722

1723
    @param node_list: a list with the names of all nodes
1724
    @param rlist: a map with node names as keys and OS objects as values
1725

1726
    @rtype: dict
1727
    @return: a dictionary with osnames as keys and as value another map, with
1728
        nodes as keys and list of OS objects as values, eg::
1729

1730
          {"debian-etch": {"node1": [<object>,...],
1731
                           "node2": [<object>,]}
1732
          }
1733

1734
    """
1735
    all_os = {}
1736
    # we build here the list of nodes that didn't fail the RPC (at RPC
1737
    # level), so that nodes with a non-responding node daemon don't
1738
    # make all OSes invalid
1739
    good_nodes = [node_name for node_name in rlist
1740
                  if not rlist[node_name].failed]
1741
    for node_name, nr in rlist.iteritems():
1742
      if nr.failed or not nr.data:
1743
        continue
1744
      for os_obj in nr.data:
1745
        if os_obj.name not in all_os:
1746
          # build a list of nodes for this os containing empty lists
1747
          # for each node in node_list
1748
          all_os[os_obj.name] = {}
1749
          for nname in good_nodes:
1750
            all_os[os_obj.name][nname] = []
1751
        all_os[os_obj.name][node_name].append(os_obj)
1752
    return all_os
1753

    
1754
  def Exec(self, feedback_fn):
1755
    """Compute the list of OSes.
1756

1757
    """
1758
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1759
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1760
    if node_data == False:
1761
      raise errors.OpExecError("Can't gather the list of OSes")
1762
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1763
    output = []
1764
    for os_name, os_data in pol.iteritems():
1765
      row = []
1766
      for field in self.op.output_fields:
1767
        if field == "name":
1768
          val = os_name
1769
        elif field == "valid":
1770
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1771
        elif field == "node_status":
1772
          val = {}
1773
          for node_name, nos_list in os_data.iteritems():
1774
            val[node_name] = [(v.status, v.path) for v in nos_list]
1775
        else:
1776
          raise errors.ParameterError(field)
1777
        row.append(val)
1778
      output.append(row)
1779

    
1780
    return output
1781

    
1782

    
1783
class LURemoveNode(LogicalUnit):
1784
  """Logical unit for removing a node.
1785

1786
  """
1787
  HPATH = "node-remove"
1788
  HTYPE = constants.HTYPE_NODE
1789
  _OP_REQP = ["node_name"]
1790

    
1791
  def BuildHooksEnv(self):
1792
    """Build hooks env.
1793

1794
    This doesn't run on the target node in the pre phase as a failed
1795
    node would then be impossible to remove.
1796

1797
    """
1798
    env = {
1799
      "OP_TARGET": self.op.node_name,
1800
      "NODE_NAME": self.op.node_name,
1801
      }
1802
    all_nodes = self.cfg.GetNodeList()
1803
    all_nodes.remove(self.op.node_name)
1804
    return env, all_nodes, all_nodes
1805

    
1806
  def CheckPrereq(self):
1807
    """Check prerequisites.
1808

1809
    This checks:
1810
     - the node exists in the configuration
1811
     - it does not have primary or secondary instances
1812
     - it's not the master
1813

1814
    Any errors are signaled by raising errors.OpPrereqError.
1815

1816
    """
1817
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1818
    if node is None:
1819
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1820

    
1821
    instance_list = self.cfg.GetInstanceList()
1822

    
1823
    masternode = self.cfg.GetMasterNode()
1824
    if node.name == masternode:
1825
      raise errors.OpPrereqError("Node is the master node,"
1826
                                 " you need to failover first.")
1827

    
1828
    for instance_name in instance_list:
1829
      instance = self.cfg.GetInstanceInfo(instance_name)
1830
      if node.name in instance.all_nodes:
1831
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1832
                                   " please remove first." % instance_name)
1833
    self.op.node_name = node.name
1834
    self.node = node
1835

    
1836
  def Exec(self, feedback_fn):
1837
    """Removes the node from the cluster.
1838

1839
    """
1840
    node = self.node
1841
    logging.info("Stopping the node daemon and removing configs from node %s",
1842
                 node.name)
1843

    
1844
    self.context.RemoveNode(node.name)
1845

    
1846
    self.rpc.call_node_leave_cluster(node.name)
1847

    
1848
    # Promote nodes to master candidate as needed
1849
    _AdjustCandidatePool(self)
1850

    
1851

    
1852
class LUQueryNodes(NoHooksLU):
1853
  """Logical unit for querying nodes.
1854

1855
  """
1856
  _OP_REQP = ["output_fields", "names", "use_locking"]
1857
  REQ_BGL = False
1858
  _FIELDS_DYNAMIC = utils.FieldSet(
1859
    "dtotal", "dfree",
1860
    "mtotal", "mnode", "mfree",
1861
    "bootid",
1862
    "ctotal", "cnodes", "csockets",
1863
    )
1864

    
1865
  _FIELDS_STATIC = utils.FieldSet(
1866
    "name", "pinst_cnt", "sinst_cnt",
1867
    "pinst_list", "sinst_list",
1868
    "pip", "sip", "tags",
1869
    "serial_no",
1870
    "master_candidate",
1871
    "master",
1872
    "offline",
1873
    "drained",
1874
    "role",
1875
    )
1876

    
1877
  def ExpandNames(self):
1878
    _CheckOutputFields(static=self._FIELDS_STATIC,
1879
                       dynamic=self._FIELDS_DYNAMIC,
1880
                       selected=self.op.output_fields)
1881

    
1882
    self.needed_locks = {}
1883
    self.share_locks[locking.LEVEL_NODE] = 1
1884

    
1885
    if self.op.names:
1886
      self.wanted = _GetWantedNodes(self, self.op.names)
1887
    else:
1888
      self.wanted = locking.ALL_SET
1889

    
1890
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1891
    self.do_locking = self.do_node_query and self.op.use_locking
1892
    if self.do_locking:
1893
      # if we don't request only static fields, we need to lock the nodes
1894
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1895

    
1896

    
1897
  def CheckPrereq(self):
1898
    """Check prerequisites.
1899

1900
    """
1901
    # The validation of the node list is done in the _GetWantedNodes,
1902
    # if non empty, and if empty, there's no validation to do
1903
    pass
1904

    
1905
  def Exec(self, feedback_fn):
1906
    """Computes the list of nodes and their attributes.
1907

1908
    """
1909
    all_info = self.cfg.GetAllNodesInfo()
1910
    if self.do_locking:
1911
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1912
    elif self.wanted != locking.ALL_SET:
1913
      nodenames = self.wanted
1914
      missing = set(nodenames).difference(all_info.keys())
1915
      if missing:
1916
        raise errors.OpExecError(
1917
          "Some nodes were removed before retrieving their data: %s" % missing)
1918
    else:
1919
      nodenames = all_info.keys()
1920

    
1921
    nodenames = utils.NiceSort(nodenames)
1922
    nodelist = [all_info[name] for name in nodenames]
1923

    
1924
    # begin data gathering
1925

    
1926
    if self.do_node_query:
1927
      live_data = {}
1928
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1929
                                          self.cfg.GetHypervisorType())
1930
      for name in nodenames:
1931
        nodeinfo = node_data[name]
1932
        if not nodeinfo.failed and nodeinfo.data:
1933
          nodeinfo = nodeinfo.data
1934
          fn = utils.TryConvert
1935
          live_data[name] = {
1936
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1937
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1938
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1939
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1940
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1941
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1942
            "bootid": nodeinfo.get('bootid', None),
1943
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1944
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1945
            }
1946
        else:
1947
          live_data[name] = {}
1948
    else:
1949
      live_data = dict.fromkeys(nodenames, {})
1950

    
1951
    node_to_primary = dict([(name, set()) for name in nodenames])
1952
    node_to_secondary = dict([(name, set()) for name in nodenames])
1953

    
1954
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1955
                             "sinst_cnt", "sinst_list"))
1956
    if inst_fields & frozenset(self.op.output_fields):
1957
      instancelist = self.cfg.GetInstanceList()
1958

    
1959
      for instance_name in instancelist:
1960
        inst = self.cfg.GetInstanceInfo(instance_name)
1961
        if inst.primary_node in node_to_primary:
1962
          node_to_primary[inst.primary_node].add(inst.name)
1963
        for secnode in inst.secondary_nodes:
1964
          if secnode in node_to_secondary:
1965
            node_to_secondary[secnode].add(inst.name)
1966

    
1967
    master_node = self.cfg.GetMasterNode()
1968

    
1969
    # end data gathering
1970

    
1971
    output = []
1972
    for node in nodelist:
1973
      node_output = []
1974
      for field in self.op.output_fields:
1975
        if field == "name":
1976
          val = node.name
1977
        elif field == "pinst_list":
1978
          val = list(node_to_primary[node.name])
1979
        elif field == "sinst_list":
1980
          val = list(node_to_secondary[node.name])
1981
        elif field == "pinst_cnt":
1982
          val = len(node_to_primary[node.name])
1983
        elif field == "sinst_cnt":
1984
          val = len(node_to_secondary[node.name])
1985
        elif field == "pip":
1986
          val = node.primary_ip
1987
        elif field == "sip":
1988
          val = node.secondary_ip
1989
        elif field == "tags":
1990
          val = list(node.GetTags())
1991
        elif field == "serial_no":
1992
          val = node.serial_no
1993
        elif field == "master_candidate":
1994
          val = node.master_candidate
1995
        elif field == "master":
1996
          val = node.name == master_node
1997
        elif field == "offline":
1998
          val = node.offline
1999
        elif field == "drained":
2000
          val = node.drained
2001
        elif self._FIELDS_DYNAMIC.Matches(field):
2002
          val = live_data[node.name].get(field, None)
2003
        elif field == "role":
2004
          if node.name == master_node:
2005
            val = "M"
2006
          elif node.master_candidate:
2007
            val = "C"
2008
          elif node.drained:
2009
            val = "D"
2010
          elif node.offline:
2011
            val = "O"
2012
          else:
2013
            val = "R"
2014
        else:
2015
          raise errors.ParameterError(field)
2016
        node_output.append(val)
2017
      output.append(node_output)
2018

    
2019
    return output
2020

    
2021

    
2022
class LUQueryNodeVolumes(NoHooksLU):
2023
  """Logical unit for getting volumes on node(s).
2024

2025
  """
2026
  _OP_REQP = ["nodes", "output_fields"]
2027
  REQ_BGL = False
2028
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2029
  _FIELDS_STATIC = utils.FieldSet("node")
2030

    
2031
  def ExpandNames(self):
2032
    _CheckOutputFields(static=self._FIELDS_STATIC,
2033
                       dynamic=self._FIELDS_DYNAMIC,
2034
                       selected=self.op.output_fields)
2035

    
2036
    self.needed_locks = {}
2037
    self.share_locks[locking.LEVEL_NODE] = 1
2038
    if not self.op.nodes:
2039
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2040
    else:
2041
      self.needed_locks[locking.LEVEL_NODE] = \
2042
        _GetWantedNodes(self, self.op.nodes)
2043

    
2044
  def CheckPrereq(self):
2045
    """Check prerequisites.
2046

2047
    This checks that the fields required are valid output fields.
2048

2049
    """
2050
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2051

    
2052
  def Exec(self, feedback_fn):
2053
    """Computes the list of nodes and their attributes.
2054

2055
    """
2056
    nodenames = self.nodes
2057
    volumes = self.rpc.call_node_volumes(nodenames)
2058

    
2059
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2060
             in self.cfg.GetInstanceList()]
2061

    
2062
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2063

    
2064
    output = []
2065
    for node in nodenames:
2066
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2067
        continue
2068

    
2069
      node_vols = volumes[node].data[:]
2070
      node_vols.sort(key=lambda vol: vol['dev'])
2071

    
2072
      for vol in node_vols:
2073
        node_output = []
2074
        for field in self.op.output_fields:
2075
          if field == "node":
2076
            val = node
2077
          elif field == "phys":
2078
            val = vol['dev']
2079
          elif field == "vg":
2080
            val = vol['vg']
2081
          elif field == "name":
2082
            val = vol['name']
2083
          elif field == "size":
2084
            val = int(float(vol['size']))
2085
          elif field == "instance":
2086
            for inst in ilist:
2087
              if node not in lv_by_node[inst]:
2088
                continue
2089
              if vol['name'] in lv_by_node[inst][node]:
2090
                val = inst.name
2091
                break
2092
            else:
2093
              val = '-'
2094
          else:
2095
            raise errors.ParameterError(field)
2096
          node_output.append(str(val))
2097

    
2098
        output.append(node_output)
2099

    
2100
    return output
2101

    
2102

    
2103
class LUAddNode(LogicalUnit):
2104
  """Logical unit for adding node to the cluster.
2105

2106
  """
2107
  HPATH = "node-add"
2108
  HTYPE = constants.HTYPE_NODE
2109
  _OP_REQP = ["node_name"]
2110

    
2111
  def BuildHooksEnv(self):
2112
    """Build hooks env.
2113

2114
    This will run on all nodes before, and on all nodes + the new node after.
2115

2116
    """
2117
    env = {
2118
      "OP_TARGET": self.op.node_name,
2119
      "NODE_NAME": self.op.node_name,
2120
      "NODE_PIP": self.op.primary_ip,
2121
      "NODE_SIP": self.op.secondary_ip,
2122
      }
2123
    nodes_0 = self.cfg.GetNodeList()
2124
    nodes_1 = nodes_0 + [self.op.node_name, ]
2125
    return env, nodes_0, nodes_1
2126

    
2127
  def CheckPrereq(self):
2128
    """Check prerequisites.
2129

2130
    This checks:
2131
     - the new node is not already in the config
2132
     - it is resolvable
2133
     - its parameters (single/dual homed) matches the cluster
2134

2135
    Any errors are signaled by raising errors.OpPrereqError.
2136

2137
    """
2138
    node_name = self.op.node_name
2139
    cfg = self.cfg
2140

    
2141
    dns_data = utils.HostInfo(node_name)
2142

    
2143
    node = dns_data.name
2144
    primary_ip = self.op.primary_ip = dns_data.ip
2145
    secondary_ip = getattr(self.op, "secondary_ip", None)
2146
    if secondary_ip is None:
2147
      secondary_ip = primary_ip
2148
    if not utils.IsValidIP(secondary_ip):
2149
      raise errors.OpPrereqError("Invalid secondary IP given")
2150
    self.op.secondary_ip = secondary_ip
2151

    
2152
    node_list = cfg.GetNodeList()
2153
    if not self.op.readd and node in node_list:
2154
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2155
                                 node)
2156
    elif self.op.readd and node not in node_list:
2157
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2158

    
2159
    for existing_node_name in node_list:
2160
      existing_node = cfg.GetNodeInfo(existing_node_name)
2161

    
2162
      if self.op.readd and node == existing_node_name:
2163
        if (existing_node.primary_ip != primary_ip or
2164
            existing_node.secondary_ip != secondary_ip):
2165
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2166
                                     " address configuration as before")
2167
        continue
2168

    
2169
      if (existing_node.primary_ip == primary_ip or
2170
          existing_node.secondary_ip == primary_ip or
2171
          existing_node.primary_ip == secondary_ip or
2172
          existing_node.secondary_ip == secondary_ip):
2173
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2174
                                   " existing node %s" % existing_node.name)
2175

    
2176
    # check that the type of the node (single versus dual homed) is the
2177
    # same as for the master
2178
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2179
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2180
    newbie_singlehomed = secondary_ip == primary_ip
2181
    if master_singlehomed != newbie_singlehomed:
2182
      if master_singlehomed:
2183
        raise errors.OpPrereqError("The master has no private ip but the"
2184
                                   " new node has one")
2185
      else:
2186
        raise errors.OpPrereqError("The master has a private ip but the"
2187
                                   " new node doesn't have one")
2188

    
2189
    # checks reachability
2190
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2191
      raise errors.OpPrereqError("Node not reachable by ping")
2192

    
2193
    if not newbie_singlehomed:
2194
      # check reachability from my secondary ip to newbie's secondary ip
2195
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2196
                           source=myself.secondary_ip):
2197
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2198
                                   " based ping to noded port")
2199

    
2200
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2201
    if self.op.readd:
2202
      exceptions = [node]
2203
    else:
2204
      exceptions = []
2205
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2206
    # the new node will increase mc_max with one, so:
2207
    mc_max = min(mc_max + 1, cp_size)
2208
    self.master_candidate = mc_now < mc_max
2209

    
2210
    if self.op.readd:
2211
      self.new_node = self.cfg.GetNodeInfo(node)
2212
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
2213
    else:
2214
      self.new_node = objects.Node(name=node,
2215
                                   primary_ip=primary_ip,
2216
                                   secondary_ip=secondary_ip,
2217
                                   master_candidate=self.master_candidate,
2218
                                   offline=False, drained=False)
2219

    
2220
  def Exec(self, feedback_fn):
2221
    """Adds the new node to the cluster.
2222

2223
    """
2224
    new_node = self.new_node
2225
    node = new_node.name
2226

    
2227
    # for re-adds, reset the offline/drained/master-candidate flags;
2228
    # we need to reset here, otherwise offline would prevent RPC calls
2229
    # later in the procedure; this also means that if the re-add
2230
    # fails, we are left with a non-offlined, broken node
2231
    if self.op.readd:
2232
      new_node.drained = new_node.offline = False
2233
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2234
      # if we demote the node, we do cleanup later in the procedure
2235
      new_node.master_candidate = self.master_candidate
2236

    
2237
    # notify the user about any possible mc promotion
2238
    if new_node.master_candidate:
2239
      self.LogInfo("Node will be a master candidate")
2240

    
2241
    # check connectivity
2242
    result = self.rpc.call_version([node])[node]
2243
    result.Raise()
2244
    if result.data:
2245
      if constants.PROTOCOL_VERSION == result.data:
2246
        logging.info("Communication to node %s fine, sw version %s match",
2247
                     node, result.data)
2248
      else:
2249
        raise errors.OpExecError("Version mismatch master version %s,"
2250
                                 " node version %s" %
2251
                                 (constants.PROTOCOL_VERSION, result.data))
2252
    else:
2253
      raise errors.OpExecError("Cannot get version from the new node")
2254

    
2255
    # setup ssh on node
2256
    logging.info("Copy ssh key to node %s", node)
2257
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2258
    keyarray = []
2259
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2260
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2261
                priv_key, pub_key]
2262

    
2263
    for i in keyfiles:
2264
      f = open(i, 'r')
2265
      try:
2266
        keyarray.append(f.read())
2267
      finally:
2268
        f.close()
2269

    
2270
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2271
                                    keyarray[2],
2272
                                    keyarray[3], keyarray[4], keyarray[5])
2273

    
2274
    msg = result.RemoteFailMsg()
2275
    if msg:
2276
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2277
                               " new node: %s" % msg)
2278

    
2279
    # Add node to our /etc/hosts, and add key to known_hosts
2280
    utils.AddHostToEtcHosts(new_node.name)
2281

    
2282
    if new_node.secondary_ip != new_node.primary_ip:
2283
      result = self.rpc.call_node_has_ip_address(new_node.name,
2284
                                                 new_node.secondary_ip)
2285
      if result.failed or not result.data:
2286
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2287
                                 " you gave (%s). Please fix and re-run this"
2288
                                 " command." % new_node.secondary_ip)
2289

    
2290
    node_verify_list = [self.cfg.GetMasterNode()]
2291
    node_verify_param = {
2292
      'nodelist': [node],
2293
      # TODO: do a node-net-test as well?
2294
    }
2295

    
2296
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2297
                                       self.cfg.GetClusterName())
2298
    for verifier in node_verify_list:
2299
      if result[verifier].failed or not result[verifier].data:
2300
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2301
                                 " for remote verification" % verifier)
2302
      if result[verifier].data['nodelist']:
2303
        for failed in result[verifier].data['nodelist']:
2304
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2305
                      (verifier, result[verifier].data['nodelist'][failed]))
2306
        raise errors.OpExecError("ssh/hostname verification failed.")
2307

    
2308
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2309
    # including the node just added
2310
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2311
    dist_nodes = self.cfg.GetNodeList()
2312
    if not self.op.readd:
2313
      dist_nodes.append(node)
2314
    if myself.name in dist_nodes:
2315
      dist_nodes.remove(myself.name)
2316

    
2317
    logging.debug("Copying hosts and known_hosts to all nodes")
2318
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2319
      result = self.rpc.call_upload_file(dist_nodes, fname)
2320
      for to_node, to_result in result.iteritems():
2321
        if to_result.failed or not to_result.data:
2322
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2323

    
2324
    to_copy = []
2325
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2326
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2327
      to_copy.append(constants.VNC_PASSWORD_FILE)
2328

    
2329
    for fname in to_copy:
2330
      result = self.rpc.call_upload_file([node], fname)
2331
      if result[node].failed or not result[node]:
2332
        logging.error("Could not copy file %s to node %s", fname, node)
2333

    
2334
    if self.op.readd:
2335
      self.context.ReaddNode(new_node)
2336
      # make sure we redistribute the config
2337
      self.cfg.Update(new_node)
2338
      # and make sure the new node will not have old files around
2339
      if not new_node.master_candidate:
2340
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2341
        msg = result.RemoteFailMsg()
2342
        if msg:
2343
          self.LogWarning("Node failed to demote itself from master"
2344
                          " candidate status: %s" % msg)
2345
    else:
2346
      self.context.AddNode(new_node)
2347

    
2348

    
2349
class LUSetNodeParams(LogicalUnit):
2350
  """Modifies the parameters of a node.
2351

2352
  """
2353
  HPATH = "node-modify"
2354
  HTYPE = constants.HTYPE_NODE
2355
  _OP_REQP = ["node_name"]
2356
  REQ_BGL = False
2357

    
2358
  def CheckArguments(self):
2359
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2360
    if node_name is None:
2361
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2362
    self.op.node_name = node_name
2363
    _CheckBooleanOpField(self.op, 'master_candidate')
2364
    _CheckBooleanOpField(self.op, 'offline')
2365
    _CheckBooleanOpField(self.op, 'drained')
2366
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2367
    if all_mods.count(None) == 3:
2368
      raise errors.OpPrereqError("Please pass at least one modification")
2369
    if all_mods.count(True) > 1:
2370
      raise errors.OpPrereqError("Can't set the node into more than one"
2371
                                 " state at the same time")
2372

    
2373
  def ExpandNames(self):
2374
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2375

    
2376
  def BuildHooksEnv(self):
2377
    """Build hooks env.
2378

2379
    This runs on the master node.
2380

2381
    """
2382
    env = {
2383
      "OP_TARGET": self.op.node_name,
2384
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2385
      "OFFLINE": str(self.op.offline),
2386
      "DRAINED": str(self.op.drained),
2387
      }
2388
    nl = [self.cfg.GetMasterNode(),
2389
          self.op.node_name]
2390
    return env, nl, nl
2391

    
2392
  def CheckPrereq(self):
2393
    """Check prerequisites.
2394

2395
    This only checks the instance list against the existing names.
2396

2397
    """
2398
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2399

    
2400
    if ((self.op.master_candidate == False or self.op.offline == True or
2401
         self.op.drained == True) and node.master_candidate):
2402
      # we will demote the node from master_candidate
2403
      if self.op.node_name == self.cfg.GetMasterNode():
2404
        raise errors.OpPrereqError("The master node has to be a"
2405
                                   " master candidate, online and not drained")
2406
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2407
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2408
      if num_candidates <= cp_size:
2409
        msg = ("Not enough master candidates (desired"
2410
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2411
        if self.op.force:
2412
          self.LogWarning(msg)
2413
        else:
2414
          raise errors.OpPrereqError(msg)
2415

    
2416
    if (self.op.master_candidate == True and
2417
        ((node.offline and not self.op.offline == False) or
2418
         (node.drained and not self.op.drained == False))):
2419
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2420
                                 " to master_candidate" % node.name)
2421

    
2422
    return
2423

    
2424
  def Exec(self, feedback_fn):
2425
    """Modifies a node.
2426

2427
    """
2428
    node = self.node
2429

    
2430
    result = []
2431
    changed_mc = False
2432

    
2433
    if self.op.offline is not None:
2434
      node.offline = self.op.offline
2435
      result.append(("offline", str(self.op.offline)))
2436
      if self.op.offline == True:
2437
        if node.master_candidate:
2438
          node.master_candidate = False
2439
          changed_mc = True
2440
          result.append(("master_candidate", "auto-demotion due to offline"))
2441
        if node.drained:
2442
          node.drained = False
2443
          result.append(("drained", "clear drained status due to offline"))
2444

    
2445
    if self.op.master_candidate is not None:
2446
      node.master_candidate = self.op.master_candidate
2447
      changed_mc = True
2448
      result.append(("master_candidate", str(self.op.master_candidate)))
2449
      if self.op.master_candidate == False:
2450
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2451
        msg = rrc.RemoteFailMsg()
2452
        if msg:
2453
          self.LogWarning("Node failed to demote itself: %s" % msg)
2454

    
2455
    if self.op.drained is not None:
2456
      node.drained = self.op.drained
2457
      result.append(("drained", str(self.op.drained)))
2458
      if self.op.drained == True:
2459
        if node.master_candidate:
2460
          node.master_candidate = False
2461
          changed_mc = True
2462
          result.append(("master_candidate", "auto-demotion due to drain"))
2463
          rrc = self.rpc.call_node_demote_from_mc(node.name)
2464
          msg = rrc.RemoteFailMsg()
2465
          if msg:
2466
            self.LogWarning("Node failed to demote itself: %s" % msg)
2467
        if node.offline:
2468
          node.offline = False
2469
          result.append(("offline", "clear offline status due to drain"))
2470

    
2471
    # this will trigger configuration file update, if needed
2472
    self.cfg.Update(node)
2473
    # this will trigger job queue propagation or cleanup
2474
    if changed_mc:
2475
      self.context.ReaddNode(node)
2476

    
2477
    return result
2478

    
2479

    
2480
class LUQueryClusterInfo(NoHooksLU):
2481
  """Query cluster configuration.
2482

2483
  """
2484
  _OP_REQP = []
2485
  REQ_BGL = False
2486

    
2487
  def ExpandNames(self):
2488
    self.needed_locks = {}
2489

    
2490
  def CheckPrereq(self):
2491
    """No prerequsites needed for this LU.
2492

2493
    """
2494
    pass
2495

    
2496
  def Exec(self, feedback_fn):
2497
    """Return cluster config.
2498

2499
    """
2500
    cluster = self.cfg.GetClusterInfo()
2501
    result = {
2502
      "software_version": constants.RELEASE_VERSION,
2503
      "protocol_version": constants.PROTOCOL_VERSION,
2504
      "config_version": constants.CONFIG_VERSION,
2505
      "os_api_version": constants.OS_API_VERSION,
2506
      "export_version": constants.EXPORT_VERSION,
2507
      "architecture": (platform.architecture()[0], platform.machine()),
2508
      "name": cluster.cluster_name,
2509
      "master": cluster.master_node,
2510
      "default_hypervisor": cluster.default_hypervisor,
2511
      "enabled_hypervisors": cluster.enabled_hypervisors,
2512
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2513
                        for hypervisor_name in cluster.enabled_hypervisors]),
2514
      "beparams": cluster.beparams,
2515
      "candidate_pool_size": cluster.candidate_pool_size,
2516
      "default_bridge": cluster.default_bridge,
2517
      "master_netdev": cluster.master_netdev,
2518
      "volume_group_name": cluster.volume_group_name,
2519
      "file_storage_dir": cluster.file_storage_dir,
2520
      }
2521

    
2522
    return result
2523

    
2524

    
2525
class LUQueryConfigValues(NoHooksLU):
2526
  """Return configuration values.
2527

2528
  """
2529
  _OP_REQP = []
2530
  REQ_BGL = False
2531
  _FIELDS_DYNAMIC = utils.FieldSet()
2532
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2533

    
2534
  def ExpandNames(self):
2535
    self.needed_locks = {}
2536

    
2537
    _CheckOutputFields(static=self._FIELDS_STATIC,
2538
                       dynamic=self._FIELDS_DYNAMIC,
2539
                       selected=self.op.output_fields)
2540

    
2541
  def CheckPrereq(self):
2542
    """No prerequisites.
2543

2544
    """
2545
    pass
2546

    
2547
  def Exec(self, feedback_fn):
2548
    """Dump a representation of the cluster config to the standard output.
2549

2550
    """
2551
    values = []
2552
    for field in self.op.output_fields:
2553
      if field == "cluster_name":
2554
        entry = self.cfg.GetClusterName()
2555
      elif field == "master_node":
2556
        entry = self.cfg.GetMasterNode()
2557
      elif field == "drain_flag":
2558
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2559
      else:
2560
        raise errors.ParameterError(field)
2561
      values.append(entry)
2562
    return values
2563

    
2564

    
2565
class LUActivateInstanceDisks(NoHooksLU):
2566
  """Bring up an instance's disks.
2567

2568
  """
2569
  _OP_REQP = ["instance_name"]
2570
  REQ_BGL = False
2571

    
2572
  def ExpandNames(self):
2573
    self._ExpandAndLockInstance()
2574
    self.needed_locks[locking.LEVEL_NODE] = []
2575
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2576

    
2577
  def DeclareLocks(self, level):
2578
    if level == locking.LEVEL_NODE:
2579
      self._LockInstancesNodes()
2580

    
2581
  def CheckPrereq(self):
2582
    """Check prerequisites.
2583

2584
    This checks that the instance is in the cluster.
2585

2586
    """
2587
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2588
    assert self.instance is not None, \
2589
      "Cannot retrieve locked instance %s" % self.op.instance_name
2590
    _CheckNodeOnline(self, self.instance.primary_node)
2591

    
2592
  def Exec(self, feedback_fn):
2593
    """Activate the disks.
2594

2595
    """
2596
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2597
    if not disks_ok:
2598
      raise errors.OpExecError("Cannot activate block devices")
2599

    
2600
    return disks_info
2601

    
2602

    
2603
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2604
  """Prepare the block devices for an instance.
2605

2606
  This sets up the block devices on all nodes.
2607

2608
  @type lu: L{LogicalUnit}
2609
  @param lu: the logical unit on whose behalf we execute
2610
  @type instance: L{objects.Instance}
2611
  @param instance: the instance for whose disks we assemble
2612
  @type ignore_secondaries: boolean
2613
  @param ignore_secondaries: if true, errors on secondary nodes
2614
      won't result in an error return from the function
2615
  @return: False if the operation failed, otherwise a list of
2616
      (host, instance_visible_name, node_visible_name)
2617
      with the mapping from node devices to instance devices
2618

2619
  """
2620
  device_info = []
2621
  disks_ok = True
2622
  iname = instance.name
2623
  # With the two passes mechanism we try to reduce the window of
2624
  # opportunity for the race condition of switching DRBD to primary
2625
  # before handshaking occured, but we do not eliminate it
2626

    
2627
  # The proper fix would be to wait (with some limits) until the
2628
  # connection has been made and drbd transitions from WFConnection
2629
  # into any other network-connected state (Connected, SyncTarget,
2630
  # SyncSource, etc.)
2631

    
2632
  # 1st pass, assemble on all nodes in secondary mode
2633
  for inst_disk in instance.disks:
2634
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2635
      lu.cfg.SetDiskID(node_disk, node)
2636
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2637
      msg = result.RemoteFailMsg()
2638
      if msg:
2639
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2640
                           " (is_primary=False, pass=1): %s",
2641
                           inst_disk.iv_name, node, msg)
2642
        if not ignore_secondaries:
2643
          disks_ok = False
2644

    
2645
  # FIXME: race condition on drbd migration to primary
2646

    
2647
  # 2nd pass, do only the primary node
2648
  for inst_disk in instance.disks:
2649
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2650
      if node != instance.primary_node:
2651
        continue
2652
      lu.cfg.SetDiskID(node_disk, node)
2653
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2654
      msg = result.RemoteFailMsg()
2655
      if msg:
2656
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2657
                           " (is_primary=True, pass=2): %s",
2658
                           inst_disk.iv_name, node, msg)
2659
        disks_ok = False
2660
    device_info.append((instance.primary_node, inst_disk.iv_name,
2661
                        result.payload))
2662

    
2663
  # leave the disks configured for the primary node
2664
  # this is a workaround that would be fixed better by
2665
  # improving the logical/physical id handling
2666
  for disk in instance.disks:
2667
    lu.cfg.SetDiskID(disk, instance.primary_node)
2668

    
2669
  return disks_ok, device_info
2670

    
2671

    
2672
def _StartInstanceDisks(lu, instance, force):
2673
  """Start the disks of an instance.
2674

2675
  """
2676
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2677
                                           ignore_secondaries=force)
2678
  if not disks_ok:
2679
    _ShutdownInstanceDisks(lu, instance)
2680
    if force is not None and not force:
2681
      lu.proc.LogWarning("", hint="If the message above refers to a"
2682
                         " secondary node,"
2683
                         " you can retry the operation using '--force'.")
2684
    raise errors.OpExecError("Disk consistency error")
2685

    
2686

    
2687
class LUDeactivateInstanceDisks(NoHooksLU):
2688
  """Shutdown an instance's disks.
2689

2690
  """
2691
  _OP_REQP = ["instance_name"]
2692
  REQ_BGL = False
2693

    
2694
  def ExpandNames(self):
2695
    self._ExpandAndLockInstance()
2696
    self.needed_locks[locking.LEVEL_NODE] = []
2697
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2698

    
2699
  def DeclareLocks(self, level):
2700
    if level == locking.LEVEL_NODE:
2701
      self._LockInstancesNodes()
2702

    
2703
  def CheckPrereq(self):
2704
    """Check prerequisites.
2705

2706
    This checks that the instance is in the cluster.
2707

2708
    """
2709
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2710
    assert self.instance is not None, \
2711
      "Cannot retrieve locked instance %s" % self.op.instance_name
2712

    
2713
  def Exec(self, feedback_fn):
2714
    """Deactivate the disks
2715

2716
    """
2717
    instance = self.instance
2718
    _SafeShutdownInstanceDisks(self, instance)
2719

    
2720

    
2721
def _SafeShutdownInstanceDisks(lu, instance):
2722
  """Shutdown block devices of an instance.
2723

2724
  This function checks if an instance is running, before calling
2725
  _ShutdownInstanceDisks.
2726

2727
  """
2728
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2729
                                      [instance.hypervisor])
2730
  ins_l = ins_l[instance.primary_node]
2731
  if ins_l.failed or not isinstance(ins_l.data, list):
2732
    raise errors.OpExecError("Can't contact node '%s'" %
2733
                             instance.primary_node)
2734

    
2735
  if instance.name in ins_l.data:
2736
    raise errors.OpExecError("Instance is running, can't shutdown"
2737
                             " block devices.")
2738

    
2739
  _ShutdownInstanceDisks(lu, instance)
2740

    
2741

    
2742
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2743
  """Shutdown block devices of an instance.
2744

2745
  This does the shutdown on all nodes of the instance.
2746

2747
  If the ignore_primary is false, errors on the primary node are
2748
  ignored.
2749

2750
  """
2751
  all_result = True
2752
  for disk in instance.disks:
2753
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2754
      lu.cfg.SetDiskID(top_disk, node)
2755
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2756
      msg = result.RemoteFailMsg()
2757
      if msg:
2758
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2759
                      disk.iv_name, node, msg)
2760
        if not ignore_primary or node != instance.primary_node:
2761
          all_result = False
2762
  return all_result
2763

    
2764

    
2765
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2766
  """Checks if a node has enough free memory.
2767

2768
  This function check if a given node has the needed amount of free
2769
  memory. In case the node has less memory or we cannot get the
2770
  information from the node, this function raise an OpPrereqError
2771
  exception.
2772

2773
  @type lu: C{LogicalUnit}
2774
  @param lu: a logical unit from which we get configuration data
2775
  @type node: C{str}
2776
  @param node: the node to check
2777
  @type reason: C{str}
2778
  @param reason: string to use in the error message
2779
  @type requested: C{int}
2780
  @param requested: the amount of memory in MiB to check for
2781
  @type hypervisor_name: C{str}
2782
  @param hypervisor_name: the hypervisor to ask for memory stats
2783
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2784
      we cannot check the node
2785

2786
  """
2787
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2788
  nodeinfo[node].Raise()
2789
  free_mem = nodeinfo[node].data.get('memory_free')
2790
  if not isinstance(free_mem, int):
2791
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2792
                             " was '%s'" % (node, free_mem))
2793
  if requested > free_mem:
2794
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2795
                             " needed %s MiB, available %s MiB" %
2796
                             (node, reason, requested, free_mem))
2797

    
2798

    
2799
class LUStartupInstance(LogicalUnit):
2800
  """Starts an instance.
2801

2802
  """
2803
  HPATH = "instance-start"
2804
  HTYPE = constants.HTYPE_INSTANCE
2805
  _OP_REQP = ["instance_name", "force"]
2806
  REQ_BGL = False
2807

    
2808
  def ExpandNames(self):
2809
    self._ExpandAndLockInstance()
2810

    
2811
  def BuildHooksEnv(self):
2812
    """Build hooks env.
2813

2814
    This runs on master, primary and secondary nodes of the instance.
2815

2816
    """
2817
    env = {
2818
      "FORCE": self.op.force,
2819
      }
2820
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2821
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2822
    return env, nl, nl
2823

    
2824
  def CheckPrereq(self):
2825
    """Check prerequisites.
2826

2827
    This checks that the instance is in the cluster.
2828

2829
    """
2830
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2831
    assert self.instance is not None, \
2832
      "Cannot retrieve locked instance %s" % self.op.instance_name
2833

    
2834
    # extra beparams
2835
    self.beparams = getattr(self.op, "beparams", {})
2836
    if self.beparams:
2837
      if not isinstance(self.beparams, dict):
2838
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2839
                                   " dict" % (type(self.beparams), ))
2840
      # fill the beparams dict
2841
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2842
      self.op.beparams = self.beparams
2843

    
2844
    # extra hvparams
2845
    self.hvparams = getattr(self.op, "hvparams", {})
2846
    if self.hvparams:
2847
      if not isinstance(self.hvparams, dict):
2848
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2849
                                   " dict" % (type(self.hvparams), ))
2850

    
2851
      # check hypervisor parameter syntax (locally)
2852
      cluster = self.cfg.GetClusterInfo()
2853
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2854
      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2855
                                    instance.hvparams)
2856
      filled_hvp.update(self.hvparams)
2857
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2858
      hv_type.CheckParameterSyntax(filled_hvp)
2859
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2860
      self.op.hvparams = self.hvparams
2861

    
2862
    _CheckNodeOnline(self, instance.primary_node)
2863

    
2864
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2865
    # check bridges existence
2866
    _CheckInstanceBridgesExist(self, instance)
2867

    
2868
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if not remote_info.data:
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.RemoteFailMsg()
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node
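
    # Soft and hard reboots are delegated to the hypervisor on the
    # primary node; a full reboot is implemented below as a shutdown
    # followed by reactivating the disks and starting the instance again.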
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
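  # The regexp entries above describe parameterized fields: for example
  # "disk.sizes" returns the list of all disk sizes, while "disk.size/0"
  # or "nic.mac/1" select a single disk or NIC by index; Exec() resolves
  # them via the regexp match groups.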

    
  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "vcpus":
          val = i_be[constants.BE_VCPUS]
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          if instance.nics:
            val = instance.nics[0].ip
          else:
            val = None
        elif field == "bridge":
          if instance.nics:
            val = instance.nics[0].bridge
          else:
            val = None
        elif field == "mac":
          if instance.nics:
            val = instance.nics[0].mac
          else:
            val = None
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, ("Declared but unhandled variable parameter '%s'" %
                           field)
        else:
          assert False, "Declared but unhandled parameter '%s'" % field
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)

    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
    else:
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
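    # If the instance is administratively down, only the disks and the
    # configuration are failed over; the instance itself is not started
    # on the target node.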
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
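    # Poll all involved nodes until every DRBD device reports itself as
    # fully synchronized, printing the lowest completion percentage seen
    # and sleeping two seconds between polls.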
    self.feedback_fn("* wait until resync is done")
3778
    all_done = False
3779
    while not all_done:
3780
      all_done = True
3781
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3782
                                            self.nodes_ip,
3783
                                            self.instance.disks)
3784
      min_percent = 100
3785
      for node, nres in result.items():
3786
        msg = nres.RemoteFailMsg()
3787
        if msg:
3788
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3789
                                   (node, msg))
3790
        node_done, node_percent = nres.payload
3791
        all_done = all_done and node_done
3792
        if node_percent is not None:
3793
          min_percent = min(min_percent, node_percent)
3794
      if not all_done:
3795
        if min_percent < 100:
3796
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3797
        time.sleep(2)
3798

    
3799
  def _EnsureSecondary(self, node):
3800
    """Demote a node to secondary.
3801

3802
    """
3803
    self.feedback_fn("* switching node %s to secondary mode" % node)
3804

    
3805
    for dev in self.instance.disks:
3806
      self.cfg.SetDiskID(dev, node)
3807

    
3808
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3809
                                          self.instance.disks)
3810
    msg = result.RemoteFailMsg()
3811
    if msg:
3812
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3813
                               " error %s" % (node, msg))
3814

    
3815
  def _GoStandalone(self):
3816
    """Disconnect from the network.
3817

3818
    """
3819
    self.feedback_fn("* changing into standalone mode")
3820
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3821
                                               self.instance.disks)
3822
    for node, nres in result.items():
3823
      msg = nres.RemoteFailMsg()
3824
      if msg:
3825
        raise errors.OpExecError("Cannot disconnect disks node %s,"
3826
                                 " error %s" % (node, msg))
3827

    
3828
  def _GoReconnect(self, multimaster):
3829
    """Reconnect to the network.
3830

3831
    """
3832
    if multimaster:
3833
      msg = "dual-master"
3834
    else:
3835
      msg = "single-master"
3836
    self.feedback_fn("* changing disks into %s mode" % msg)
3837
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3838
                                           self.instance.disks,
3839
                                           self.instance.name, multimaster)
3840
    for node, nres in result.items():
3841
      msg = nres.RemoteFailMsg()
3842
      if msg:
3843
        raise errors.OpExecError("Cannot change disks config on node %s,"
3844
                                 " error: %s" % (node, msg))
3845

    
3846
  def _ExecCleanup(self):
3847
    """Try to cleanup after a failed migration.
3848

3849
    The cleanup is done by:
3850
      - check that the instance is running only on one node
3851
        (and update the config if needed)
3852
      - change disks on its secondary node to secondary
3853
      - wait until disks are fully synchronized
3854
      - disconnect from the network
3855
      - change disks into single-master mode
3856
      - wait again until disks are fully synchronized
3857

3858
    """
3859
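    # Nothing is started or stopped here: depending on where the instance
    # is actually found running, the configuration is updated if needed
    # and the DRBD disks are brought back to a clean single-master setup.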
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
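    # The disks stay in dual-master mode only for the duration of the
    # actual memory copy; once the instance runs on the target node the
    # old primary is demoted and the disks return to single-master mode.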
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))
    time.sleep(10)

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }
    if self.op.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()


def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device whose
      CreateOnSecondary() method returns True
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
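  # force_create is sticky on the way down the tree: once a device reports
  # CreateOnSecondary(), it and all devices below it are created on this
  # node as well.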
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  if device.physical_id is None:
    device.physical_id = result.payload


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
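  # The resulting device is a DRBD8 disk backed by two LVs: one of the
  # requested size for the data and a fixed 128 MB volume for the DRBD
  # metadata. The TCP port and shared secret are allocated from the
  # cluster configuration, while the minors are supplied by the caller.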
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
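  # Naming sketch: each logical volume name is a generated unique ID plus
  # a ".disk<N>" suffix; for DRBD8 every disk additionally gets a "_data"
  # and a "_meta" volume, and file-based disks are stored as "disk<N>"
  # files under the instance's file storage directory.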
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index,
                              mode=disk["mode"])
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disk_dev.mode = disk["mode"]
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         disk_index)),
                              mode=disk["mode"])
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)
4256
  pnode = instance.primary_node
4257

    
4258
  if instance.disk_template == constants.DT_FILE:
4259
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4260
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4261

    
4262
    if result.failed or not result.data:
4263
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4264

    
4265
    if not result.data[0]:
4266
      raise errors.OpExecError("Failed to create directory '%s'" %
4267
                               file_storage_dir)
4268

    
4269
  # Note: this needs to be kept in sync with adding of disks in
4270
  # LUSetInstanceParams
4271
  for device in instance.disks:
4272
    logging.info("Creating volume %s for instance %s",
4273
                 device.iv_name, instance.name)
4274
    #HARDCODE
4275
    for node in instance.all_nodes:
4276
      f_create = node == pnode
4277
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4278

    
4279

    
4280
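# Illustrative note (added for clarity, not part of the original module): for
# file-based disks the logical_id is (file_driver, "<file_storage_dir>/disk<N>"),
# so the os.path.dirname() call above recovers the per-instance directory.
# Hypothetical example:
#
#   logical_id == (constants.FD_LOOP,
#                  "/srv/ganeti/file-storage/inst1.example.com/disk0")
#   # -> directory "/srv/ganeti/file-storage/inst1.example.com"

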
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  all_result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)
        all_result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      all_result = False

  return all_result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of the disk template and disk sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


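# Illustrative note (added for clarity, not part of the original module): the
# sizing rules above mean that two 1024 MB disks require 2048 MB of free
# volume group space for DT_PLAIN and 2 * (1024 + 128) = 2304 MB for DT_DRBD8
# (on each node), while DT_DISKLESS and DT_FILE need no VG space at all:
#
#   _ComputeDiskSize(constants.DT_PLAIN, [{"size": 1024}, {"size": 1024}])
#   # -> 2048
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 1024}])
#   # -> 2304

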
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    msg = info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Hypervisor parameter validation"
                                 " failed on node %s: %s" % (node, msg))


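# Illustrative note (added for clarity, not part of the original module): a
# typical call, as done later in LUCreateInstance.CheckPrereq, validates the
# opcode's hvparams on the primary and all secondary nodes in one RPC call:
#
#   _CheckHVParams(self, [pnode.name] + self.secondaries,
#                  self.op.hypervisor, self.op.hvparams)

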
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)
    self.hv_full = filled_hvp

    # fill and remember the beparams dict
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", None)
      if bridge is None:
        bridge = self.cfg.GetDefBridge()
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

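  # Illustrative note (added for clarity, not part of the original module):
  # ExpandNames above expects self.op.nics and self.op.disks to be lists of
  # plain dicts, e.g. (hypothetical values):
  #
  #   nics  = [{"ip": constants.VALUE_AUTO, "mac": constants.VALUE_AUTO,
  #             "bridge": None}]
  #   disks = [{"size": 10240, "mode": constants.DISK_RDWR}]
  #
  # Missing keys fall back to the defaults shown in the loops above (auto MAC,
  # cluster default bridge, read-write access mode).
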
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
      disk_template=self.op.disk_template,
      disks=[(d["size"], d["mode"]) for d in self.disks],
      bep=self.be_full,
      hvp=self.hv_full,
      hypervisor=self.op.hypervisor,
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                      src_path)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS) or not result.data:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
        msg = result.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s: %s" %
                                   (instance, pnode_name, msg))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          if not result:
            self.LogWarning("Could not import the image %s for instance"
                            " %s, disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not start instance: %s" % msg)


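# Illustrative note (added for clarity, not part of the original module):
# LUCreateInstance is normally driven through an opcode built by the client,
# roughly along these lines (hypothetical values; keyword names taken from
# _OP_REQP above):
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 disk_template=constants.DT_DRBD8,
#                                 disks=[{"size": 10240}],
#                                 nics=[{}], os_type="debootstrap",
#                                 pnode="node1", snode="node2",
#                                 start=True, wait_for_sync=True,
#                                 ip_check=True, hvparams={}, beparams={})

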
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    # check for valid parameter combination
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if cnt == 2:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")
      elif cnt == 0:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
    else: # not replacing the secondary
      if cnt != 2:
        raise errors.OpPrereqError("The iallocator and new node options can"
                                   " be used only when changing the"
                                   " secondary node")

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    if self.op.iallocator is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.op.mode == constants.REPLACE_DISK_PRI:
      n1 = self.tgt_node = instance.primary_node
      n2 = self.oth_node = self.sec_node
    elif self.op.mode == constants.REPLACE_DISK_SEC:
      n1 = self.tgt_node = self.sec_node
      n2 = self.oth_node = instance.primary_node
    elif self.op.mode == constants.REPLACE_DISK_CHG:
      n1 = self.new_node = remote_node
      n2 = self.oth_node = instance.primary_node
      self.tgt_node = self.sec_node
      _CheckNodeNotDrained(self, remote_node)
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")

    _CheckNodeOnline(self, n1)
    _CheckNodeOnline(self, n2)

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        result = self.rpc.call_blockdev_find(node, dev)
        msg = result.RemoteFailMsg()
        if not msg and not result.payload:
          msg = "disk not found"
        if msg:
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not result.RemoteFailMsg() and result.payload:
          # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      if result.failed or not result.data:
        for new_lv in new_lvs:
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
          if msg:
            warning("Can't rollback device %s: %s", dev, msg,
                    hint="cleanup manually the unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove old LV: %s" % msg,
                  hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node
    nodes_ip = {
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
      }

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                 (idx, pri_node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self, new_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if pri_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size)
      try:
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                              _GetInstanceInfoText(instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
      if msg:
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
                (idx, msg),
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
                                               instance.disks)[pri_node]

    msg = result.RemoteFailMsg()
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
                                           instance.disks, instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.RemoteFailMsg()
      if msg:
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
                hint="please do a gnt-instance info to see the"
                " status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
                                 (idx, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove LV on old secondary: %s", msg,
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _StartInstanceDisks(self, instance, True)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      fn = self._ExecD8Secondary
    else:
      fn = self._ExecD8DiskOnly

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
5555
  """Grow a disk of an instance.
5556

5557
  """
5558
  HPATH = "disk-grow"
5559
  HTYPE = constants.HTYPE_INSTANCE
5560
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5561
  REQ_BGL = False
5562

    
5563
  def ExpandNames(self):
5564
    self._ExpandAndLockInstance()
5565
    self.needed_locks[locking.LEVEL_NODE] = []
5566
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5567

    
5568
  def DeclareLocks(self, level):
5569
    if level == locking.LEVEL_NODE:
5570
      self._LockInstancesNodes()
5571

    
5572
  def BuildHooksEnv(self):
5573
    """Build hooks env.
5574

5575
    This runs on the master, the primary and all the secondaries.
5576

5577
    """
5578
    env = {
5579
      "DISK": self.op.disk,
5580
      "AMOUNT": self.op.amount,
5581
      }
5582
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5583
    nl = [
5584
      self.cfg.GetMasterNode(),
5585
      self.instance.primary_node,
5586
      ]
5587
    return env, nl, nl
5588

    
5589
  def CheckPrereq(self):
5590
    """Check prerequisites.
5591

5592
    This checks that the instance is in the cluster.
5593

5594
    """
5595
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5596
    assert instance is not None, \
5597
      "Cannot retrieve locked instance %s" % self.op.instance_name
5598
    nodenames = list(instance.all_nodes)
5599
    for node in nodenames:
5600
      _CheckNodeOnline(self, node)
5601

    
5602

    
5603
    self.instance = instance
5604

    
5605
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5606
      raise errors.OpPrereqError("Instance's disk layout does not support"
5607
                                 " growing.")
5608

    
5609
    self.disk = instance.FindDisk(self.op.disk)
5610

    
5611
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5612
                                       instance.hypervisor)
5613
    for node in nodenames:
5614
      info = nodeinfo[node]
5615
      if info.failed or not info.data:
5616
        raise errors.OpPrereqError("Cannot get current information"
5617
                                   " from node '%s'" % node)
5618
      vg_free = info.data.get('vg_free', None)
5619
      if not isinstance(vg_free, int):
5620
        raise errors.OpPrereqError("Can't compute free disk space on"
5621
                                   " node %s" % node)
5622
      if self.op.amount > vg_free:
5623
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
5624
                                   " %d MiB available, %d MiB required" %
5625
                                   (node, vg_free, self.op.amount))
5626

    
5627
  def Exec(self, feedback_fn):
5628
    """Execute disk grow.
5629

5630
    """
5631
    instance = self.instance
5632
    disk = self.disk
5633
    for node in instance.all_nodes:
5634
      self.cfg.SetDiskID(disk, node)
5635
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5636
      msg = result.RemoteFailMsg()
5637
      if msg:
5638
        raise errors.OpExecError("Grow request failed to node %s: %s" %
5639
                                 (node, msg))
5640
    disk.RecordGrow(self.op.amount)
5641
    self.cfg.Update(instance)
5642
    if self.op.wait_for_sync:
5643
      disk_abort = not _WaitForSync(self, instance)
5644
      if disk_abort:
5645
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5646
                             " status.\nPlease check the instance.")
5647

    
5648

    
5649
class LUQueryInstanceData(NoHooksLU):
5650
  """Query runtime instance data.
5651

5652
  """
5653
  _OP_REQP = ["instances", "static"]
5654
  REQ_BGL = False
5655

    
5656
  def ExpandNames(self):
5657
    self.needed_locks = {}
5658
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5659

    
5660
    if not isinstance(self.op.instances, list):
5661
      raise errors.OpPrereqError("Invalid argument type 'instances'")
5662

    
5663
    if self.op.instances:
5664
      self.wanted_names = []
5665
      for name in self.op.instances:
5666
        full_name = self.cfg.ExpandInstanceName(name)
5667
        if full_name is None:
5668
          raise errors.OpPrereqError("Instance '%s' not known" % name)
5669
        self.wanted_names.append(full_name)
5670
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5671
    else:
5672
      self.wanted_names = None
5673
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5674

    
5675
    self.needed_locks[locking.LEVEL_NODE] = []
5676
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5677

    
5678
  def DeclareLocks(self, level):
5679
    if level == locking.LEVEL_NODE:
5680
      self._LockInstancesNodes()
5681

    
5682
  def CheckPrereq(self):
5683
    """Check prerequisites.
5684

5685
    This only checks the optional instance list against the existing names.
5686

5687
    """
5688
    if self.wanted_names is None:
5689
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5690

    
5691
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5692
                             in self.wanted_names]
5693
    return
5694

    
5695
  def _ComputeDiskStatus(self, instance, snode, dev):
5696
    """Compute block device status.
5697

5698
    """
5699
    static = self.op.static
5700
    if not static:
5701
      self.cfg.SetDiskID(dev, instance.primary_node)
5702
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5703
      if dev_pstatus.offline:
5704
        dev_pstatus = None
5705
      else:
5706
        msg = dev_pstatus.RemoteFailMsg()
5707
        if msg:
5708
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
5709
                                   (instance.name, msg))
5710
        dev_pstatus = dev_pstatus.payload
5711
    else:
5712
      dev_pstatus = None
5713

    
5714
    if dev.dev_type in constants.LDS_DRBD:
5715
      # we change the snode then (otherwise we use the one passed in)
5716
      if dev.logical_id[0] == instance.primary_node:
5717
        snode = dev.logical_id[1]
5718
      else:
5719
        snode = dev.logical_id[0]
5720

    
5721
    if snode and not static:
5722
      self.cfg.SetDiskID(dev, snode)
5723
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5724
      if dev_sstatus.offline:
5725
        dev_sstatus = None
5726
      else:
5727
        msg = dev_sstatus.RemoteFailMsg()
5728
        if msg:
5729
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
5730
                                   (instance.name, msg))
5731
        dev_sstatus = dev_sstatus.payload
5732
    else:
5733
      dev_sstatus = None
5734

    
5735
    if dev.children:
5736
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
5737
                      for child in dev.children]
5738
    else:
5739
      dev_children = []
5740

    
5741
    data = {
5742
      "iv_name": dev.iv_name,
5743
      "dev_type": dev.dev_type,
5744
      "logical_id": dev.logical_id,
5745
      "physical_id": dev.physical_id,
5746
      "pstatus": dev_pstatus,
5747
      "sstatus": dev_sstatus,
5748
      "children": dev_children,
5749
      "mode": dev.mode,
5750
      "size": dev.size,
5751
      }
5752

    
5753
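    # Illustrative shape of the returned structure (pstatus/sstatus hold the
    # remote blockdev_find payload, or None when static data was requested or
    # the node is offline), with "children" nesting the same structure for
    # each underlying device:
    #   {"iv_name": "disk/0", "dev_type": "drbd8", "logical_id": (...),
    #    "physical_id": (...), "pstatus": (...), "sstatus": (...),
    #    "children": [...], "mode": "rw", "size": 10240}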
    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

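    # The final result maps each instance name to the dictionary built above,
    # e.g. result["inst1.example.com"]["run_state"] == "up" for a running
    # instance queried without the static flag (illustrative name).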
    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

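    # self.op.disks and self.op.nics are lists of (op, params) pairs, where
    # op is constants.DDM_ADD, constants.DDM_REMOVE or the integer index of
    # an existing device and params is a dict with the values to change.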
    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

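    # Refuse a memory increase that cannot be satisfied on the primary node
    # (skipped when the force flag is set).  Illustrative numbers: raising an
    # instance that currently uses 512 MiB to 2048 MiB on a node with
    # 1024 MiB free leaves 2048 - 512 - 1024 = 512 MiB missing, so the
    # operation is rejected.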
    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = int(instance_info.data['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=bridge)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

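    # The returned list of (parameter, new value) pairs, for example
    # [("disk/1", "add:size=1024,mode=rw"), ("be/memory", 512)], is what the
    # caller reports back as the applied changes (illustrative values).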
    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

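    # Export flow: snapshot every disk on the primary node (the instance is
    # started again right after the snapshots are taken if it was shut down
    # above), copy each snapshot to the target node, finalize the export
    # there, and then drop any older export of this instance found on the
    # other nodes.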
    try:
      for idx, disk in enumerate(instance.disks):
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot disk/%d on node %s",
                          idx, src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export disk/%d from node %s to"
                          " node %s", idx, src_node, dst_node.name)
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
          raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = nresult.data
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          try:
            remote_info[attr] = int(remote_info[attr])
          except ValueError, err:
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" % (nname, attr, err))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

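    # "data" now holds the operation-independent allocator input: version,
    # cluster_name, cluster_tags, enabled_hypervisors and the per-node and
    # per-instance dictionaries built above; the mode-specific "request" key
    # is added later by _AddNewInstance or _AddRelocateInstance.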
    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
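    # Build the mode-specific "request" part of the allocator input; together
    # with the cluster data from _ComputeClusterData this forms the complete
    # serialized document passed to the external allocator script.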
    request = {
6873
      "type": "allocate",
6874
      "name": self.name,
6875
      "disk_template": self.disk_template,
6876
      "tags": self.tags,
6877
      "os": self.os,
6878
      "vcpus": self.vcpus,
6879
      "memory": self.mem_size,
6880
      "disks": self.disks,
6881
      "disk_space_total": disk_space,
6882
      "nics": self.nics,
6883
      "required_nodes": self.required_nodes,
6884
      }
6885
    data["request"] = request
6886

    
6887
  def _AddRelocateInstance(self):
6888
    """Add relocate instance data to allocator structure.
6889

6890
    This in combination with _IAllocatorGetClusterData will create the
6891
    correct structure needed as input for the allocator.
6892

6893
    The checks for the completeness of the opcode must have already been
6894
    done.
6895

6896
    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
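    # self.in_text now holds the serialized cluster data plus the
    # mode-specific request; Run() below ships this text to the iallocator
    # runner on the master node.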

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data
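    # As unpacked above, the runner payload reads roughly as
    # (status code, stdout, stderr, failure reason); on success the
    # allocator's stdout becomes self.out_text below.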

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
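    # A minimal reply passing the checks above would look like (illustrative):
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}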


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)
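    # Illustrative opcode parameters that would pass the allocate-mode checks
    # above (all values made up): direction=constants.IALLOCATOR_DIR_IN,
    # mode=constants.IALLOCATOR_MODE_ALLOC, name="test1.example.com",
    # mem_size=128, vcpus=1, os="debian-etch", tags=[], disk_template="drbd",
    # disks=[{"size": 1024, "mode": "w"}],
    # nics=[{"mac": "aa:00:00:00:00:01", "ip": None, "bridge": "xen-br0"}]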

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
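    # Note: with direction IALLOCATOR_DIR_IN only the generated input text is
    # returned (no allocator is run); with IALLOCATOR_DIR_OUT the named
    # allocator is executed and its raw, unvalidated output text is returned.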