
root / lib / cmdlib.py @ 083a91c9


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0201
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
import os
30
import os.path
31
import time
32
import re
33
import platform
34
import logging
35
import copy
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import serializer
45
from ganeti import ssconf
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq (except when tasklets are used)
54
    - implement Exec (except when tasklets are used)
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  @ivar dry_run_result: the value (if any) that will be returned to the caller
63
      in dry-run mode (signalled by opcode dry_run parameter)
64

65
  """
66
  HPATH = None
67
  HTYPE = None
68
  _OP_REQP = []
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, rpc):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.context = context
82
    self.rpc = rpc
83
    # Dicts used to declare locking needs to mcpu
84
    self.needed_locks = None
85
    self.acquired_locks = {}
86
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
87
    self.add_locks = {}
88
    self.remove_locks = {}
89
    # Used to force good behavior when calling helper functions
90
    self.recalculate_locks = {}
91
    self.__ssh = None
92
    # logging
93
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
94
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
95
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
96
    # support for dry-run
97
    self.dry_run_result = None
98
    # support for generic debug attribute
99
    if (not hasattr(self.op, "debug_level") or
100
        not isinstance(self.op.debug_level, int)):
101
      self.op.debug_level = 0
102

    
103
    # Tasklets
104
    self.tasklets = None
105

    
106
    for attr_name in self._OP_REQP:
107
      attr_val = getattr(op, attr_name, None)
108
      if attr_val is None:
109
        raise errors.OpPrereqError("Required parameter '%s' missing" %
110
                                   attr_name, errors.ECODE_INVAL)
111

    
112
    self.CheckArguments()
113

    
114
  def __GetSSH(self):
115
    """Returns the SshRunner object
116

117
    """
118
    if not self.__ssh:
119
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
120
    return self.__ssh
121

    
122
  ssh = property(fget=__GetSSH)
123

    
124
  def CheckArguments(self):
125
    """Check syntactic validity for the opcode arguments.
126

127
    This method is for doing a simple syntactic check and ensuring
128
    validity of opcode parameters, without any cluster-related
129
    checks. While the same can be accomplished in ExpandNames and/or
130
    CheckPrereq, doing these separately is better because:
131

132
      - ExpandNames is left as purely a lock-related function
133
      - CheckPrereq is run after we have acquired locks (and possible
134
        waited for them)
135

136
    The function is allowed to change the self.op attribute so that
137
    later methods no longer need to worry about missing parameters.
138

139
    """
140
    pass
141

    
142
  def ExpandNames(self):
143
    """Expand names for this LU.
144

145
    This method is called before starting to execute the opcode, and it should
146
    update all the parameters of the opcode to their canonical form (e.g. a
147
    short node name must be fully expanded after this method has successfully
148
    completed). This way locking, hooks, logging, etc. can work correctly.
149

150
    LUs which implement this method must also populate the self.needed_locks
151
    member, as a dict with lock levels as keys, and a list of needed lock names
152
    as values. Rules:
153

154
      - use an empty dict if you don't need any lock
155
      - if you don't need any lock at a particular level omit that level
156
      - don't put anything for the BGL level
157
      - if you want all locks at a level use locking.ALL_SET as a value
158

159
    If you need to share locks (rather than acquire them exclusively) at one
160
    level you can modify self.share_locks, setting a true value (usually 1) for
161
    that level. By default locks are not shared.
162

163
    This function can also define a list of tasklets, which then will be
164
    executed in order instead of the usual LU-level CheckPrereq and Exec
165
    functions, if those are not defined by the LU.
166

167
    Examples::
168

169
      # Acquire all nodes and one instance
170
      self.needed_locks = {
171
        locking.LEVEL_NODE: locking.ALL_SET,
172
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
173
      }
174
      # Acquire just two nodes
175
      self.needed_locks = {
176
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
177
      }
178
      # Acquire no locks
179
      self.needed_locks = {} # No, you can't leave it to the default value None
180

181
    """
182
    # The implementation of this method is mandatory only if the new LU is
183
    # concurrent, so that old LUs don't need to be changed all at the same
184
    # time.
185
    if self.REQ_BGL:
186
      self.needed_locks = {} # Exclusive LUs don't need locks.
187
    else:
188
      raise NotImplementedError
189

    
190
  def DeclareLocks(self, level):
191
    """Declare LU locking needs for a level
192

193
    While most LUs can just declare their locking needs at ExpandNames time,
194
    sometimes there's the need to calculate some locks after having acquired
195
    the ones before. This function is called just before acquiring locks at a
196
    particular level, but after acquiring the ones at lower levels, and permits
197
    such calculations. It can be used to modify self.needed_locks, and by
198
    default it does nothing.
199

200
    This function is only called if you have something already set in
201
    self.needed_locks for the level.
202

203
    @param level: Locking level which is going to be locked
204
    @type level: member of ganeti.locking.LEVELS
205

206
    """
207

    
208
  def CheckPrereq(self):
209
    """Check prerequisites for this LU.
210

211
    This method should check that the prerequisites for the execution
212
    of this LU are fulfilled. It can do internode communication, but
213
    it should be idempotent - no cluster or system changes are
214
    allowed.
215

216
    The method should raise errors.OpPrereqError in case something is
217
    not fulfilled. Its return value is ignored.
218

219
    This method should also update all the parameters of the opcode to
220
    their canonical form if it hasn't been done by ExpandNames before.
221

222
    """
223
    if self.tasklets is not None:
224
      for (idx, tl) in enumerate(self.tasklets):
225
        logging.debug("Checking prerequisites for tasklet %s/%s",
226
                      idx + 1, len(self.tasklets))
227
        tl.CheckPrereq()
228
    else:
229
      raise NotImplementedError
230

    
231
  def Exec(self, feedback_fn):
232
    """Execute the LU.
233

234
    This method should implement the actual work. It should raise
235
    errors.OpExecError for failures that are somewhat dealt with in
236
    code, or expected.
237

238
    """
239
    if self.tasklets is not None:
240
      for (idx, tl) in enumerate(self.tasklets):
241
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
242
        tl.Exec(feedback_fn)
243
    else:
244
      raise NotImplementedError
245

    
246
  def BuildHooksEnv(self):
247
    """Build hooks environment for this LU.
248

249
    This method should return a three-element tuple consisting of: a dict
250
    containing the environment that will be used for running the
251
    specific hook for this LU, a list of node names on which the hook
252
    should run before the execution, and a list of node names on which
253
    the hook should run after the execution.
254

255
    The keys of the dict must not have 'GANETI_' prefixed as this will
256
    be handled in the hooks runner. Also note additional keys will be
257
    added by the hooks runner. If the LU doesn't define any
258
    environment, an empty dict (and not None) should be returned.
259

260
    No nodes should be returned as an empty list (and not None).
261

262
    Note that if the HPATH for a LU class is None, this function will
263
    not be called.
264

265
    """
266
    raise NotImplementedError
267

    
268
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
269
    """Notify the LU about the results of its hooks.
270

271
    This method is called every time a hooks phase is executed, and notifies
272
    the Logical Unit about the hooks' result. The LU can then use it to alter
273
    its result based on the hooks.  By default the method does nothing and the
274
    previous result is passed back unchanged but any LU can define it if it
275
    wants to use the local cluster hook-scripts somehow.
276

277
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
278
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
279
    @param hook_results: the results of the multi-node hooks rpc call
280
    @param feedback_fn: function used to send feedback back to the caller
281
    @param lu_result: the previous Exec result this LU had, or None
282
        in the PRE phase
283
    @return: the new Exec result, based on the previous result
284
        and hook results
285

286
    """
287
    # API must be kept, thus we ignore the unused-argument and
288
    # could-be-a-function warnings
289
    # pylint: disable-msg=W0613,R0201
290
    return lu_result
291

    
292
  def _ExpandAndLockInstance(self):
293
    """Helper function to expand and lock an instance.
294

295
    Many LUs that work on an instance take its name in self.op.instance_name
296
    and need to expand it and then declare the expanded name for locking. This
297
    function does it, and then updates self.op.instance_name to the expanded
298
    name. It also initializes needed_locks as a dict, if this hasn't been done
299
    before.
300

301
    """
302
    if self.needed_locks is None:
303
      self.needed_locks = {}
304
    else:
305
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
306
        "_ExpandAndLockInstance called with instance-level locks set"
307
    self.op.instance_name = _ExpandInstanceName(self.cfg,
308
                                                self.op.instance_name)
309
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
310

    
311
  def _LockInstancesNodes(self, primary_only=False):
312
    """Helper function to declare instances' nodes for locking.
313

314
    This function should be called after locking one or more instances to lock
315
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
316
    with all primary or secondary nodes for instances already locked and
317
    present in self.needed_locks[locking.LEVEL_INSTANCE].
318

319
    It should be called from DeclareLocks, and for safety only works if
320
    self.recalculate_locks[locking.LEVEL_NODE] is set.
321

322
    In the future it may grow parameters to just lock some instance's nodes, or
323
    to just lock primaries or secondary nodes, if needed.
324

325
    It should be called in DeclareLocks in a way similar to::
326

327
      if level == locking.LEVEL_NODE:
328
        self._LockInstancesNodes()
329

330
    @type primary_only: boolean
331
    @param primary_only: only lock primary nodes of locked instances
332

333
    """
334
    assert locking.LEVEL_NODE in self.recalculate_locks, \
335
      "_LockInstancesNodes helper function called with no nodes to recalculate"
336

    
337
    # TODO: check if we've really been called with the instance locks held
338

    
339
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
340
    # future we might want to have different behaviors depending on the value
341
    # of self.recalculate_locks[locking.LEVEL_NODE]
342
    wanted_nodes = []
343
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
344
      instance = self.context.cfg.GetInstanceInfo(instance_name)
345
      wanted_nodes.append(instance.primary_node)
346
      if not primary_only:
347
        wanted_nodes.extend(instance.secondary_nodes)
348

    
349
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
350
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
351
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
352
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
353

    
354
    del self.recalculate_locks[locking.LEVEL_NODE]
355

    
356
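# Illustrative sketch, not part of the original module: a minimal concurrent
# LU following the rules documented in LogicalUnit above. The opcode
# parameter, lock usage and return value are hypothetical; an LU that runs
# no hooks would normally derive from NoHooksLU, defined just below.
class _LUExampleQuery(LogicalUnit):
  """Example LU returning the primary node of one instance.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # canonicalize self.op.instance_name and lock it at the instance level
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    # read-only operation, so the node locks can be shared
    self.share_locks[locking.LEVEL_NODE] = 1

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    # idempotent checks only; no cluster or system changes allowed here
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def Exec(self, feedback_fn):
    # the actual work of the LU
    feedback_fn("Instance %s found" % self.instance.name)
    return self.instance.primary_node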

    
357
class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
358
  """Simple LU which runs no hooks.
359

360
  This LU is intended as a parent for other LogicalUnits which will
361
  run no hooks, in order to reduce duplicate code.
362

363
  """
364
  HPATH = None
365
  HTYPE = None
366

    
367
  def BuildHooksEnv(self):
368
    """Empty BuildHooksEnv for NoHooksLu.
369

370
    This just raises an error.
371

372
    """
373
    assert False, "BuildHooksEnv called for NoHooksLUs"
374

    
375

    
376
class Tasklet:
377
  """Tasklet base class.
378

379
  Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
380
  they can mix legacy code with tasklets. Locking needs to be done in the LU,
381
  tasklets know nothing about locks.
382

383
  Subclasses must follow these rules:
384
    - Implement CheckPrereq
385
    - Implement Exec
386

387
  """
388
  def __init__(self, lu):
389
    self.lu = lu
390

    
391
    # Shortcuts
392
    self.cfg = lu.cfg
393
    self.rpc = lu.rpc
394

    
395
  def CheckPrereq(self):
396
    """Check prerequisites for this tasklets.
397

398
    This method should check whether the prerequisites for the execution of
399
    this tasklet are fulfilled. It can do internode communication, but it
400
    should be idempotent - no cluster or system changes are allowed.
401

402
    The method should raise errors.OpPrereqError in case something is not
403
    fulfilled. Its return value is ignored.
404

405
    This method should also update all parameters to their canonical form if it
406
    hasn't been done before.
407

408
    """
409
    raise NotImplementedError
410

    
411
  def Exec(self, feedback_fn):
412
    """Execute the tasklet.
413

414
    This method should implement the actual work. It should raise
415
    errors.OpExecError for failures that are somewhat dealt with in code, or
416
    expected.
417

418
    """
419
    raise NotImplementedError
420

    
421
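# Illustrative sketch, not part of the original module: a minimal Tasklet
# following the rules above, together with the way an LU would hand work
# over to it. All names used here are hypothetical.
class _ExampleInstanceCheck(Tasklet):
  """Tasklet verifying that a single instance is known to the cluster.

  """
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name

  def CheckPrereq(self):
    # idempotent checks only; no cluster or system changes allowed
    self.instance = self.cfg.GetInstanceInfo(self.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.instance_name, errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    # the actual work would go here
    feedback_fn("Instance %s uses the %s disk template" %
                (self.instance.name, self.instance.disk_template))

# An LU delegating to tasklets would typically set, from ExpandNames:
#   self.tasklets = [_ExampleInstanceCheck(self, name) for name in names]
# after which the base LogicalUnit.CheckPrereq and Exec iterate over them.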

    
422
def _GetWantedNodes(lu, nodes):
423
  """Returns list of checked and expanded node names.
424

425
  @type lu: L{LogicalUnit}
426
  @param lu: the logical unit on whose behalf we execute
427
  @type nodes: list
428
  @param nodes: list of node names or None for all nodes
429
  @rtype: list
430
  @return: the list of nodes, sorted
431
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
432

433
  """
434
  if not isinstance(nodes, list):
435
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
436
                               errors.ECODE_INVAL)
437

    
438
  if not nodes:
439
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
440
      " non-empty list of nodes whose name is to be expanded.")
441

    
442
  wanted = []
443
  for name in nodes:
444
    node = _ExpandNodeName(lu.cfg, name)
445
    wanted.append(node)
446

    
447
  return utils.NiceSort(wanted)
448

    
449

    
450
def _GetWantedInstances(lu, instances):
451
  """Returns list of checked and expanded instance names.
452

453
  @type lu: L{LogicalUnit}
454
  @param lu: the logical unit on whose behalf we execute
455
  @type instances: list
456
  @param instances: list of instance names or None for all instances
457
  @rtype: list
458
  @return: the list of instances, sorted
459
  @raise errors.OpPrereqError: if the instances parameter is wrong type
460
  @raise errors.OpPrereqError: if any of the passed instances is not found
461

462
  """
463
  if not isinstance(instances, list):
464
    raise errors.OpPrereqError("Invalid argument type 'instances'",
465
                               errors.ECODE_INVAL)
466

    
467
  if instances:
468
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
469
  else:
470
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
471
  return wanted
472

    
473

    
474
def _CheckOutputFields(static, dynamic, selected):
475
  """Checks whether all selected fields are valid.
476

477
  @type static: L{utils.FieldSet}
478
  @param static: static fields set
479
  @type dynamic: L{utils.FieldSet}
480
  @param dynamic: dynamic fields set
481

482
  """
483
  f = utils.FieldSet()
484
  f.Extend(static)
485
  f.Extend(dynamic)
486

    
487
  delta = f.NonMatching(selected)
488
  if delta:
489
    raise errors.OpPrereqError("Unknown output fields selected: %s"
490
                               % ",".join(delta), errors.ECODE_INVAL)
491

    
492
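# For illustration (not in the original source): a query LU would typically
# call _CheckOutputFields from its CheckPrereq, along the lines of (field
# names hypothetical):
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dtotal", "dfree"),
#                      selected=self.op.output_fields)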

    
493
def _CheckBooleanOpField(op, name):
494
  """Validates boolean opcode parameters.
495

496
  This will ensure that an opcode parameter is either a boolean value,
497
  or None (but that it always exists).
498

499
  """
500
  val = getattr(op, name, None)
501
  if not (val is None or isinstance(val, bool)):
502
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
503
                               (name, str(val)), errors.ECODE_INVAL)
504
  setattr(op, name, val)
505

    
506

    
507
def _CheckGlobalHvParams(params):
508
  """Validates that given hypervisor params are not global ones.
509

510
  This will ensure that instances don't get customised versions of
511
  global params.
512

513
  """
514
  used_globals = constants.HVC_GLOBALS.intersection(params)
515
  if used_globals:
516
    msg = ("The following hypervisor parameters are global and cannot"
517
           " be customized at instance level, please modify them at"
518
           " cluster level: %s" % utils.CommaJoin(used_globals))
519
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
520

    
521

    
522
def _CheckNodeOnline(lu, node):
523
  """Ensure that a given node is online.
524

525
  @param lu: the LU on behalf of which we make the check
526
  @param node: the node to check
527
  @raise errors.OpPrereqError: if the node is offline
528

529
  """
530
  if lu.cfg.GetNodeInfo(node).offline:
531
    raise errors.OpPrereqError("Can't use offline node %s" % node,
532
                               errors.ECODE_INVAL)
533

    
534

    
535
def _CheckNodeNotDrained(lu, node):
536
  """Ensure that a given node is not drained.
537

538
  @param lu: the LU on behalf of which we make the check
539
  @param node: the node to check
540
  @raise errors.OpPrereqError: if the node is drained
541

542
  """
543
  if lu.cfg.GetNodeInfo(node).drained:
544
    raise errors.OpPrereqError("Can't use drained node %s" % node,
545
                               errors.ECODE_INVAL)
546

    
547

    
548
def _ExpandItemName(fn, name, kind):
549
  """Expand an item name.
550

551
  @param fn: the function to use for expansion
552
  @param name: requested item name
553
  @param kind: text description ('Node' or 'Instance')
554
  @return: the resolved (full) name
555
  @raise errors.OpPrereqError: if the item is not found
556

557
  """
558
  full_name = fn(name)
559
  if full_name is None:
560
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
561
                               errors.ECODE_NOENT)
562
  return full_name
563

    
564

    
565
def _ExpandNodeName(cfg, name):
566
  """Wrapper over L{_ExpandItemName} for nodes."""
567
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
568

    
569

    
570
def _ExpandInstanceName(cfg, name):
571
  """Wrapper over L{_ExpandItemName} for instance."""
572
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
573

    
574

    
575
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
576
                          memory, vcpus, nics, disk_template, disks,
577
                          bep, hvp, hypervisor_name):
578
  """Builds instance related env variables for hooks
579

580
  This builds the hook environment from individual variables.
581

582
  @type name: string
583
  @param name: the name of the instance
584
  @type primary_node: string
585
  @param primary_node: the name of the instance's primary node
586
  @type secondary_nodes: list
587
  @param secondary_nodes: list of secondary nodes as strings
588
  @type os_type: string
589
  @param os_type: the name of the instance's OS
590
  @type status: boolean
591
  @param status: the should_run status of the instance
592
  @type memory: string
593
  @param memory: the memory size of the instance
594
  @type vcpus: string
595
  @param vcpus: the count of VCPUs the instance has
596
  @type nics: list
597
  @param nics: list of tuples (ip, mac, mode, link) representing
598
      the NICs the instance has
599
  @type disk_template: string
600
  @param disk_template: the disk template of the instance
601
  @type disks: list
602
  @param disks: the list of (size, mode) pairs
603
  @type bep: dict
604
  @param bep: the backend parameters for the instance
605
  @type hvp: dict
606
  @param hvp: the hypervisor parameters for the instance
607
  @type hypervisor_name: string
608
  @param hypervisor_name: the hypervisor for the instance
609
  @rtype: dict
610
  @return: the hook environment for this instance
611

612
  """
613
  if status:
614
    str_status = "up"
615
  else:
616
    str_status = "down"
617
  env = {
618
    "OP_TARGET": name,
619
    "INSTANCE_NAME": name,
620
    "INSTANCE_PRIMARY": primary_node,
621
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
622
    "INSTANCE_OS_TYPE": os_type,
623
    "INSTANCE_STATUS": str_status,
624
    "INSTANCE_MEMORY": memory,
625
    "INSTANCE_VCPUS": vcpus,
626
    "INSTANCE_DISK_TEMPLATE": disk_template,
627
    "INSTANCE_HYPERVISOR": hypervisor_name,
628
  }
629

    
630
  if nics:
631
    nic_count = len(nics)
632
    for idx, (ip, mac, mode, link) in enumerate(nics):
633
      if ip is None:
634
        ip = ""
635
      env["INSTANCE_NIC%d_IP" % idx] = ip
636
      env["INSTANCE_NIC%d_MAC" % idx] = mac
637
      env["INSTANCE_NIC%d_MODE" % idx] = mode
638
      env["INSTANCE_NIC%d_LINK" % idx] = link
639
      if mode == constants.NIC_MODE_BRIDGED:
640
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
641
  else:
642
    nic_count = 0
643

    
644
  env["INSTANCE_NIC_COUNT"] = nic_count
645

    
646
  if disks:
647
    disk_count = len(disks)
648
    for idx, (size, mode) in enumerate(disks):
649
      env["INSTANCE_DISK%d_SIZE" % idx] = size
650
      env["INSTANCE_DISK%d_MODE" % idx] = mode
651
  else:
652
    disk_count = 0
653

    
654
  env["INSTANCE_DISK_COUNT"] = disk_count
655

    
656
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
657
    for key, value in source.items():
658
      env["INSTANCE_%s_%s" % (kind, key)] = value
659

    
660
  return env
661

    
662
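# Illustrative sketch, not part of the original module: building the hook
# environment for a hypothetical single-NIC, single-disk instance. It mainly
# shows the expected parameter shapes: (ip, mac, mode, link) tuples for NICs
# and (size, mode) pairs for disks. All values below are made up.
def _ExampleInstanceHookEnv():
  # The result contains, among others, INSTANCE_NAME, INSTANCE_PRIMARY,
  # INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_BRIDGE="xen-br0" (because the NIC is
  # bridged) and INSTANCE_DISK0_SIZE=1024.
  return _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
                               ["node2.example.com"], "debian-image", True,
                               128, 1,
                               [("192.0.2.10", "aa:00:00:00:00:01",
                                 constants.NIC_MODE_BRIDGED, "xen-br0")],
                               "drbd", [(1024, "rw")],
                               {}, {}, "xen-pvm")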

    
663
def _NICListToTuple(lu, nics):
664
  """Build a list of nic information tuples.
665

666
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
667
  value in LUQueryInstanceData.
668

669
  @type lu:  L{LogicalUnit}
670
  @param lu: the logical unit on whose behalf we execute
671
  @type nics: list of L{objects.NIC}
672
  @param nics: list of nics to convert to hooks tuples
673

674
  """
675
  hooks_nics = []
676
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
677
  for nic in nics:
678
    ip = nic.ip
679
    mac = nic.mac
680
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
681
    mode = filled_params[constants.NIC_MODE]
682
    link = filled_params[constants.NIC_LINK]
683
    hooks_nics.append((ip, mac, mode, link))
684
  return hooks_nics
685

    
686

    
687
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
688
  """Builds instance related env variables for hooks from an object.
689

690
  @type lu: L{LogicalUnit}
691
  @param lu: the logical unit on whose behalf we execute
692
  @type instance: L{objects.Instance}
693
  @param instance: the instance for which we should build the
694
      environment
695
  @type override: dict
696
  @param override: dictionary with key/values that will override
697
      our values
698
  @rtype: dict
699
  @return: the hook environment dictionary
700

701
  """
702
  cluster = lu.cfg.GetClusterInfo()
703
  bep = cluster.FillBE(instance)
704
  hvp = cluster.FillHV(instance)
705
  args = {
706
    'name': instance.name,
707
    'primary_node': instance.primary_node,
708
    'secondary_nodes': instance.secondary_nodes,
709
    'os_type': instance.os,
710
    'status': instance.admin_up,
711
    'memory': bep[constants.BE_MEMORY],
712
    'vcpus': bep[constants.BE_VCPUS],
713
    'nics': _NICListToTuple(lu, instance.nics),
714
    'disk_template': instance.disk_template,
715
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
716
    'bep': bep,
717
    'hvp': hvp,
718
    'hypervisor_name': instance.hypervisor,
719
  }
720
  if override:
721
    args.update(override)
722
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142
723

    
724

    
725
def _AdjustCandidatePool(lu, exceptions):
726
  """Adjust the candidate pool after node operations.
727

728
  """
729
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
730
  if mod_list:
731
    lu.LogInfo("Promoted nodes to master candidate role: %s",
732
               utils.CommaJoin(node.name for node in mod_list))
733
    for name in mod_list:
734
      lu.context.ReaddNode(name)
735
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
736
  if mc_now > mc_max:
737
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
738
               (mc_now, mc_max))
739

    
740

    
741
def _DecideSelfPromotion(lu, exceptions=None):
742
  """Decide whether I should promote myself as a master candidate.
743

744
  """
745
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
746
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
747
  # the new node will increase mc_max with one, so:
748
  mc_should = min(mc_should + 1, cp_size)
749
  return mc_now < mc_should
750

    
751

    
752
def _CheckNicsBridgesExist(lu, target_nics, target_node,
753
                               profile=constants.PP_DEFAULT):
754
  """Check that the brigdes needed by a list of nics exist.
755

756
  """
757
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
758
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
759
                for nic in target_nics]
760
  brlist = [params[constants.NIC_LINK] for params in paramslist
761
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
762
  if brlist:
763
    result = lu.rpc.call_bridges_exist(target_node, brlist)
764
    result.Raise("Error checking bridges on destination node '%s'" %
765
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
766

    
767

    
768
def _CheckInstanceBridgesExist(lu, instance, node=None):
769
  """Check that the brigdes needed by an instance exist.
770

771
  """
772
  if node is None:
773
    node = instance.primary_node
774
  _CheckNicsBridgesExist(lu, instance.nics, node)
775

    
776

    
777
def _CheckOSVariant(os_obj, name):
778
  """Check whether an OS name conforms to the os variants specification.
779

780
  @type os_obj: L{objects.OS}
781
  @param os_obj: OS object to check
782
  @type name: string
783
  @param name: OS name passed by the user, to check for validity
784

785
  """
786
  if not os_obj.supported_variants:
787
    return
788
  try:
789
    variant = name.split("+", 1)[1]
790
  except IndexError:
791
    raise errors.OpPrereqError("OS name must include a variant",
792
                               errors.ECODE_INVAL)
793

    
794
  if variant not in os_obj.supported_variants:
795
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
796

    
797

    
798
def _GetNodeInstancesInner(cfg, fn):
799
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
800

    
801

    
802
def _GetNodeInstances(cfg, node_name):
803
  """Returns a list of all primary and secondary instances on a node.
804

805
  """
806

    
807
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
808

    
809

    
810
def _GetNodePrimaryInstances(cfg, node_name):
811
  """Returns primary instances on a node.
812

813
  """
814
  return _GetNodeInstancesInner(cfg,
815
                                lambda inst: node_name == inst.primary_node)
816

    
817

    
818
def _GetNodeSecondaryInstances(cfg, node_name):
819
  """Returns secondary instances on a node.
820

821
  """
822
  return _GetNodeInstancesInner(cfg,
823
                                lambda inst: node_name in inst.secondary_nodes)
824

    
825

    
826
def _GetStorageTypeArgs(cfg, storage_type):
827
  """Returns the arguments for a storage type.
828

829
  """
830
  # Special case for file storage
831
  if storage_type == constants.ST_FILE:
832
    # storage.FileStorage wants a list of storage directories
833
    return [[cfg.GetFileStorageDir()]]
834

    
835
  return []
836

    
837

    
838
def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
839
  faulty = []
840

    
841
  for dev in instance.disks:
842
    cfg.SetDiskID(dev, node_name)
843

    
844
  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
845
  result.Raise("Failed to get disk status from node %s" % node_name,
846
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
847

    
848
  for idx, bdev_status in enumerate(result.payload):
849
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
850
      faulty.append(idx)
851

    
852
  return faulty
853

    
854

    
855
class LUPostInitCluster(LogicalUnit):
856
  """Logical unit for running hooks after cluster initialization.
857

858
  """
859
  HPATH = "cluster-init"
860
  HTYPE = constants.HTYPE_CLUSTER
861
  _OP_REQP = []
862

    
863
  def BuildHooksEnv(self):
864
    """Build hooks env.
865

866
    """
867
    env = {"OP_TARGET": self.cfg.GetClusterName()}
868
    mn = self.cfg.GetMasterNode()
869
    return env, [], [mn]
870

    
871
  def CheckPrereq(self):
872
    """No prerequisites to check.
873

874
    """
875
    return True
876

    
877
  def Exec(self, feedback_fn):
878
    """Nothing to do.
879

880
    """
881
    return True
882

    
883

    
884
class LUDestroyCluster(LogicalUnit):
885
  """Logical unit for destroying the cluster.
886

887
  """
888
  HPATH = "cluster-destroy"
889
  HTYPE = constants.HTYPE_CLUSTER
890
  _OP_REQP = []
891

    
892
  def BuildHooksEnv(self):
893
    """Build hooks env.
894

895
    """
896
    env = {"OP_TARGET": self.cfg.GetClusterName()}
897
    return env, [], []
898

    
899
  def CheckPrereq(self):
900
    """Check prerequisites.
901

902
    This checks whether the cluster is empty.
903

904
    Any errors are signaled by raising errors.OpPrereqError.
905

906
    """
907
    master = self.cfg.GetMasterNode()
908

    
909
    nodelist = self.cfg.GetNodeList()
910
    if len(nodelist) != 1 or nodelist[0] != master:
911
      raise errors.OpPrereqError("There are still %d node(s) in"
912
                                 " this cluster." % (len(nodelist) - 1),
913
                                 errors.ECODE_INVAL)
914
    instancelist = self.cfg.GetInstanceList()
915
    if instancelist:
916
      raise errors.OpPrereqError("There are still %d instance(s) in"
917
                                 " this cluster." % len(instancelist),
918
                                 errors.ECODE_INVAL)
919

    
920
  def Exec(self, feedback_fn):
921
    """Destroys the cluster.
922

923
    """
924
    master = self.cfg.GetMasterNode()
925
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
926

    
927
    # Run post hooks on master node before it's removed
928
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
929
    try:
930
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
931
    except:
932
      # pylint: disable-msg=W0702
933
      self.LogWarning("Errors occurred running hooks on %s" % master)
934

    
935
    result = self.rpc.call_node_stop_master(master, False)
936
    result.Raise("Could not disable the master role")
937

    
938
    if modify_ssh_setup:
939
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
940
      utils.CreateBackup(priv_key)
941
      utils.CreateBackup(pub_key)
942

    
943
    return master
944

    
945

    
946
class LUVerifyCluster(LogicalUnit):
947
  """Verifies the cluster status.
948

949
  """
950
  HPATH = "cluster-verify"
951
  HTYPE = constants.HTYPE_CLUSTER
952
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
953
  REQ_BGL = False
954

    
955
  TCLUSTER = "cluster"
956
  TNODE = "node"
957
  TINSTANCE = "instance"
958

    
959
  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
960
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
961
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
962
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
963
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
965
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
966
  ENODEDRBD = (TNODE, "ENODEDRBD")
967
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
968
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
969
  ENODEHV = (TNODE, "ENODEHV")
970
  ENODELVM = (TNODE, "ENODELVM")
971
  ENODEN1 = (TNODE, "ENODEN1")
972
  ENODENET = (TNODE, "ENODENET")
973
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
974
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
975
  ENODERPC = (TNODE, "ENODERPC")
976
  ENODESSH = (TNODE, "ENODESSH")
977
  ENODEVERSION = (TNODE, "ENODEVERSION")
978
  ENODESETUP = (TNODE, "ENODESETUP")
979
  ENODETIME = (TNODE, "ENODETIME")
980

    
981
  ETYPE_FIELD = "code"
982
  ETYPE_ERROR = "ERROR"
983
  ETYPE_WARNING = "WARNING"
984

    
985
  def ExpandNames(self):
986
    self.needed_locks = {
987
      locking.LEVEL_NODE: locking.ALL_SET,
988
      locking.LEVEL_INSTANCE: locking.ALL_SET,
989
    }
990
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
991

    
992
  def _Error(self, ecode, item, msg, *args, **kwargs):
993
    """Format an error message.
994

995
    Based on the opcode's error_codes parameter, either format a
996
    parseable error code, or a simpler error string.
997

998
    This must be called only from Exec and functions called from Exec.
999

1000
    """
1001
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1002
    itype, etxt = ecode
1003
    # first complete the msg
1004
    if args:
1005
      msg = msg % args
1006
    # then format the whole message
1007
    if self.op.error_codes:
1008
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1009
    else:
1010
      if item:
1011
        item = " " + item
1012
      else:
1013
        item = ""
1014
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1015
    # and finally report it via the feedback_fn
1016
    self._feedback_fn("  - %s" % msg)
1017
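    # For illustration (not in the original source): with error_codes set,
    # the line reported through feedback_fn looks like
    #   - ERROR:ENODELVM:node:node1.example.com:unable to check volume groups
    # and without it like
    #   - ERROR: node node1.example.com: unable to check volume groups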

    
1018
  def _ErrorIf(self, cond, *args, **kwargs):
1019
    """Log an error message if the passed condition is True.
1020

1021
    """
1022
    cond = bool(cond) or self.op.debug_simulate_errors
1023
    if cond:
1024
      self._Error(*args, **kwargs)
1025
    # do not mark the operation as failed for WARN cases only
1026
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1027
      self.bad = self.bad or cond
1028

    
1029
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
1030
                  node_result, master_files, drbd_map, vg_name):
1031
    """Run multiple tests against a node.
1032

1033
    Test list:
1034

1035
      - compares ganeti version
1036
      - checks vg existence and size > 20G
1037
      - checks config file checksum
1038
      - checks ssh to other nodes
1039

1040
    @type nodeinfo: L{objects.Node}
1041
    @param nodeinfo: the node to check
1042
    @param file_list: required list of files
1043
    @param local_cksum: dictionary of local files and their checksums
1044
    @param node_result: the results from the node
1045
    @param master_files: list of files that only masters should have
1046
    @param drbd_map: the used drbd minors for this node, in
1047
        form of minor: (instance, must_exist) which correspond to instances
1048
        and their running status
1049
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
1050

1051
    """
1052
    node = nodeinfo.name
1053
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1054

    
1055
    # main result, node_result should be a non-empty dict
1056
    test = not node_result or not isinstance(node_result, dict)
1057
    _ErrorIf(test, self.ENODERPC, node,
1058
                  "unable to verify node: no data returned")
1059
    if test:
1060
      return
1061

    
1062
    # compares ganeti version
1063
    local_version = constants.PROTOCOL_VERSION
1064
    remote_version = node_result.get('version', None)
1065
    test = not (remote_version and
1066
                isinstance(remote_version, (list, tuple)) and
1067
                len(remote_version) == 2)
1068
    _ErrorIf(test, self.ENODERPC, node,
1069
             "connection to node returned invalid data")
1070
    if test:
1071
      return
1072

    
1073
    test = local_version != remote_version[0]
1074
    _ErrorIf(test, self.ENODEVERSION, node,
1075
             "incompatible protocol versions: master %s,"
1076
             " node %s", local_version, remote_version[0])
1077
    if test:
1078
      return
1079

    
1080
    # node seems compatible, we can actually try to look into its results
1081

    
1082
    # full package version
1083
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1084
                  self.ENODEVERSION, node,
1085
                  "software version mismatch: master %s, node %s",
1086
                  constants.RELEASE_VERSION, remote_version[1],
1087
                  code=self.ETYPE_WARNING)
1088

    
1089
    # checks vg existence and size > 20G
1090
    if vg_name is not None:
1091
      vglist = node_result.get(constants.NV_VGLIST, None)
1092
      test = not vglist
1093
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
1094
      if not test:
1095
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1096
                                              constants.MIN_VG_SIZE)
1097
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
1098

    
1099
    # checks config file checksum
1100

    
1101
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
1102
    test = not isinstance(remote_cksum, dict)
1103
    _ErrorIf(test, self.ENODEFILECHECK, node,
1104
             "node hasn't returned file checksum data")
1105
    if not test:
1106
      for file_name in file_list:
1107
        node_is_mc = nodeinfo.master_candidate
1108
        must_have = (file_name not in master_files) or node_is_mc
1109
        # missing
1110
        test1 = file_name not in remote_cksum
1111
        # invalid checksum
1112
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
1113
        # existing and good
1114
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
1115
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
1116
                 "file '%s' missing", file_name)
1117
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
1118
                 "file '%s' has wrong checksum", file_name)
1119
        # not candidate and this is not a must-have file
1120
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
1121
                 "file '%s' should not exist on non master"
1122
                 " candidates (and the file is outdated)", file_name)
1123
        # all good, except non-master/non-must have combination
1124
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
1125
                 "file '%s' should not exist"
1126
                 " on non master candidates", file_name)
1127

    
1128
    # checks ssh to any
1129

    
1130
    test = constants.NV_NODELIST not in node_result
1131
    _ErrorIf(test, self.ENODESSH, node,
1132
             "node hasn't returned node ssh connectivity data")
1133
    if not test:
1134
      if node_result[constants.NV_NODELIST]:
1135
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
1136
          _ErrorIf(True, self.ENODESSH, node,
1137
                   "ssh communication with node '%s': %s", a_node, a_msg)
1138

    
1139
    test = constants.NV_NODENETTEST not in node_result
1140
    _ErrorIf(test, self.ENODENET, node,
1141
             "node hasn't returned node tcp connectivity data")
1142
    if not test:
1143
      if node_result[constants.NV_NODENETTEST]:
1144
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
1145
        for anode in nlist:
1146
          _ErrorIf(True, self.ENODENET, node,
1147
                   "tcp communication with node '%s': %s",
1148
                   anode, node_result[constants.NV_NODENETTEST][anode])
1149

    
1150
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
1151
    if isinstance(hyp_result, dict):
1152
      for hv_name, hv_result in hyp_result.iteritems():
1153
        test = hv_result is not None
1154
        _ErrorIf(test, self.ENODEHV, node,
1155
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1156

    
1157
    # check used drbd list
1158
    if vg_name is not None:
1159
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
1160
      test = not isinstance(used_minors, (tuple, list))
1161
      _ErrorIf(test, self.ENODEDRBD, node,
1162
               "cannot parse drbd status file: %s", str(used_minors))
1163
      if not test:
1164
        for minor, (iname, must_exist) in drbd_map.items():
1165
          test = minor not in used_minors and must_exist
1166
          _ErrorIf(test, self.ENODEDRBD, node,
1167
                   "drbd minor %d of instance %s is not active",
1168
                   minor, iname)
1169
        for minor in used_minors:
1170
          test = minor not in drbd_map
1171
          _ErrorIf(test, self.ENODEDRBD, node,
1172
                   "unallocated drbd minor %d is in use", minor)
1173
    test = node_result.get(constants.NV_NODESETUP,
1174
                           ["Missing NODESETUP results"])
1175
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
1176
             "; ".join(test))
1177

    
1178
    # check pv names
1179
    if vg_name is not None:
1180
      pvlist = node_result.get(constants.NV_PVLIST, None)
1181
      test = pvlist is None
1182
      _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
1183
      if not test:
1184
        # check that ':' is not present in PV names, since it's a
1185
        # special character for lvcreate (denotes the range of PEs to
1186
        # use on the PV)
1187
        for _, pvname, owner_vg in pvlist:
1188
          test = ":" in pvname
1189
          _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
1190
                   " '%s' of VG '%s'", pvname, owner_vg)
1191

    
1192
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1193
                      node_instance, n_offline):
1194
    """Verify an instance.
1195

1196
    This function checks to see if the required block devices are
1197
    available on the instance's node.
1198

1199
    """
1200
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1201
    node_current = instanceconfig.primary_node
1202

    
1203
    node_vol_should = {}
1204
    instanceconfig.MapLVsByNode(node_vol_should)
1205

    
1206
    for node in node_vol_should:
1207
      if node in n_offline:
1208
        # ignore missing volumes on offline nodes
1209
        continue
1210
      for volume in node_vol_should[node]:
1211
        test = node not in node_vol_is or volume not in node_vol_is[node]
1212
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
1213
                 "volume %s missing on node %s", volume, node)
1214

    
1215
    if instanceconfig.admin_up:
1216
      test = ((node_current not in node_instance or
1217
               not instance in node_instance[node_current]) and
1218
              node_current not in n_offline)
1219
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
1220
               "instance not running on its primary node %s",
1221
               node_current)
1222

    
1223
    for node in node_instance:
1224
      if node != node_current:
1225
        test = instance in node_instance[node]
1226
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
1227
                 "instance should not run on node %s", node)
1228

    
1229
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
1230
    """Verify if there are any unknown volumes in the cluster.
1231

1232
    The .os, .swap and backup volumes are ignored. All other volumes are
1233
    reported as unknown.
1234

1235
    """
1236
    for node in node_vol_is:
1237
      for volume in node_vol_is[node]:
1238
        test = (node not in node_vol_should or
1239
                volume not in node_vol_should[node])
1240
        self._ErrorIf(test, self.ENODEORPHANLV, node,
1241
                      "volume %s is unknown", volume)
1242

    
1243
  def _VerifyOrphanInstances(self, instancelist, node_instance):
1244
    """Verify the list of running instances.
1245

1246
    This checks what instances are running but unknown to the cluster.
1247

1248
    """
1249
    for node in node_instance:
1250
      for o_inst in node_instance[node]:
1251
        test = o_inst not in instancelist
1252
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
1253
                      "instance %s on node %s should not exist", o_inst, node)
1254

    
1255
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
1256
    """Verify N+1 Memory Resilience.
1257

1258
    Check that if one single node dies we can still start all the instances it
1259
    was primary for.
1260

1261
    """
1262
    for node, nodeinfo in node_info.iteritems():
1263
      # This code checks that every node which is now listed as secondary has
1264
      # enough memory to host all instances it is supposed to, should a single
1265
      # other node in the cluster fail.
1266
      # FIXME: not ready for failover to an arbitrary node
1267
      # FIXME: does not support file-backed instances
1268
      # WARNING: we currently take into account down instances as well as up
1269
      # ones, considering that even if they're down someone might want to start
1270
      # them even in the event of a node failure.
1271
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1272
        needed_mem = 0
1273
        for instance in instances:
1274
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1275
          if bep[constants.BE_AUTO_BALANCE]:
1276
            needed_mem += bep[constants.BE_MEMORY]
1277
        test = nodeinfo['mfree'] < needed_mem
1278
        self._ErrorIf(test, self.ENODEN1, node,
1279
                      "not enough memory on to accommodate"
1280
                      " failovers should peer node %s fail", prinode)
1281
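    # Worked example (not in the original source, numbers hypothetical): if
    # this node is secondary for two auto-balanced instances whose primary is
    # "node3", with BE_MEMORY of 512 and 1024, then needed_mem for prinode
    # "node3" is 1536 and an error is flagged unless mfree >= 1536.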

    
1282
  def CheckPrereq(self):
1283
    """Check prerequisites.
1284

1285
    Transform the list of checks we're going to skip into a set and check that
1286
    all its members are valid.
1287

1288
    """
1289
    self.skip_set = frozenset(self.op.skip_checks)
1290
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1291
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
1292
                                 errors.ECODE_INVAL)
1293

    
1294
  def BuildHooksEnv(self):
1295
    """Build hooks env.
1296

1297
    Cluster-Verify hooks are run only in the post phase; if they fail, their
1298
    output is logged in the verify output and the verification fails.
1299

1300
    """
1301
    all_nodes = self.cfg.GetNodeList()
1302
    env = {
1303
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1304
      }
1305
    for node in self.cfg.GetAllNodesInfo().values():
1306
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1307

    
1308
    return env, [], all_nodes
1309

    
1310
  def Exec(self, feedback_fn):
1311
    """Verify integrity of cluster, performing various test on nodes.
1312

1313
    """
1314
    self.bad = False
1315
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
1316
    verbose = self.op.verbose
1317
    self._feedback_fn = feedback_fn
1318
    feedback_fn("* Verifying global settings")
1319
    for msg in self.cfg.VerifyConfig():
1320
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
1321

    
1322
    vg_name = self.cfg.GetVGName()
1323
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1324
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
1325
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1326
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1327
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1328
                        for iname in instancelist)
1329
    i_non_redundant = [] # Non redundant instances
1330
    i_non_a_balanced = [] # Non auto-balanced instances
1331
    n_offline = [] # List of offline nodes
1332
    n_drained = [] # List of nodes being drained
1333
    node_volume = {}
1334
    node_instance = {}
1335
    node_info = {}
1336
    instance_cfg = {}
1337

    
1338
    # FIXME: verify OS list
1339
    # do local checksums
1340
    master_files = [constants.CLUSTER_CONF_FILE]
1341

    
1342
    file_names = ssconf.SimpleStore().GetFileList()
1343
    file_names.append(constants.SSL_CERT_FILE)
1344
    file_names.append(constants.RAPI_CERT_FILE)
1345
    file_names.extend(master_files)
1346

    
1347
    local_checksums = utils.FingerprintFiles(file_names)
1348

    
1349
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1350
    node_verify_param = {
1351
      constants.NV_FILELIST: file_names,
1352
      constants.NV_NODELIST: [node.name for node in nodeinfo
1353
                              if not node.offline],
1354
      constants.NV_HYPERVISOR: hypervisors,
1355
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1356
                                  node.secondary_ip) for node in nodeinfo
1357
                                 if not node.offline],
1358
      constants.NV_INSTANCELIST: hypervisors,
1359
      constants.NV_VERSION: None,
1360
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1361
      constants.NV_NODESETUP: None,
1362
      constants.NV_TIME: None,
1363
      }
1364

    
1365
    if vg_name is not None:
1366
      node_verify_param[constants.NV_VGLIST] = None
1367
      node_verify_param[constants.NV_LVLIST] = vg_name
1368
      node_verify_param[constants.NV_PVLIST] = [vg_name]
1369
      node_verify_param[constants.NV_DRBDLIST] = None
1370

    
1371
    # Due to the way our RPC system works, exact response times cannot be
1372
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
1373
    # time before and after executing the request, we can at least have a time
1374
    # window.
1375
    nvinfo_starttime = time.time()
1376
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1377
                                           self.cfg.GetClusterName())
1378
    nvinfo_endtime = time.time()
1379

    
1380
    cluster = self.cfg.GetClusterInfo()
1381
    master_node = self.cfg.GetMasterNode()
1382
    all_drbd_map = self.cfg.ComputeDRBDMap()
1383

    
1384
    feedback_fn("* Verifying node status")
1385
    for node_i in nodeinfo:
1386
      node = node_i.name
1387

    
1388
      if node_i.offline:
1389
        if verbose:
1390
          feedback_fn("* Skipping offline node %s" % (node,))
1391
        n_offline.append(node)
1392
        continue
1393

    
1394
      if node == master_node:
1395
        ntype = "master"
1396
      elif node_i.master_candidate:
1397
        ntype = "master candidate"
1398
      elif node_i.drained:
1399
        ntype = "drained"
1400
        n_drained.append(node)
1401
      else:
1402
        ntype = "regular"
1403
      if verbose:
1404
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1405

    
1406
      msg = all_nvinfo[node].fail_msg
1407
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
1408
      if msg:
1409
        continue
1410

    
1411
      nresult = all_nvinfo[node].payload
1412
      node_drbd = {}
1413
      for minor, instance in all_drbd_map[node].items():
1414
        test = instance not in instanceinfo
1415
        _ErrorIf(test, self.ECLUSTERCFG, None,
1416
                 "ghost instance '%s' in temporary DRBD map", instance)
1417
          # ghost instance should not be running, but otherwise we
1418
          # don't give double warnings (both ghost instance and
1419
          # unallocated minor in use)
1420
        if test:
1421
          node_drbd[minor] = (instance, False)
1422
        else:
1423
          instance = instanceinfo[instance]
1424
          node_drbd[minor] = (instance.name, instance.admin_up)
1425

    
1426
      self._VerifyNode(node_i, file_names, local_checksums,
1427
                       nresult, master_files, node_drbd, vg_name)
1428

    
1429
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1430
      if vg_name is None:
1431
        node_volume[node] = {}
1432
      elif isinstance(lvdata, basestring):
1433
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
1434
                 utils.SafeEncode(lvdata))
1435
        node_volume[node] = {}
1436
      elif not isinstance(lvdata, dict):
1437
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
1438
        continue
1439
      else:
1440
        node_volume[node] = lvdata
1441

    
1442
      # node_instance
1443
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1444
      test = not isinstance(idata, list)
1445
      _ErrorIf(test, self.ENODEHV, node,
1446
               "rpc call to node failed (instancelist)")
1447
      if test:
1448
        continue
1449

    
1450
      node_instance[node] = idata
1451

    
1452
      # node_info
1453
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1454
      test = not isinstance(nodeinfo, dict)
1455
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
1456
      if test:
1457
        continue
1458

    
1459
      # Node time
      ntime = nresult.get(constants.NV_TIME, None)
      try:
        ntime_merged = utils.MergeTime(ntime)
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
        # without a parseable node time there is nothing further to compare
        continue

      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
        ntime_diff = abs(nvinfo_starttime - ntime_merged)
      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
        ntime_diff = abs(ntime_merged - nvinfo_endtime)
      else:
        ntime_diff = None

      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
               "Node time diverges by at least %0.1fs from master node time",
               ntime_diff)

      if ntime_diff is not None:
        continue
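      # Worked example of the window check above (numbers are made up): with
      # nvinfo_starttime=100.0, nvinfo_endtime=101.0 and a skew allowance of,
      # say, 150 seconds, any merged node time inside [-50.0, 251.0] leaves
      # ntime_diff as None, while a node reporting 300.0 is flagged as
      # diverging by at least 199.0 seconds.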

    
1480
      try:
1481
        node_info[node] = {
1482
          "mfree": int(nodeinfo['memory_free']),
1483
          "pinst": [],
1484
          "sinst": [],
1485
          # dictionary holding all instances this node is secondary for,
1486
          # grouped by their primary node. Each key is a cluster node, and each
1487
          # value is a list of instances which have the key as primary and the
1488
          # current node as secondary.  this is handy to calculate N+1 memory
1489
          # availability if you can only failover from a primary to its
1490
          # secondary.
1491
          "sinst-by-pnode": {},
1492
        }
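        # Illustration of the layout above (hypothetical names): if inst1 and
        # inst2 both have nodeA as primary and the current node as secondary,
        #   node_info[node]["sinst-by-pnode"] == {"nodeA": ["inst1", "inst2"]}
        # which is what the N+1 memory check walks later on.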
1493
        # FIXME: devise a free space model for file based instances as well
1494
        if vg_name is not None:
1495
          test = (constants.NV_VGLIST not in nresult or
1496
                  vg_name not in nresult[constants.NV_VGLIST])
1497
          _ErrorIf(test, self.ENODELVM, node,
1498
                   "node didn't return data for the volume group '%s'"
1499
                   " - it is either missing or broken", vg_name)
1500
          if test:
1501
            continue
1502
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1503
      except (ValueError, KeyError):
1504
        _ErrorIf(True, self.ENODERPC, node,
1505
                 "node returned invalid nodeinfo, check lvm/hypervisor")
1506
        continue
1507

    
1508
    node_vol_should = {}
1509

    
1510
    feedback_fn("* Verifying instance status")
1511
    for instance in instancelist:
1512
      if verbose:
1513
        feedback_fn("* Verifying instance %s" % instance)
1514
      inst_config = instanceinfo[instance]
1515
      self._VerifyInstance(instance, inst_config, node_volume,
1516
                           node_instance, n_offline)
1517
      inst_nodes_offline = []
1518

    
1519
      inst_config.MapLVsByNode(node_vol_should)
1520

    
1521
      instance_cfg[instance] = inst_config
1522

    
1523
      pnode = inst_config.primary_node
1524
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
1525
               self.ENODERPC, pnode, "instance %s, connection to"
1526
               " primary node failed", instance)
1527
      if pnode in node_info:
1528
        node_info[pnode]['pinst'].append(instance)
1529

    
1530
      if pnode in n_offline:
1531
        inst_nodes_offline.append(pnode)
1532

    
1533
      # If the instance is non-redundant we cannot survive losing its primary
1534
      # node, so we are not N+1 compliant. On the other hand we have no disk
1535
      # templates with more than one secondary so that situation is not well
1536
      # supported either.
1537
      # FIXME: does not support file-backed instances
1538
      if len(inst_config.secondary_nodes) == 0:
1539
        i_non_redundant.append(instance)
1540
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
1541
               self.EINSTANCELAYOUT, instance,
1542
               "instance has multiple secondary nodes", code="WARNING")
1543

    
1544
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1545
        i_non_a_balanced.append(instance)
1546

    
1547
      for snode in inst_config.secondary_nodes:
1548
        _ErrorIf(snode not in node_info and snode not in n_offline,
1549
                 self.ENODERPC, snode,
1550
                 "instance %s, connection to secondary node"
1551
                 "failed", instance)
1552

    
1553
        if snode in node_info:
1554
          node_info[snode]['sinst'].append(instance)
1555
          if pnode not in node_info[snode]['sinst-by-pnode']:
1556
            node_info[snode]['sinst-by-pnode'][pnode] = []
1557
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1558

    
1559
        if snode in n_offline:
1560
          inst_nodes_offline.append(snode)
1561

    
1562
      # warn that the instance lives on offline nodes
1563
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
1564
               "instance lives on offline node(s) %s",
1565
               utils.CommaJoin(inst_nodes_offline))
1566

    
1567
    feedback_fn("* Verifying orphan volumes")
1568
    self._VerifyOrphanVolumes(node_vol_should, node_volume)
1569

    
1570
    feedback_fn("* Verifying remaining instances")
1571
    self._VerifyOrphanInstances(instancelist, node_instance)
1572

    
1573
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1574
      feedback_fn("* Verifying N+1 Memory redundancy")
1575
      self._VerifyNPlusOneMemory(node_info, instance_cfg)
1576

    
1577
    feedback_fn("* Other Notes")
1578
    if i_non_redundant:
1579
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1580
                  % len(i_non_redundant))
1581

    
1582
    if i_non_a_balanced:
1583
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1584
                  % len(i_non_a_balanced))
1585

    
1586
    if n_offline:
1587
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1588

    
1589
    if n_drained:
1590
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1591

    
1592
    return not self.bad
1593

    
1594
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1595
    """Analyze the post-hooks' result
1596

1597
    This method analyses the hook result, handles it, and sends some
1598
    nicely-formatted feedback back to the user.
1599

1600
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1601
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1602
    @param hooks_results: the results of the multi-node hooks rpc call
1603
    @param feedback_fn: function used to send feedback back to the caller
1604
    @param lu_result: previous Exec result
1605
    @return: the new Exec result, based on the previous result
1606
        and hook results
1607

1608
    """
1609
    # We only really run POST phase hooks, and are only interested in
1610
    # their results
1611
    if phase == constants.HOOKS_PHASE_POST:
1612
      # Used to change hooks' output to proper indentation
1613
      indent_re = re.compile('^', re.M)
1614
      feedback_fn("* Hooks Results")
1615
      assert hooks_results, "invalid result from hooks"
1616

    
1617
      for node_name in hooks_results:
1618
        res = hooks_results[node_name]
1619
        msg = res.fail_msg
1620
        test = msg and not res.offline
1621
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
1622
                      "Communication failure in hooks execution: %s", msg)
1623
        if res.offline or msg:
1624
          # No need to investigate payload if node is offline or gave an error.
1625
          # override manually lu_result here as _ErrorIf only
1626
          # overrides self.bad
1627
          lu_result = 1
1628
          continue
1629
        for script, hkr, output in res.payload:
1630
          test = hkr == constants.HKR_FAIL
1631
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
1632
                        "Script %s failed, output:", script)
1633
          if test:
1634
            output = indent_re.sub('      ', output)
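            # e.g. "line1\nline2" becomes "      line1\n      line2", so the
            # hook output lines up under the per-node feedback entry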
1635
            feedback_fn("%s" % output)
1636
            lu_result = 1
1637

    
1638
      return lu_result
1639

    
1640

    
1641
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

    
1655
  def CheckPrereq(self):
1656
    """Check prerequisites.
1657

1658
    This has no prerequisites.
1659

1660
    """
1661
    pass
1662

    
1663
  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
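    # Shape of the return value (all names below are made up for
    # illustration):
    #   ({"node3": "rpc error ..."},                 # per-node errors
    #    ["instance7"],                              # need activate-disks
    #    {"instance9": [("node2", "some-lv-name")]}) # missing volumes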
    result = res_nodes, res_instances, res_missing = {}, [], {}
1673

    
1674
    vg_name = self.cfg.GetVGName()
1675
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1676
    instances = [self.cfg.GetInstanceInfo(name)
1677
                 for name in self.cfg.GetInstanceList()]
1678

    
1679
    nv_dict = {}
1680
    for inst in instances:
1681
      inst_lvs = {}
1682
      if (not inst.admin_up or
1683
          inst.disk_template not in constants.DTS_NET_MIRROR):
1684
        continue
1685
      inst.MapLVsByNode(inst_lvs)
1686
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1687
      for node, vol_list in inst_lvs.iteritems():
1688
        for vol in vol_list:
1689
          nv_dict[(node, vol)] = inst
1690

    
1691
    if not nv_dict:
1692
      return result
1693

    
1694
    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1695

    
1696
    for node in nodes:
1697
      # node_volume
1698
      node_res = node_lvs[node]
1699
      if node_res.offline:
1700
        continue
1701
      msg = node_res.fail_msg
1702
      if msg:
1703
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1704
        res_nodes[node] = msg
1705
        continue
1706

    
1707
      lvs = node_res.payload
1708
      for lv_name, (_, _, lv_online) in lvs.items():
1709
        inst = nv_dict.pop((node, lv_name), None)
1710
        if (not lv_online and inst is not None
1711
            and inst.name not in res_instances):
1712
          res_instances.append(inst.name)
1713

    
1714
    # any leftover items in nv_dict are missing LVs, let's arrange the
1715
    # data better
1716
    for key, inst in nv_dict.iteritems():
1717
      if inst.name not in res_missing:
1718
        res_missing[inst.name] = []
1719
      res_missing[inst.name].append(key)
1720

    
1721
    return result
1722

    
1723

    
1724
class LURepairDiskSizes(NoHooksLU):
1725
  """Verifies the cluster disks sizes.
1726

1727
  """
1728
  _OP_REQP = ["instances"]
1729
  REQ_BGL = False
1730

    
1731
  def ExpandNames(self):
1732
    if not isinstance(self.op.instances, list):
1733
      raise errors.OpPrereqError("Invalid argument type 'instances'",
1734
                                 errors.ECODE_INVAL)
1735

    
1736
    if self.op.instances:
1737
      self.wanted_names = []
1738
      for name in self.op.instances:
1739
        full_name = _ExpandInstanceName(self.cfg, name)
1740
        self.wanted_names.append(full_name)
1741
      self.needed_locks = {
1742
        locking.LEVEL_NODE: [],
1743
        locking.LEVEL_INSTANCE: self.wanted_names,
1744
        }
1745
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1746
    else:
1747
      self.wanted_names = None
1748
      self.needed_locks = {
1749
        locking.LEVEL_NODE: locking.ALL_SET,
1750
        locking.LEVEL_INSTANCE: locking.ALL_SET,
1751
        }
1752
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1753

    
1754
  def DeclareLocks(self, level):
1755
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
1756
      self._LockInstancesNodes(primary_only=True)
1757

    
1758
  def CheckPrereq(self):
1759
    """Check prerequisites.
1760

1761
    This only checks the optional instance list against the existing names.
1762

1763
    """
1764
    if self.wanted_names is None:
1765
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1766

    
1767
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1768
                             in self.wanted_names]
1769

    
1770
  def _EnsureChildSizes(self, disk):
1771
    """Ensure children of the disk have the needed disk size.
1772

1773
    This is valid mainly for DRBD8 and fixes an issue where the
1774
    children have smaller disk size.
1775

1776
    @param disk: an L{ganeti.objects.Disk} object
1777

1778
    """
1779
    if disk.dev_type == constants.LD_DRBD8:
1780
      assert disk.children, "Empty children for DRBD8?"
1781
      fchild = disk.children[0]
1782
      mismatch = fchild.size < disk.size
1783
      if mismatch:
1784
        self.LogInfo("Child disk has size %d, parent %d, fixing",
1785
                     fchild.size, disk.size)
1786
        fchild.size = disk.size
1787

    
1788
      # and we recurse on this child only, not on the metadev
1789
      return self._EnsureChildSizes(fchild) or mismatch
1790
    else:
1791
      return False
1792
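  # Illustration of the fix above (sizes in MiB, values made up): a DRBD8
  # disk with disk.size == 10240 whose data child records 10236 has the
  # child's recorded size bumped back to 10240; the metadata child is
  # intentionally left untouched.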

    
1793
  def Exec(self, feedback_fn):
1794
    """Verify the size of cluster disks.
1795

1796
    """
1797
    # TODO: check child disks too
1798
    # TODO: check differences in size between primary/secondary nodes
1799
    per_node_disks = {}
1800
    for instance in self.wanted_instances:
1801
      pnode = instance.primary_node
1802
      if pnode not in per_node_disks:
1803
        per_node_disks[pnode] = []
1804
      for idx, disk in enumerate(instance.disks):
1805
        per_node_disks[pnode].append((instance, idx, disk))
1806

    
1807
    changed = []
1808
    for node, dskl in per_node_disks.items():
1809
      newl = [v[2].Copy() for v in dskl]
1810
      for dsk in newl:
1811
        self.cfg.SetDiskID(dsk, node)
1812
      result = self.rpc.call_blockdev_getsizes(node, newl)
1813
      if result.fail_msg:
1814
        self.LogWarning("Failure in blockdev_getsizes call to node"
1815
                        " %s, ignoring", node)
1816
        continue
1817
      if len(result.data) != len(dskl):
1818
        self.LogWarning("Invalid result from node %s, ignoring node results",
1819
                        node)
1820
        continue
1821
      for ((instance, idx, disk), size) in zip(dskl, result.data):
1822
        if size is None:
1823
          self.LogWarning("Disk %d of instance %s did not return size"
1824
                          " information, ignoring", idx, instance.name)
1825
          continue
1826
        if not isinstance(size, (int, long)):
1827
          self.LogWarning("Disk %d of instance %s did not return valid"
1828
                          " size information, ignoring", idx, instance.name)
1829
          continue
1830
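        # the RPC reports the size in bytes (hence the shift below); convert
        # to MiB so it is comparable with the configured disk.size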
        size = size >> 20
1831
        if size != disk.size:
1832
          self.LogInfo("Disk %d of instance %s has mismatched size,"
1833
                       " correcting: recorded %d, actual %d", idx,
1834
                       instance.name, disk.size, size)
1835
          disk.size = size
1836
          self.cfg.Update(instance, feedback_fn)
1837
          changed.append((instance.name, idx, size))
1838
        if self._EnsureChildSizes(disk):
1839
          self.cfg.Update(instance, feedback_fn)
1840
          changed.append((instance.name, idx, disk.size))
1841
    return changed
1842

    
1843

    
1844
class LURenameCluster(LogicalUnit):
1845
  """Rename the cluster.
1846

1847
  """
1848
  HPATH = "cluster-rename"
1849
  HTYPE = constants.HTYPE_CLUSTER
1850
  _OP_REQP = ["name"]
1851

    
1852
  def BuildHooksEnv(self):
1853
    """Build hooks env.
1854

1855
    """
1856
    env = {
1857
      "OP_TARGET": self.cfg.GetClusterName(),
1858
      "NEW_NAME": self.op.name,
1859
      }
1860
    mn = self.cfg.GetMasterNode()
1861
    all_nodes = self.cfg.GetNodeList()
1862
    return env, [mn], all_nodes
1863

    
1864
  def CheckPrereq(self):
1865
    """Verify that the passed name is a valid one.
1866

1867
    """
1868
    hostname = utils.GetHostInfo(self.op.name)
1869

    
1870
    new_name = hostname.name
1871
    self.ip = new_ip = hostname.ip
1872
    old_name = self.cfg.GetClusterName()
1873
    old_ip = self.cfg.GetMasterIP()
1874
    if new_name == old_name and new_ip == old_ip:
1875
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1876
                                 " cluster has changed",
1877
                                 errors.ECODE_INVAL)
1878
    if new_ip != old_ip:
1879
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1880
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1881
                                   " reachable on the network. Aborting." %
1882
                                   new_ip, errors.ECODE_NOTUNIQUE)
1883

    
1884
    self.op.name = new_name
1885

    
1886
  def Exec(self, feedback_fn):
1887
    """Rename the cluster.
1888

1889
    """
1890
    clustername = self.op.name
1891
    ip = self.ip
1892

    
1893
    # shutdown the master IP
1894
    master = self.cfg.GetMasterNode()
1895
    result = self.rpc.call_node_stop_master(master, False)
1896
    result.Raise("Could not disable the master role")
1897

    
1898
    try:
1899
      cluster = self.cfg.GetClusterInfo()
1900
      cluster.cluster_name = clustername
1901
      cluster.master_ip = ip
1902
      self.cfg.Update(cluster, feedback_fn)
1903

    
1904
      # update the known hosts file
1905
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1906
      node_list = self.cfg.GetNodeList()
1907
      try:
1908
        node_list.remove(master)
1909
      except ValueError:
1910
        pass
1911
      result = self.rpc.call_upload_file(node_list,
1912
                                         constants.SSH_KNOWN_HOSTS_FILE)
1913
      for to_node, to_result in result.iteritems():
1914
        msg = to_result.fail_msg
1915
        if msg:
1916
          msg = ("Copy of file %s to node %s failed: %s" %
1917
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1918
          self.proc.LogWarning(msg)
1919

    
1920
    finally:
1921
      result = self.rpc.call_node_start_master(master, False, False)
1922
      msg = result.fail_msg
1923
      if msg:
1924
        self.LogWarning("Could not re-enable the master role on"
1925
                        " the master, please restart manually: %s", msg)
1926

    
1927

    
1928
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
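# Usage sketch (equivalent to the loop in LUSetClusterParams.CheckPrereq
# below; the instance object is assumed):
#   if any(_RecursiveCheckIfLVMBased(disk) for disk in inst.disks):
#     # refuse to disable LVM storage while lvm-based disks still exist
#     ...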

    
1943

    
1944
class LUSetClusterParams(LogicalUnit):
1945
  """Change the parameters of the cluster.
1946

1947
  """
1948
  HPATH = "cluster-modify"
1949
  HTYPE = constants.HTYPE_CLUSTER
1950
  _OP_REQP = []
1951
  REQ_BGL = False
1952

    
1953
  def CheckArguments(self):
1954
    """Check parameters
1955

1956
    """
1957
    if not hasattr(self.op, "candidate_pool_size"):
1958
      self.op.candidate_pool_size = None
1959
    if self.op.candidate_pool_size is not None:
1960
      try:
1961
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1962
      except (ValueError, TypeError), err:
1963
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1964
                                   str(err), errors.ECODE_INVAL)
1965
      if self.op.candidate_pool_size < 1:
1966
        raise errors.OpPrereqError("At least one master candidate needed",
1967
                                   errors.ECODE_INVAL)
1968

    
1969
  def ExpandNames(self):
1970
    # FIXME: in the future maybe other cluster params won't require checking on
1971
    # all nodes to be modified.
1972
    self.needed_locks = {
1973
      locking.LEVEL_NODE: locking.ALL_SET,
1974
    }
1975
    self.share_locks[locking.LEVEL_NODE] = 1
1976

    
1977
  def BuildHooksEnv(self):
1978
    """Build hooks env.
1979

1980
    """
1981
    env = {
1982
      "OP_TARGET": self.cfg.GetClusterName(),
1983
      "NEW_VG_NAME": self.op.vg_name,
1984
      }
1985
    mn = self.cfg.GetMasterNode()
1986
    return env, [mn], [mn]
1987

    
1988
  def CheckPrereq(self):
1989
    """Check prerequisites.
1990

1991
    This checks whether the given params don't conflict and
1992
    if the given volume group is valid.
1993

1994
    """
1995
    if self.op.vg_name is not None and not self.op.vg_name:
1996
      instances = self.cfg.GetAllInstancesInfo().values()
1997
      for inst in instances:
1998
        for disk in inst.disks:
1999
          if _RecursiveCheckIfLVMBased(disk):
2000
            raise errors.OpPrereqError("Cannot disable lvm storage while"
2001
                                       " lvm-based instances exist",
2002
                                       errors.ECODE_INVAL)
2003

    
2004
    node_list = self.acquired_locks[locking.LEVEL_NODE]
2005

    
2006
    # if vg_name not None, checks given volume group on all nodes
2007
    if self.op.vg_name:
2008
      vglist = self.rpc.call_vg_list(node_list)
2009
      for node in node_list:
2010
        msg = vglist[node].fail_msg
2011
        if msg:
2012
          # ignoring down node
2013
          self.LogWarning("Error while gathering data on node %s"
2014
                          " (ignoring node): %s", node, msg)
2015
          continue
2016
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
2017
                                              self.op.vg_name,
2018
                                              constants.MIN_VG_SIZE)
2019
        if vgstatus:
2020
          raise errors.OpPrereqError("Error on node '%s': %s" %
2021
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2022

    
2023
    self.cluster = cluster = self.cfg.GetClusterInfo()
2024
    # validate params changes
2025
    if self.op.beparams:
2026
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2027
      self.new_beparams = objects.FillDict(
2028
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2029

    
2030
    if self.op.nicparams:
2031
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2032
      self.new_nicparams = objects.FillDict(
2033
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
2034
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
2035
      nic_errors = []
2036

    
2037
      # check all instances for consistency
2038
      for instance in self.cfg.GetAllInstancesInfo().values():
2039
        for nic_idx, nic in enumerate(instance.nics):
2040
          params_copy = copy.deepcopy(nic.nicparams)
2041
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
2042

    
2043
          # check parameter syntax
2044
          try:
2045
            objects.NIC.CheckParameterSyntax(params_filled)
2046
          except errors.ConfigurationError, err:
2047
            nic_errors.append("Instance %s, nic/%d: %s" %
2048
                              (instance.name, nic_idx, err))
2049

    
2050
          # if we're moving instances to routed, check that they have an ip
2051
          target_mode = params_filled[constants.NIC_MODE]
2052
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
2053
            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
2054
                              (instance.name, nic_idx))
2055
      if nic_errors:
2056
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
2057
                                   "\n".join(nic_errors))
2058

    
2059
    # hypervisor list/parameters
2060
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
2061
    if self.op.hvparams:
2062
      if not isinstance(self.op.hvparams, dict):
2063
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
2064
                                   errors.ECODE_INVAL)
2065
      for hv_name, hv_dict in self.op.hvparams.items():
2066
        if hv_name not in self.new_hvparams:
2067
          self.new_hvparams[hv_name] = hv_dict
2068
        else:
2069
          self.new_hvparams[hv_name].update(hv_dict)
2070

    
2071
    if self.op.enabled_hypervisors is not None:
2072
      self.hv_list = self.op.enabled_hypervisors
2073
      if not self.hv_list:
2074
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
2075
                                   " least one member",
2076
                                   errors.ECODE_INVAL)
2077
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
2078
      if invalid_hvs:
2079
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
2080
                                   " entries: %s" %
2081
                                   utils.CommaJoin(invalid_hvs),
2082
                                   errors.ECODE_INVAL)
2083
    else:
2084
      self.hv_list = cluster.enabled_hypervisors
2085

    
2086
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
2087
      # either the enabled list has changed, or the parameters have, validate
2088
      for hv_name, hv_params in self.new_hvparams.items():
2089
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
2090
            (self.op.enabled_hypervisors and
2091
             hv_name in self.op.enabled_hypervisors)):
2092
          # either this is a new hypervisor, or its parameters have changed
2093
          hv_class = hypervisor.GetHypervisor(hv_name)
2094
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2095
          hv_class.CheckParameterSyntax(hv_params)
2096
          _CheckHVParams(self, node_list, hv_name, hv_params)
2097

    
2098
  def Exec(self, feedback_fn):
2099
    """Change the parameters of the cluster.
2100

2101
    """
2102
    if self.op.vg_name is not None:
2103
      new_volume = self.op.vg_name
2104
      if not new_volume:
2105
        new_volume = None
2106
      if new_volume != self.cfg.GetVGName():
2107
        self.cfg.SetVGName(new_volume)
2108
      else:
2109
        feedback_fn("Cluster LVM configuration already in desired"
2110
                    " state, not changing")
2111
    if self.op.hvparams:
2112
      self.cluster.hvparams = self.new_hvparams
2113
    if self.op.enabled_hypervisors is not None:
2114
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
2115
    if self.op.beparams:
2116
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
2117
    if self.op.nicparams:
2118
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
2119

    
2120
    if self.op.candidate_pool_size is not None:
2121
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
2122
      # we need to update the pool size here, otherwise the save will fail
2123
      _AdjustCandidatePool(self, [])
2124

    
2125
    self.cfg.Update(self.cluster, feedback_fn)
2126

    
2127

    
2128
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
2129
  """Distribute additional files which are part of the cluster configuration.
2130

2131
  ConfigWriter takes care of distributing the config and ssconf files, but
2132
  there are more files which should be distributed to all nodes. This function
2133
  makes sure those are copied.
2134

2135
  @param lu: calling logical unit
2136
  @param additional_nodes: list of nodes not in the config to distribute to
2137

2138
  """
2139
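  # Typical call sites (sketch): LURedistributeConfig.Exec calls
  # _RedistributeAncillaryFiles(self); code adding a node may pass
  # additional_nodes=[new_node_name] for a node not yet in the configuration.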
  # 1. Gather target nodes
2140
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
2141
  dist_nodes = lu.cfg.GetNodeList()
2142
  if additional_nodes is not None:
2143
    dist_nodes.extend(additional_nodes)
2144
  if myself.name in dist_nodes:
2145
    dist_nodes.remove(myself.name)
2146

    
2147
  # 2. Gather files to distribute
2148
  dist_files = set([constants.ETC_HOSTS,
2149
                    constants.SSH_KNOWN_HOSTS_FILE,
2150
                    constants.RAPI_CERT_FILE,
2151
                    constants.RAPI_USERS_FILE,
2152
                    constants.HMAC_CLUSTER_KEY,
2153
                   ])
2154

    
2155
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
2156
  for hv_name in enabled_hypervisors:
2157
    hv_class = hypervisor.GetHypervisor(hv_name)
2158
    dist_files.update(hv_class.GetAncillaryFiles())
2159

    
2160
  # 3. Perform the files upload
2161
  for fname in dist_files:
2162
    if os.path.exists(fname):
2163
      result = lu.rpc.call_upload_file(dist_nodes, fname)
2164
      for to_node, to_result in result.items():
2165
        msg = to_result.fail_msg
2166
        if msg:
2167
          msg = ("Copy of file %s to node %s failed: %s" %
2168
                 (fname, to_node, msg))
2169
          lu.proc.LogWarning(msg)
2170

    
2171

    
2172
class LURedistributeConfig(NoHooksLU):
2173
  """Force the redistribution of cluster configuration.
2174

2175
  This is a very simple LU.
2176

2177
  """
2178
  _OP_REQP = []
2179
  REQ_BGL = False
2180

    
2181
  def ExpandNames(self):
2182
    self.needed_locks = {
2183
      locking.LEVEL_NODE: locking.ALL_SET,
2184
    }
2185
    self.share_locks[locking.LEVEL_NODE] = 1
2186

    
2187
  def CheckPrereq(self):
2188
    """Check prerequisites.
2189

2190
    """
2191

    
2192
  def Exec(self, feedback_fn):
2193
    """Redistribute the configuration.
2194

2195
    """
2196
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
2197
    _RedistributeAncillaryFiles(self)
2198

    
2199

    
2200
def _WaitForSync(lu, instance, oneshot=False):
2201
  """Sleep and poll for an instance's disk to sync.
2202

2203
  """
2204
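  # Sketch of the usual calling pattern elsewhere in this module (assumed,
  # not prescriptive): callers treat a False return as "still degraded", e.g.
  #   disk_abort = not _WaitForSync(lu, instance)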
  if not instance.disks:
2205
    return True
2206

    
2207
  if not oneshot:
2208
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
2209

    
2210
  node = instance.primary_node
2211

    
2212
  for dev in instance.disks:
2213
    lu.cfg.SetDiskID(dev, node)
2214

    
2215
  # TODO: Convert to utils.Retry
2216

    
2217
  retries = 0
2218
  degr_retries = 10 # in seconds, as we sleep 1 second each time
2219
  while True:
2220
    max_time = 0
2221
    done = True
2222
    cumul_degraded = False
2223
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
2224
    msg = rstats.fail_msg
2225
    if msg:
2226
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
2227
      retries += 1
2228
      if retries >= 10:
2229
        raise errors.RemoteError("Can't contact node %s for mirror data,"
2230
                                 " aborting." % node)
2231
      time.sleep(6)
2232
      continue
2233
    rstats = rstats.payload
2234
    retries = 0
2235
    for i, mstat in enumerate(rstats):
2236
      if mstat is None:
2237
        lu.LogWarning("Can't compute data for node %s/%s",
2238
                           node, instance.disks[i].iv_name)
2239
        continue
2240

    
2241
      cumul_degraded = (cumul_degraded or
2242
                        (mstat.is_degraded and mstat.sync_percent is None))
2243
      if mstat.sync_percent is not None:
2244
        done = False
2245
        if mstat.estimated_time is not None:
2246
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
2247
          max_time = mstat.estimated_time
2248
        else:
2249
          rem_time = "no time estimate"
2250
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
2251
                        (instance.disks[i].iv_name, mstat.sync_percent,
2252
                         rem_time))
2253

    
2254
    # if we're done but degraded, let's do a few small retries, to
2255
    # make sure we see a stable and not transient situation; therefore
2256
    # we force restart of the loop
2257
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
2258
      logging.info("Degraded disks found, %d retries left", degr_retries)
2259
      degr_retries -= 1
2260
      time.sleep(1)
2261
      continue
2262

    
2263
    if done or oneshot:
2264
      break
2265

    
2266
    time.sleep(min(60, max_time))
2267

    
2268
  if done:
2269
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2270
  return not cumul_degraded
2271

    
2272

    
2273
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2274
  """Check that mirrors are not degraded.
2275

2276
  The ldisk parameter, if True, will change the test from the
2277
  is_degraded attribute (which represents overall non-ok status for
2278
  the device(s)) to the ldisk (representing the local storage status).
2279

2280
  """
2281
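  # Sketch of the intent: callers that only care whether the local storage is
  # usable (for instance while a DRBD resync is still in progress) pass
  # ldisk=True, which switches the test below to
  # rstats.payload.ldisk_status == constants.LDS_OKAY.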
  lu.cfg.SetDiskID(dev, node)
2282

    
2283
  result = True
2284

    
2285
  if on_primary or dev.AssembleOnSecondary():
2286
    rstats = lu.rpc.call_blockdev_find(node, dev)
2287
    msg = rstats.fail_msg
2288
    if msg:
2289
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2290
      result = False
2291
    elif not rstats.payload:
2292
      lu.LogWarning("Can't find disk on node %s", node)
2293
      result = False
2294
    else:
2295
      if ldisk:
2296
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2297
      else:
2298
        result = result and not rstats.payload.is_degraded
2299

    
2300
  if dev.children:
2301
    for child in dev.children:
2302
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2303

    
2304
  return result
2305

    
2306

    
2307
class LUDiagnoseOS(NoHooksLU):
2308
  """Logical unit for OS diagnose/query.
2309

2310
  """
2311
  _OP_REQP = ["output_fields", "names"]
2312
  REQ_BGL = False
2313
  _FIELDS_STATIC = utils.FieldSet()
2314
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
2315
  # Fields that need calculation of global os validity
2316
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])
2317

    
2318
  def ExpandNames(self):
2319
    if self.op.names:
2320
      raise errors.OpPrereqError("Selective OS query not supported",
2321
                                 errors.ECODE_INVAL)
2322

    
2323
    _CheckOutputFields(static=self._FIELDS_STATIC,
2324
                       dynamic=self._FIELDS_DYNAMIC,
2325
                       selected=self.op.output_fields)
2326

    
2327
    # Lock all nodes, in shared mode
2328
    # Temporary removal of locks, should be reverted later
2329
    # TODO: reintroduce locks when they are lighter-weight
2330
    self.needed_locks = {}
2331
    #self.share_locks[locking.LEVEL_NODE] = 1
2332
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2333

    
2334
  def CheckPrereq(self):
2335
    """Check prerequisites.
2336

2337
    """
2338

    
2339
  @staticmethod
2340
  def _DiagnoseByOS(rlist):
2341
    """Remaps a per-node return list into an a per-os per-node dictionary
2342

2343
    @param rlist: a map with node names as keys and OS objects as values
2344

2345
    @rtype: dict
2346
    @return: a dictionary with osnames as keys and as value another map, with
2347
        nodes as keys and tuples of (path, status, diagnose, variants)
        as values, eg::
2348

2349
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2350
                                     (/srv/..., False, "invalid api")],
2351
                           "node2": [(/srv/..., True, "")]}
2352
          }
2353

2354
    """
2355
    all_os = {}
2356
    # we build here the list of nodes that didn't fail the RPC (at RPC
2357
    # level), so that nodes with a non-responding node daemon don't
2358
    # make all OSes invalid
2359
    good_nodes = [node_name for node_name in rlist
2360
                  if not rlist[node_name].fail_msg]
2361
    for node_name, nr in rlist.items():
2362
      if nr.fail_msg or not nr.payload:
2363
        continue
2364
      for name, path, status, diagnose, variants in nr.payload:
2365
        if name not in all_os:
2366
          # build a list of nodes for this os containing empty lists
2367
          # for each node in node_list
2368
          all_os[name] = {}
2369
          for nname in good_nodes:
2370
            all_os[name][nname] = []
2371
        all_os[name][node_name].append((path, status, diagnose, variants))
2372
    return all_os
2373

    
2374
  def Exec(self, feedback_fn):
2375
    """Compute the list of OSes.
2376

2377
    """
2378
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2379
    node_data = self.rpc.call_os_diagnose(valid_nodes)
2380
    pol = self._DiagnoseByOS(node_data)
2381
    output = []
2382
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
2383
    calc_variants = "variants" in self.op.output_fields
2384

    
2385
    for os_name, os_data in pol.items():
2386
      row = []
2387
      if calc_valid:
2388
        valid = True
2389
        variants = None
2390
        for osl in os_data.values():
2391
          valid = valid and osl and osl[0][1]
2392
          if not valid:
2393
            variants = None
2394
            break
2395
          if calc_variants:
2396
            node_variants = osl[0][3]
2397
            if variants is None:
2398
              variants = node_variants
2399
            else:
2400
              variants = [v for v in variants if v in node_variants]
2401

    
2402
      for field in self.op.output_fields:
2403
        if field == "name":
2404
          val = os_name
2405
        elif field == "valid":
2406
          val = valid
2407
        elif field == "node_status":
2408
          # this is just a copy of the dict
2409
          val = {}
2410
          for node_name, nos_list in os_data.items():
2411
            val[node_name] = nos_list
2412
        elif field == "variants":
2413
          val = variants
2414
        else:
2415
          raise errors.ParameterError(field)
2416
        row.append(val)
2417
      output.append(row)
2418

    
2419
    return output
2420

    
2421

    
2422
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

    
2430
  def BuildHooksEnv(self):
2431
    """Build hooks env.
2432

2433
    This doesn't run on the target node in the pre phase as a failed
2434
    node would then be impossible to remove.
2435

2436
    """
2437
    env = {
2438
      "OP_TARGET": self.op.node_name,
2439
      "NODE_NAME": self.op.node_name,
2440
      }
2441
    all_nodes = self.cfg.GetNodeList()
2442
    try:
2443
      all_nodes.remove(self.op.node_name)
2444
    except ValueError:
2445
      logging.warning("Node %s which is about to be removed not found"
2446
                      " in the all nodes list", self.op.node_name)
2447
    return env, all_nodes, all_nodes
2448

    
2449
  def CheckPrereq(self):
2450
    """Check prerequisites.
2451

2452
    This checks:
2453
     - the node exists in the configuration
2454
     - it does not have primary or secondary instances
2455
     - it's not the master
2456

2457
    Any errors are signaled by raising errors.OpPrereqError.
2458

2459
    """
2460
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
2461
    node = self.cfg.GetNodeInfo(self.op.node_name)
2462
    assert node is not None
2463

    
2464
    instance_list = self.cfg.GetInstanceList()
2465

    
2466
    masternode = self.cfg.GetMasterNode()
2467
    if node.name == masternode:
2468
      raise errors.OpPrereqError("Node is the master node,"
2469
                                 " you need to failover first.",
2470
                                 errors.ECODE_INVAL)
2471

    
2472
    for instance_name in instance_list:
2473
      instance = self.cfg.GetInstanceInfo(instance_name)
2474
      if node.name in instance.all_nodes:
2475
        raise errors.OpPrereqError("Instance %s is still running on the node,"
2476
                                   " please remove first." % instance_name,
2477
                                   errors.ECODE_INVAL)
2478
    self.op.node_name = node.name
2479
    self.node = node
2480

    
2481
  def Exec(self, feedback_fn):
2482
    """Removes the node from the cluster.
2483

2484
    """
2485
    node = self.node
2486
    logging.info("Stopping the node daemon and removing configs from node %s",
2487
                 node.name)
2488

    
2489
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
2490

    
2491
    # Promote nodes to master candidate as needed
2492
    _AdjustCandidatePool(self, exceptions=[node.name])
2493
    self.context.RemoveNode(node.name)
2494

    
2495
    # Run post hooks on the node before it's removed
2496
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
2497
    try:
2498
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
2499
    except:
2500
      # pylint: disable-msg=W0702
2501
      self.LogWarning("Errors occurred running hooks on %s" % node.name)
2502

    
2503
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
2504
    msg = result.fail_msg
2505
    if msg:
2506
      self.LogWarning("Errors encountered on the remote node while leaving"
2507
                      " the cluster: %s", msg)
2508

    
2509

    
2510
class LUQueryNodes(NoHooksLU):
2511
  """Logical unit for querying nodes.
2512

2513
  """
2514
  # pylint: disable-msg=W0142
2515
  _OP_REQP = ["output_fields", "names", "use_locking"]
2516
  REQ_BGL = False
2517

    
2518
  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
2519
                    "master_candidate", "offline", "drained"]
2520

    
2521
  _FIELDS_DYNAMIC = utils.FieldSet(
2522
    "dtotal", "dfree",
2523
    "mtotal", "mnode", "mfree",
2524
    "bootid",
2525
    "ctotal", "cnodes", "csockets",
2526
    )
2527

    
2528
  _FIELDS_STATIC = utils.FieldSet(*[
2529
    "pinst_cnt", "sinst_cnt",
2530
    "pinst_list", "sinst_list",
2531
    "pip", "sip", "tags",
2532
    "master",
2533
    "role"] + _SIMPLE_FIELDS
2534
    )
2535

    
2536
  def ExpandNames(self):
2537
    _CheckOutputFields(static=self._FIELDS_STATIC,
2538
                       dynamic=self._FIELDS_DYNAMIC,
2539
                       selected=self.op.output_fields)
2540

    
2541
    self.needed_locks = {}
2542
    self.share_locks[locking.LEVEL_NODE] = 1
2543

    
2544
    if self.op.names:
2545
      self.wanted = _GetWantedNodes(self, self.op.names)
2546
    else:
2547
      self.wanted = locking.ALL_SET
2548

    
2549
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2550
    self.do_locking = self.do_node_query and self.op.use_locking
2551
    if self.do_locking:
2552
      # if we don't request only static fields, we need to lock the nodes
2553
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2554

    
2555
  def CheckPrereq(self):
2556
    """Check prerequisites.
2557

2558
    """
2559
    # The validation of the node list is done in the _GetWantedNodes,
2560
    # if non empty, and if empty, there's no validation to do
2561
    pass
2562

    
2563
  def Exec(self, feedback_fn):
2564
    """Computes the list of nodes and their attributes.
2565

2566
    """
2567
    all_info = self.cfg.GetAllNodesInfo()
2568
    if self.do_locking:
2569
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
2570
    elif self.wanted != locking.ALL_SET:
2571
      nodenames = self.wanted
2572
      missing = set(nodenames).difference(all_info.keys())
2573
      if missing:
2574
        raise errors.OpExecError(
2575
          "Some nodes were removed before retrieving their data: %s" % missing)
2576
    else:
2577
      nodenames = all_info.keys()
2578

    
2579
    nodenames = utils.NiceSort(nodenames)
2580
    nodelist = [all_info[name] for name in nodenames]
2581

    
2582
    # begin data gathering
2583

    
2584
    if self.do_node_query:
2585
      live_data = {}
2586
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2587
                                          self.cfg.GetHypervisorType())
2588
      for name in nodenames:
2589
        nodeinfo = node_data[name]
2590
        if not nodeinfo.fail_msg and nodeinfo.payload:
2591
          nodeinfo = nodeinfo.payload
2592
          fn = utils.TryConvert
2593
          live_data[name] = {
2594
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2595
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2596
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2597
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2598
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2599
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2600
            "bootid": nodeinfo.get('bootid', None),
2601
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2602
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2603
            }
2604
        else:
2605
          live_data[name] = {}
2606
    else:
2607
      live_data = dict.fromkeys(nodenames, {})
2608

    
2609
    node_to_primary = dict([(name, set()) for name in nodenames])
2610
    node_to_secondary = dict([(name, set()) for name in nodenames])
2611

    
2612
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2613
                             "sinst_cnt", "sinst_list"))
2614
    if inst_fields & frozenset(self.op.output_fields):
2615
      inst_data = self.cfg.GetAllInstancesInfo()
2616

    
2617
      for inst in inst_data.values():
2618
        if inst.primary_node in node_to_primary:
2619
          node_to_primary[inst.primary_node].add(inst.name)
2620
        for secnode in inst.secondary_nodes:
2621
          if secnode in node_to_secondary:
2622
            node_to_secondary[secnode].add(inst.name)
2623

    
2624
    master_node = self.cfg.GetMasterNode()
2625

    
2626
    # end data gathering
2627

    
2628
    output = []
2629
    for node in nodelist:
2630
      node_output = []
2631
      for field in self.op.output_fields:
2632
        if field in self._SIMPLE_FIELDS:
2633
          val = getattr(node, field)
2634
        elif field == "pinst_list":
2635
          val = list(node_to_primary[node.name])
2636
        elif field == "sinst_list":
2637
          val = list(node_to_secondary[node.name])
2638
        elif field == "pinst_cnt":
2639
          val = len(node_to_primary[node.name])
2640
        elif field == "sinst_cnt":
2641
          val = len(node_to_secondary[node.name])
2642
        elif field == "pip":
2643
          val = node.primary_ip
2644
        elif field == "sip":
2645
          val = node.secondary_ip
2646
        elif field == "tags":
2647
          val = list(node.GetTags())
2648
        elif field == "master":
2649
          val = node.name == master_node
2650
        elif self._FIELDS_DYNAMIC.Matches(field):
2651
          val = live_data[node.name].get(field, None)
2652
        elif field == "role":
2653
          if node.name == master_node:
2654
            val = "M"
2655
          elif node.master_candidate:
2656
            val = "C"
2657
          elif node.drained:
2658
            val = "D"
2659
          elif node.offline:
2660
            val = "O"
2661
          else:
2662
            val = "R"
2663
        else:
2664
          raise errors.ParameterError(field)
2665
        node_output.append(val)
2666
      output.append(node_output)
2667

    
2668
    return output
2669

    
2670

    
2671
class LUQueryNodeVolumes(NoHooksLU):
2672
  """Logical unit for getting volumes on node(s).
2673

2674
  """
2675
  _OP_REQP = ["nodes", "output_fields"]
2676
  REQ_BGL = False
2677
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2678
  _FIELDS_STATIC = utils.FieldSet("node")
2679

    
2680
  def ExpandNames(self):
2681
    _CheckOutputFields(static=self._FIELDS_STATIC,
2682
                       dynamic=self._FIELDS_DYNAMIC,
2683
                       selected=self.op.output_fields)
2684

    
2685
    self.needed_locks = {}
2686
    self.share_locks[locking.LEVEL_NODE] = 1
2687
    if not self.op.nodes:
2688
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2689
    else:
2690
      self.needed_locks[locking.LEVEL_NODE] = \
2691
        _GetWantedNodes(self, self.op.nodes)
2692

    
2693
  def CheckPrereq(self):
2694
    """Check prerequisites.
2695

2696
    This checks that the fields required are valid output fields.
2697

2698
    """
2699
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2700

    
2701
  def Exec(self, feedback_fn):
2702
    """Computes the list of nodes and their attributes.
2703

2704
    """
2705
    nodenames = self.nodes
2706
    volumes = self.rpc.call_node_volumes(nodenames)
2707

    
2708
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2709
             in self.cfg.GetInstanceList()]
2710

    
2711
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2712

    
2713
    output = []
2714
    for node in nodenames:
2715
      nresult = volumes[node]
2716
      if nresult.offline:
2717
        continue
2718
      msg = nresult.fail_msg
2719
      if msg:
2720
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2721
        continue
2722

    
2723
      node_vols = nresult.payload[:]
2724
      node_vols.sort(key=lambda vol: vol['dev'])
2725

    
2726
      for vol in node_vols:
2727
        node_output = []
2728
        for field in self.op.output_fields:
2729
          if field == "node":
2730
            val = node
2731
          elif field == "phys":
2732
            val = vol['dev']
2733
          elif field == "vg":
2734
            val = vol['vg']
2735
          elif field == "name":
2736
            val = vol['name']
2737
          elif field == "size":
2738
            val = int(float(vol['size']))
2739
          elif field == "instance":
2740
            for inst in ilist:
2741
              if node not in lv_by_node[inst]:
2742
                continue
2743
              if vol['name'] in lv_by_node[inst][node]:
2744
                val = inst.name
2745
                break
2746
            else:
2747
              val = '-'
2748
          else:
2749
            raise errors.ParameterError(field)
2750
          node_output.append(str(val))
2751

    
2752
        output.append(node_output)
2753

    
2754
    return output
2755

    
2756

    
2757
class LUQueryNodeStorage(NoHooksLU):
2758
  """Logical unit for getting information on storage units on node(s).
2759

2760
  """
2761
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
2762
  REQ_BGL = False
2763
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
2764

    
2765
  def ExpandNames(self):
2766
    storage_type = self.op.storage_type
2767

    
2768
    if storage_type not in constants.VALID_STORAGE_TYPES:
2769
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
2770
                                 errors.ECODE_INVAL)
2771

    
2772
    _CheckOutputFields(static=self._FIELDS_STATIC,
2773
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
2774
                       selected=self.op.output_fields)
2775

    
2776
    self.needed_locks = {}
2777
    self.share_locks[locking.LEVEL_NODE] = 1
2778

    
2779
    if self.op.nodes:
2780
      self.needed_locks[locking.LEVEL_NODE] = \
2781
        _GetWantedNodes(self, self.op.nodes)
2782
    else:
2783
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2784

    
2785
  def CheckPrereq(self):
2786
    """Check prerequisites.
2787

2788
    This checks that the fields required are valid output fields.
2789

2790
    """
2791
    self.op.name = getattr(self.op, "name", None)
2792

    
2793
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2794

    
2795
  def Exec(self, feedback_fn):
2796
    """Computes the list of nodes and their attributes.
2797

2798
    """
2799
    # Always get name to sort by
2800
    if constants.SF_NAME in self.op.output_fields:
2801
      fields = self.op.output_fields[:]
2802
    else:
2803
      fields = [constants.SF_NAME] + self.op.output_fields
2804

    
2805
    # Never ask for node or type as it's only known to the LU
2806
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
2807
      while extra in fields:
2808
        fields.remove(extra)
2809

    
2810
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2811
    name_idx = field_idx[constants.SF_NAME]
2812

    
2813
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2814
    data = self.rpc.call_storage_list(self.nodes,
2815
                                      self.op.storage_type, st_args,
2816
                                      self.op.name, fields)
2817

    
2818
    result = []
2819

    
2820
    for node in utils.NiceSort(self.nodes):
2821
      nresult = data[node]
2822
      if nresult.offline:
2823
        continue
2824

    
2825
      msg = nresult.fail_msg
2826
      if msg:
2827
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2828
        continue
2829

    
2830
      rows = dict([(row[name_idx], row) for row in nresult.payload])
2831

    
2832
      for name in utils.NiceSort(rows.keys()):
2833
        row = rows[name]
2834

    
2835
        out = []
2836

    
2837
        for field in self.op.output_fields:
2838
          if field == constants.SF_NODE:
2839
            val = node
2840
          elif field == constants.SF_TYPE:
2841
            val = self.op.storage_type
2842
          elif field in field_idx:
2843
            val = row[field_idx[field]]
2844
          else:
2845
            raise errors.ParameterError(field)
2846

    
2847
          out.append(val)
2848

    
2849
        result.append(out)
2850

    
2851
    return result
2852

    
2853

    
2854
class LUModifyNodeStorage(NoHooksLU):
2855
  """Logical unit for modifying a storage volume on a node.
2856

2857
  """
2858
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2859
  REQ_BGL = False
2860

    
2861
  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type
    if storage_type not in constants.VALID_STORAGE_TYPES:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                                 errors.ECODE_INVAL)
2868

    
2869
  def ExpandNames(self):
2870
    self.needed_locks = {
2871
      locking.LEVEL_NODE: self.op.node_name,
2872
      }
2873

    
2874
  def CheckPrereq(self):
2875
    """Check prerequisites.
2876

2877
    """
2878
    storage_type = self.op.storage_type
2879

    
2880
    try:
2881
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2882
    except KeyError:
2883
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
2884
                                 " modified" % storage_type,
2885
                                 errors.ECODE_INVAL)
2886

    
2887
    diff = set(self.op.changes.keys()) - modifiable
2888
    if diff:
2889
      raise errors.OpPrereqError("The following fields can not be modified for"
2890
                                 " storage units of type '%s': %r" %
2891
                                 (storage_type, list(diff)),
2892
                                 errors.ECODE_INVAL)
2893

    
2894
  def Exec(self, feedback_fn):
2895
    """Computes the list of nodes and their attributes.
2896

2897
    """
2898
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2899
    result = self.rpc.call_storage_modify(self.op.node_name,
2900
                                          self.op.storage_type, st_args,
2901
                                          self.op.name, self.op.changes)
2902
    result.Raise("Failed to modify storage unit '%s' on %s" %
2903
                 (self.op.name, self.op.node_name))
2904

    
2905

    
2906
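# Illustrative sketch (not used by any LU): the "which fields may be
# modified" rule from LUModifyNodeStorage.CheckPrereq as a standalone
# helper; it relies only on constants.MODIFIABLE_STORAGE_FIELDS.
def _ExampleInvalidStorageChanges(storage_type, changes):
  """Example only: return the keys of 'changes' that cannot be modified."""
  modifiable = constants.MODIFIABLE_STORAGE_FIELDS.get(storage_type,
                                                       frozenset())
  return set(changes.keys()) - modifiable

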
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))

    # setup ssh on node
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True, ecode=errors.ECODE_ENVIRON)
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())


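# Illustrative sketch (not used by any LU): the single-homed/dual-homed
# compatibility rule enforced in LUAddNode.CheckPrereq, written as a
# standalone predicate; "master_node" is an objects.Node instance.
def _ExampleHomednessMatches(master_node, primary_ip, secondary_ip):
  """Example only: True if a new node's homedness matches the master's."""
  master_singlehomed = master_node.secondary_ip == master_node.primary_ip
  newbie_singlehomed = secondary_ip == primary_ip
  return master_singlehomed == newbie_singlehomed

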
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the requested flags against the node's current state.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)

    # Boolean value that tells us whether we're offlining or draining the node
    offline_or_drain = self.op.offline == True or self.op.drained == True
    deoffline_or_drain = self.op.offline == False or self.op.drained == False

    if (node.master_candidate and
        (self.op.master_candidate == False or offline_or_drain)):
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
      if mc_now <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, mc_now-1))
        # Only allow forcing the operation if it's an offline/drain operation,
        # and we could not possibly promote more nodes.
        # FIXME: this can still lead to issues if in any way another node which
        # could be promoted appears in the meantime.
        if self.op.force and offline_or_drain and mc_should == mc_max:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (deoffline_or_drain and not offline_or_drain and not
        self.op.master_candidate == True and not node.master_candidate):
      self.op.master_candidate = _DecideSelfPromotion(self)
      if self.op.master_candidate:
        self.LogInfo("Autopromoting node to master candidate")

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result


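# Illustrative sketch (not used by any LU): the flag validation done in
# LUSetNodeParams.CheckArguments as a pure function; the three arguments
# mirror the opcode's offline/master_candidate/drained fields, where None
# means "leave unchanged".
def _ExampleValidateNodeFlags(offline, master_candidate, drained):
  """Example only: raise OpPrereqError on an invalid flag combination."""
  all_mods = [offline, master_candidate, drained]
  if all_mods.count(None) == 3:
    raise errors.OpPrereqError("Please pass at least one modification",
                               errors.ECODE_INVAL)
  if all_mods.count(True) > 1:
    raise errors.OpPrereqError("Can't set the node into more than one"
                               " state at the same time",
                               errors.ECODE_INVAL)

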
class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """
    pass

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Collect and return the requested configuration values.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


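# Illustrative sketch (not used by any LU): the static-field lookup in
# LUQueryConfigValues.Exec written table-driven instead of as an if/elif
# chain; every callable referenced here is also used in the class above.
def _ExampleConfigValueGetters(cfg):
  """Example only: map static field names to zero-argument getters."""
  return {
    "cluster_name": cfg.GetClusterName,
    "master_node": cfg.GetMasterNode,
    "drain_flag": lambda: os.path.exists(constants.JOB_QUEUE_DRAIN_FILE),
    "watcher_pause": lambda: utils.ReadWatcherPauseFile(
      constants.WATCHER_PAUSEFILE),
    }

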
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


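# Illustrative sketch (not used by any LU): how a caller consumes the
# (disks_ok, device_info) pair returned by _AssembleInstanceDisks; compare
# LUActivateInstanceDisks.Exec above.  "lu" and "instance" follow the same
# conventions as the rest of this module.
def _ExampleAssembleAndListDevices(lu, instance):
  """Example only: assemble the disks and return the primary device paths."""
  disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    raise errors.OpExecError("Cannot activate block devices")
  # each entry is (node, instance-visible name, node device path)
  return [dev_path for (_, _, dev_path) in device_info]

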
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise a failure there makes the function return False. Failures
  on other nodes always make it return False.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


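# Illustrative sketch (not used by any LU): the ignore_primary rule from
# _ShutdownInstanceDisks above, isolated as a predicate.
def _ExampleShutdownErrorCounts(node, primary_node, ignore_primary):
  """Example only: True if a shutdown error on 'node' should fail the call."""
  return not ignore_primary or node != primary_node

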
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem),
                               errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)


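# Illustrative sketch (not used by any LU): a typical call site for
# _CheckNodeFreeMemory, mirroring LUStartupInstance.CheckPrereq below;
# the 1024 MiB figure is an arbitrary example value.
def _ExampleRequireMemoryForStart(lu, instance):
  """Example only: require 1024 MiB free on the instance's primary node."""
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       1024, instance.hypervisor)

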
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ),
                                   errors.ECODE_INVAL)
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ),
                                   errors.ECODE_INVAL)

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)


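# Illustrative sketch (not used by any LU): the hypervisor parameter
# layering performed in LUStartupInstance.CheckPrereq - cluster defaults,
# then per-instance values, then the one-off overrides passed with the
# start request.
def _ExampleEffectiveHvParams(cluster, instance, override_hvp):
  """Example only: compute the hvparams a start request would validate."""
  filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                instance.hvparams)
  filled_hvp.update(override_hvp)
  return filled_hvp

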
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type,
                                             self.shutdown_timeout)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


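# Illustrative sketch (not used by any LU): which path LURebootInstance.Exec
# takes for a given reboot type; soft and hard reboots are delegated to the
# node, while a full reboot is a shutdown plus start driven from the master.
def _ExampleRebootUsesFullCycle(reboot_type):
  """Example only: True if the reboot type is handled as stop + start."""
  return reboot_type not in (constants.INSTANCE_REBOOT_SOFT,
                             constants.INSTANCE_REBOOT_HARD)

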
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.timeout = getattr(self.op, "timeout",
                           constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["TIMEOUT"] = self.timeout
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    timeout = self.timeout
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name,
                                 errors.ECODE_INVAL)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name,
                                 errors.ECODE_STATE)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node),
                                 errors.ECODE_STATE)

    self.op.os_type = getattr(self.op, "os_type", None)
    self.op.force_variant = getattr(self.op, "force_variant", False)
    if self.op.os_type is not None:
      # OS verification
      pnode = _ExpandNodeName(self.cfg, instance.primary_node)
      result = self.rpc.call_os_get(pnode, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode),
                   prereq=True, ecode=errors.ECODE_INVAL)
      if not self.op.force_variant:
        _CheckOSVariant(result.payload, self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst, feedback_fn)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      # FIXME: pass debug option from opcode to backend
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
                                             self.op.debug_level)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURecreateInstanceDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    if not isinstance(self.op.disks, list):
      raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
    for item in self.op.disks:
      if (not isinstance(item, int) or
          item < 0):
        raise errors.OpPrereqError("Invalid disk specification '%s'" %
                                   str(item), errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.