root / lib / cmdlib.py @ 4b7735f9

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_BGL = True
67

    
68
  def __init__(self, processor, op, context, rpc):
69
    """Constructor for LogicalUnit.
70

71
    This needs to be overridden in derived classes in order to check op
72
    validity.
73

74
    """
75
    self.proc = processor
76
    self.op = op
77
    self.cfg = context.cfg
78
    self.context = context
79
    self.rpc = rpc
80
    # Dicts used to declare locking needs to mcpu
81
    self.needed_locks = None
82
    self.acquired_locks = {}
83
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84
    self.add_locks = {}
85
    self.remove_locks = {}
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89
    # logging
90
    self.LogWarning = processor.LogWarning
91
    self.LogInfo = processor.LogInfo
92

    
93
    for attr_name in self._OP_REQP:
94
      attr_val = getattr(op, attr_name, None)
95
      if attr_val is None:
96
        raise errors.OpPrereqError("Required parameter '%s' missing" %
97
                                   attr_name)
98
    self.CheckArguments()
99

    
100
  def __GetSSH(self):
101
    """Returns the SshRunner object
102

103
    """
104
    if not self.__ssh:
105
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106
    return self.__ssh
107

    
108
  ssh = property(fget=__GetSSH)
109

    
110
  def CheckArguments(self):
111
    """Check syntactic validity for the opcode arguments.
112

113
    This method is for doing a simple syntactic check and ensuring the
114
    validity of opcode parameters, without any cluster-related
115
    checks. While the same can be accomplished in ExpandNames and/or
116
    CheckPrereq, doing these separately is better because:
117

118
      - ExpandNames is left as purely a lock-related function
119
      - CheckPrereq is run after we have acquired locks (and possibly
120
        waited for them)
121

122
    The function is allowed to change the self.op attribute so that
123
    later methods can no longer worry about missing parameters.
124

125
    """
126
    pass
127

    
128
  def ExpandNames(self):
129
    """Expand names for this LU.
130

131
    This method is called before starting to execute the opcode, and it should
132
    update all the parameters of the opcode to their canonical form (e.g. a
133
    short node name must be fully expanded after this method has successfully
134
    completed). This way locking, hooks, logging, etc. can work correctly.
135

136
    LUs which implement this method must also populate the self.needed_locks
137
    member, as a dict with lock levels as keys, and a list of needed lock names
138
    as values. Rules:
139

140
      - use an empty dict if you don't need any lock
141
      - if you don't need any lock at a particular level omit that level
142
      - don't put anything for the BGL level
143
      - if you want all locks at a level use locking.ALL_SET as a value
144

145
    If you need to share locks (rather than acquire them exclusively) at one
146
    level you can modify self.share_locks, setting a true value (usually 1) for
147
    that level. By default locks are not shared.
148

149
    Examples::
150

151
      # Acquire all nodes and one instance
152
      self.needed_locks = {
153
        locking.LEVEL_NODE: locking.ALL_SET,
154
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155
      }
156
      # Acquire just two nodes
157
      self.needed_locks = {
158
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159
      }
160
      # Acquire no locks
161
      self.needed_locks = {} # No, you can't leave it to the default value None
162

163
    """
164
    # The implementation of this method is mandatory only if the new LU is
165
    # concurrent, so that old LUs don't need to be changed all at the same
166
    # time.
167
    if self.REQ_BGL:
168
      self.needed_locks = {} # Exclusive LUs don't need locks.
169
    else:
170
      raise NotImplementedError
171

    
172
  def DeclareLocks(self, level):
173
    """Declare LU locking needs for a level
174

175
    While most LUs can just declare their locking needs at ExpandNames time,
176
    sometimes there's the need to calculate some locks after having acquired
177
    the ones before. This function is called just before acquiring locks at a
178
    particular level, but after acquiring the ones at lower levels, and permits
179
    such calculations. It can be used to modify self.needed_locks, and by
180
    default it does nothing.
181

182
    This function is only called if you have something already set in
183
    self.needed_locks for the level.
184

185
    @param level: Locking level which is going to be locked
186
    @type level: member of ganeti.locking.LEVELS
187

188
    """
189

    
190
  def CheckPrereq(self):
191
    """Check prerequisites for this LU.
192

193
    This method should check that the prerequisites for the execution
194
    of this LU are fulfilled. It can do internode communication, but
195
    it should be idempotent - no cluster or system changes are
196
    allowed.
197

198
    The method should raise errors.OpPrereqError in case something is
199
    not fulfilled. Its return value is ignored.
200

201
    This method should also update all the parameters of the opcode to
202
    their canonical form if it hasn't been done by ExpandNames before.
203

204
    """
205
    raise NotImplementedError
206

    
207
  def Exec(self, feedback_fn):
208
    """Execute the LU.
209

210
    This method should implement the actual work. It should raise
211
    errors.OpExecError for failures that are somewhat dealt with in
212
    code, or expected.
213

214
    """
215
    raise NotImplementedError
216

    
217
  def BuildHooksEnv(self):
218
    """Build hooks environment for this LU.
219

220
    This method should return a three-element tuple consisting of: a dict
221
    containing the environment that will be used for running the
222
    specific hook for this LU, a list of node names on which the hook
223
    should run before the execution, and a list of node names on which
224
    the hook should run after the execution.
225

226
    The keys of the dict must not be prefixed with 'GANETI_' as this will
227
    be handled in the hooks runner. Also note additional keys will be
228
    added by the hooks runner. If the LU doesn't define any
229
    environment, an empty dict (and not None) should be returned.
230

231
    If there are no nodes for a phase, return an empty list (and not None).
232

233
    Note that if the HPATH for a LU class is None, this function will
234
    not be called.
235

236
    """
237
    raise NotImplementedError
238

    
239
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240
    """Notify the LU about the results of its hooks.
241

242
    This method is called every time a hooks phase is executed, and notifies
243
    the Logical Unit about the hooks' result. The LU can then use it to alter
244
    its result based on the hooks.  By default the method does nothing and the
245
    previous result is passed back unchanged but any LU can define it if it
246
    wants to use the local cluster hook-scripts somehow.
247

248
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
249
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250
    @param hook_results: the results of the multi-node hooks rpc call
251
    @param feedback_fn: function used to send feedback back to the caller
252
    @param lu_result: the previous Exec result this LU had, or None
253
        in the PRE phase
254
    @return: the new Exec result, based on the previous result
255
        and hook results
256

257
    """
258
    return lu_result
259

    
260
  def _ExpandAndLockInstance(self):
261
    """Helper function to expand and lock an instance.
262

263
    Many LUs that work on an instance take its name in self.op.instance_name
264
    and need to expand it and then declare the expanded name for locking. This
265
    function does it, and then updates self.op.instance_name to the expanded
266
    name. It also initializes needed_locks as a dict, if this hasn't been done
267
    before.
268

269
    """
270
    if self.needed_locks is None:
271
      self.needed_locks = {}
272
    else:
273
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274
        "_ExpandAndLockInstance called with instance-level locks set"
275
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276
    if expanded_name is None:
277
      raise errors.OpPrereqError("Instance '%s' not known" %
278
                                  self.op.instance_name)
279
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280
    self.op.instance_name = expanded_name
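  # Illustrative usage sketch (an assumption, not part of the original file):
  # an instance-level LU would typically call this helper from its
  # ExpandNames; afterwards self.op.instance_name holds the fully-expanded
  # name and an exclusive instance-level lock has been declared:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()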
281

    
282
  def _LockInstancesNodes(self, primary_only=False):
283
    """Helper function to declare instances' nodes for locking.
284

285
    This function should be called after locking one or more instances to lock
286
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287
    with all primary or secondary nodes for instances already locked and
288
    present in self.needed_locks[locking.LEVEL_INSTANCE].
289

290
    It should be called from DeclareLocks, and for safety only works if
291
    self.recalculate_locks[locking.LEVEL_NODE] is set.
292

293
    In the future it may grow parameters to just lock some instance's nodes, or
294
    to just lock primaries or secondary nodes, if needed.
295

296
    It should be called in DeclareLocks in a way similar to::
297

298
      if level == locking.LEVEL_NODE:
299
        self._LockInstancesNodes()
300

301
    @type primary_only: boolean
302
    @param primary_only: only lock primary nodes of locked instances
303

304
    """
305
    assert locking.LEVEL_NODE in self.recalculate_locks, \
306
      "_LockInstancesNodes helper function called with no nodes to recalculate"
307

    
308
    # TODO: check if we've really been called with the instance locks held
309

    
310
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311
    # future we might want to have different behaviors depending on the value
312
    # of self.recalculate_locks[locking.LEVEL_NODE]
313
    wanted_nodes = []
314
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315
      instance = self.context.cfg.GetInstanceInfo(instance_name)
316
      wanted_nodes.append(instance.primary_node)
317
      if not primary_only:
318
        wanted_nodes.extend(instance.secondary_nodes)
319

    
320
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324

    
325
    del self.recalculate_locks[locking.LEVEL_NODE]
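  # Illustrative usage sketch (an assumption, not part of the original file):
  # the usual pairing is to request a node-level recalculation in ExpandNames
  # and then resolve it in DeclareLocks via the helper above:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()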
326

    
327

    
328
class NoHooksLU(LogicalUnit):
329
  """Simple LU which runs no hooks.
330

331
  This LU is intended as a parent for other LogicalUnits which will
332
  run no hooks, in order to reduce duplicate code.
333

334
  """
335
  HPATH = None
336
  HTYPE = None
337

    
338

    
339
def _GetWantedNodes(lu, nodes):
340
  """Returns list of checked and expanded node names.
341

342
  @type lu: L{LogicalUnit}
343
  @param lu: the logical unit on whose behalf we execute
344
  @type nodes: list
345
  @param nodes: list of node names or None for all nodes
346
  @rtype: list
347
  @return: the list of nodes, sorted
348
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
349

350
  """
351
  if not isinstance(nodes, list):
352
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
353

    
354
  if not nodes:
355
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356
      " non-empty list of nodes whose name is to be expanded.")
357

    
358
  wanted = []
359
  for name in nodes:
360
    node = lu.cfg.ExpandNodeName(name)
361
    if node is None:
362
      raise errors.OpPrereqError("No such node name '%s'" % name)
363
    wanted.append(node)
364

    
365
  return utils.NiceSort(wanted)
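# Illustrative usage sketch (an assumption, not part of the original file):
# given short names known to the configuration, a call such as
#
#   _GetWantedNodes(self, ["node1", "node2"])
#
# returns the expanded names sorted with utils.NiceSort, e.g.
# ["node1.example.tld", "node2.example.tld"]; an unknown name raises
# errors.OpPrereqError.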
366

    
367

    
368
def _GetWantedInstances(lu, instances):
369
  """Returns list of checked and expanded instance names.
370

371
  @type lu: L{LogicalUnit}
372
  @param lu: the logical unit on whose behalf we execute
373
  @type instances: list
374
  @param instances: list of instance names or None for all instances
375
  @rtype: list
376
  @return: the list of instances, sorted
377
  @raise errors.OpPrereqError: if the instances parameter is wrong type
378
  @raise errors.OpPrereqError: if any of the passed instances is not found
379

380
  """
381
  if not isinstance(instances, list):
382
    raise errors.OpPrereqError("Invalid argument type 'instances'")
383

    
384
  if instances:
385
    wanted = []
386

    
387
    for name in instances:
388
      instance = lu.cfg.ExpandInstanceName(name)
389
      if instance is None:
390
        raise errors.OpPrereqError("No such instance name '%s'" % name)
391
      wanted.append(instance)
392

    
393
  else:
394
    wanted = lu.cfg.GetInstanceList()
395
  return utils.NiceSort(wanted)
396

    
397

    
398
def _CheckOutputFields(static, dynamic, selected):
399
  """Checks whether all selected fields are valid.
400

401
  @type static: L{utils.FieldSet}
402
  @param static: static fields set
403
  @type dynamic: L{utils.FieldSet}
404
  @param dynamic: dynamic fields set
405

406
  """
407
  f = utils.FieldSet()
408
  f.Extend(static)
409
  f.Extend(dynamic)
410

    
411
  delta = f.NonMatching(selected)
412
  if delta:
413
    raise errors.OpPrereqError("Unknown output fields selected: %s"
414
                               % ",".join(delta))
415
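# Illustrative usage sketch (an assumption, not part of the original file):
# query-style LUs call this from ExpandNames to validate the requested output
# fields, for example:
#
#   _CheckOutputFields(static=utils.FieldSet("name"),
#                      dynamic=utils.FieldSet("node_status"),
#                      selected=self.op.output_fields)
#
# Any selected field present in neither set raises errors.OpPrereqError.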

    
416

    
417
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
418
                          memory, vcpus, nics):
419
  """Builds instance related env variables for hooks
420

421
  This builds the hook environment from individual variables.
422

423
  @type name: string
424
  @param name: the name of the instance
425
  @type primary_node: string
426
  @param primary_node: the name of the instance's primary node
427
  @type secondary_nodes: list
428
  @param secondary_nodes: list of secondary nodes as strings
429
  @type os_type: string
430
  @param os_type: the name of the instance's OS
431
  @type status: string
432
  @param status: the desired status of the instance
433
  @type memory: string
434
  @param memory: the memory size of the instance
435
  @type vcpus: string
436
  @param vcpus: the count of VCPUs the instance has
437
  @type nics: list
438
  @param nics: list of tuples (ip, bridge, mac) representing
439
      the NICs the instance has
440
  @rtype: dict
441
  @return: the hook environment for this instance
442

443
  """
444
  env = {
445
    "OP_TARGET": name,
446
    "INSTANCE_NAME": name,
447
    "INSTANCE_PRIMARY": primary_node,
448
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
449
    "INSTANCE_OS_TYPE": os_type,
450
    "INSTANCE_STATUS": status,
451
    "INSTANCE_MEMORY": memory,
452
    "INSTANCE_VCPUS": vcpus,
453
  }
454

    
455
  if nics:
456
    nic_count = len(nics)
457
    for idx, (ip, bridge, mac) in enumerate(nics):
458
      if ip is None:
459
        ip = ""
460
      env["INSTANCE_NIC%d_IP" % idx] = ip
461
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
462
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
463
  else:
464
    nic_count = 0
465

    
466
  env["INSTANCE_NIC_COUNT"] = nic_count
467

    
468
  return env
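# Illustrative sketch (values are assumptions, for illustration only): for a
# single-NIC instance the helper above returns an environment such as:
#
#   {
#     "OP_TARGET": "instance1.example.tld",
#     "INSTANCE_NAME": "instance1.example.tld",
#     "INSTANCE_PRIMARY": "node1.example.tld",
#     "INSTANCE_SECONDARIES": "node2.example.tld",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#   }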
469

    
470

    
471
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
472
  """Builds instance related env variables for hooks from an object.
473

474
  @type lu: L{LogicalUnit}
475
  @param lu: the logical unit on whose behalf we execute
476
  @type instance: L{objects.Instance}
477
  @param instance: the instance for which we should build the
478
      environment
479
  @type override: dict
480
  @param override: dictionary with key/values that will override
481
      our values
482
  @rtype: dict
483
  @return: the hook environment dictionary
484

485
  """
486
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
487
  args = {
488
    'name': instance.name,
489
    'primary_node': instance.primary_node,
490
    'secondary_nodes': instance.secondary_nodes,
491
    'os_type': instance.os,
492
    'status': instance.status,
493
    'memory': bep[constants.BE_MEMORY],
494
    'vcpus': bep[constants.BE_VCPUS],
495
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
496
  }
497
  if override:
498
    args.update(override)
499
  return _BuildInstanceHookEnv(**args)
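# Illustrative usage sketch (an assumption, not part of the original file):
# callers usually pass the instance object straight through, optionally
# overriding individual argument values (keys of the args dict above, not the
# final environment variable names):
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"status": "down"})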
500

    
501

    
502
def _CheckInstanceBridgesExist(lu, instance):
503
  """Check that the bridges needed by an instance exist.
504

505
  """
506
  # check bridges existence
507
  brlist = [nic.bridge for nic in instance.nics]
508
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
509
    raise errors.OpPrereqError("one or more target bridges %s do not"
510
                               " exist on destination node '%s'" %
511
                               (brlist, instance.primary_node))
512

    
513

    
514
class LUDestroyCluster(NoHooksLU):
515
  """Logical unit for destroying the cluster.
516

517
  """
518
  _OP_REQP = []
519

    
520
  def CheckPrereq(self):
521
    """Check prerequisites.
522

523
    This checks whether the cluster is empty.
524

525
    Any errors are signalled by raising errors.OpPrereqError.
526

527
    """
528
    master = self.cfg.GetMasterNode()
529

    
530
    nodelist = self.cfg.GetNodeList()
531
    if len(nodelist) != 1 or nodelist[0] != master:
532
      raise errors.OpPrereqError("There are still %d node(s) in"
533
                                 " this cluster." % (len(nodelist) - 1))
534
    instancelist = self.cfg.GetInstanceList()
535
    if instancelist:
536
      raise errors.OpPrereqError("There are still %d instance(s) in"
537
                                 " this cluster." % len(instancelist))
538

    
539
  def Exec(self, feedback_fn):
540
    """Destroys the cluster.
541

542
    """
543
    master = self.cfg.GetMasterNode()
544
    if not self.rpc.call_node_stop_master(master, False):
545
      raise errors.OpExecError("Could not disable the master role")
546
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
547
    utils.CreateBackup(priv_key)
548
    utils.CreateBackup(pub_key)
549
    return master
550

    
551

    
552
class LUVerifyCluster(LogicalUnit):
553
  """Verifies the cluster status.
554

555
  """
556
  HPATH = "cluster-verify"
557
  HTYPE = constants.HTYPE_CLUSTER
558
  _OP_REQP = ["skip_checks"]
559
  REQ_BGL = False
560

    
561
  def ExpandNames(self):
562
    self.needed_locks = {
563
      locking.LEVEL_NODE: locking.ALL_SET,
564
      locking.LEVEL_INSTANCE: locking.ALL_SET,
565
    }
566
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
567

    
568
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
569
                  remote_version, feedback_fn):
570
    """Run multiple tests against a node.
571

572
    Test list::
573

574
      - compares ganeti version
575
      - checks vg existence and size > 20G
576
      - checks config file checksum
577
      - checks ssh to other nodes
578

579
    @type node: string
580
    @param node: the name of the node to check
581
    @param file_list: required list of files
582
    @param local_cksum: dictionary of local files and their checksums
583
    @type vglist: dict
584
    @param vglist: dictionary of volume group names and their size
585
    @param node_result: the results from the node
586
    @param remote_version: the RPC version from the remote node
587
    @param feedback_fn: function used to accumulate results
588

589
    """
590
    # compares ganeti version
591
    local_version = constants.PROTOCOL_VERSION
592
    if not remote_version:
593
      feedback_fn("  - ERROR: connection to %s failed" % (node))
594
      return True
595

    
596
    if local_version != remote_version:
597
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
598
                      (local_version, node, remote_version))
599
      return True
600

    
601
    # checks vg existence and size > 20G
602

    
603
    bad = False
604
    if not vglist:
605
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
606
                      (node,))
607
      bad = True
608
    else:
609
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
610
                                            constants.MIN_VG_SIZE)
611
      if vgstatus:
612
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
613
        bad = True
614

    
615
    if not node_result:
616
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
617
      return True
618

    
619
    # checks config file checksum
620
    # checks ssh to any
621

    
622
    if 'filelist' not in node_result:
623
      bad = True
624
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
625
    else:
626
      remote_cksum = node_result['filelist']
627
      for file_name in file_list:
628
        if file_name not in remote_cksum:
629
          bad = True
630
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
631
        elif remote_cksum[file_name] != local_cksum[file_name]:
632
          bad = True
633
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
634

    
635
    if 'nodelist' not in node_result:
636
      bad = True
637
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
638
    else:
639
      if node_result['nodelist']:
640
        bad = True
641
        for node in node_result['nodelist']:
642
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
643
                          (node, node_result['nodelist'][node]))
644
    if 'node-net-test' not in node_result:
645
      bad = True
646
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
647
    else:
648
      if node_result['node-net-test']:
649
        bad = True
650
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
651
        for node in nlist:
652
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
653
                          (node, node_result['node-net-test'][node]))
654

    
655
    hyp_result = node_result.get('hypervisor', None)
656
    if isinstance(hyp_result, dict):
657
      for hv_name, hv_result in hyp_result.iteritems():
658
        if hv_result is not None:
659
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
660
                      (hv_name, hv_result))
661
    return bad
662

    
663
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
664
                      node_instance, feedback_fn):
665
    """Verify an instance.
666

667
    This function checks to see if the required block devices are
668
    available on the instance's node.
669

670
    """
671
    bad = False
672

    
673
    node_current = instanceconfig.primary_node
674

    
675
    node_vol_should = {}
676
    instanceconfig.MapLVsByNode(node_vol_should)
677

    
678
    for node in node_vol_should:
679
      for volume in node_vol_should[node]:
680
        if node not in node_vol_is or volume not in node_vol_is[node]:
681
          feedback_fn("  - ERROR: volume %s missing on node %s" %
682
                          (volume, node))
683
          bad = True
684

    
685
    if not instanceconfig.status == 'down':
686
      if (node_current not in node_instance or
687
          not instance in node_instance[node_current]):
688
        feedback_fn("  - ERROR: instance %s not running on node %s" %
689
                        (instance, node_current))
690
        bad = True
691

    
692
    for node in node_instance:
693
      if (not node == node_current):
694
        if instance in node_instance[node]:
695
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
696
                          (instance, node))
697
          bad = True
698

    
699
    return bad
700

    
701
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
702
    """Verify if there are any unknown volumes in the cluster.
703

704
    The .os, .swap and backup volumes are ignored. All other volumes are
705
    reported as unknown.
706

707
    """
708
    bad = False
709

    
710
    for node in node_vol_is:
711
      for volume in node_vol_is[node]:
712
        if node not in node_vol_should or volume not in node_vol_should[node]:
713
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
714
                      (volume, node))
715
          bad = True
716
    return bad
717

    
718
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
719
    """Verify the list of running instances.
720

721
    This checks what instances are running but unknown to the cluster.
722

723
    """
724
    bad = False
725
    for node in node_instance:
726
      for runninginstance in node_instance[node]:
727
        if runninginstance not in instancelist:
728
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
729
                          (runninginstance, node))
730
          bad = True
731
    return bad
732

    
733
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
734
    """Verify N+1 Memory Resilience.
735

736
    Check that if one single node dies we can still start all the instances it
737
    was primary for.
738

739
    """
740
    bad = False
741

    
742
    for node, nodeinfo in node_info.iteritems():
743
      # This code checks that every node which is now listed as secondary has
744
      # enough memory to host all instances it is supposed to, should a single
745
      # other node in the cluster fail.
746
      # FIXME: not ready for failover to an arbitrary node
747
      # FIXME: does not support file-backed instances
748
      # WARNING: we currently take into account down instances as well as up
749
      # ones, considering that even if they're down someone might want to start
750
      # them even in the event of a node failure.
751
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
752
        needed_mem = 0
753
        for instance in instances:
754
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
755
          if bep[constants.BE_AUTO_BALANCE]:
756
            needed_mem += bep[constants.BE_MEMORY]
757
        if nodeinfo['mfree'] < needed_mem:
758
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
759
                      " failovers should node %s fail" % (node, prinode))
760
          bad = True
761
    return bad
762

    
763
  def CheckPrereq(self):
764
    """Check prerequisites.
765

766
    Transform the list of checks we're going to skip into a set and check that
767
    all its members are valid.
768

769
    """
770
    self.skip_set = frozenset(self.op.skip_checks)
771
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
772
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
773

    
774
  def BuildHooksEnv(self):
775
    """Build hooks env.
776

777
    Cluster-Verify hooks are run only in the post phase and their failure causes
778
    the output to be logged in the verify output and the verification to fail.
779

780
    """
781
    all_nodes = self.cfg.GetNodeList()
782
    # TODO: populate the environment with useful information for verify hooks
783
    env = {}
784
    return env, [], all_nodes
785

    
786
  def Exec(self, feedback_fn):
787
    """Verify integrity of cluster, performing various test on nodes.
788

789
    """
790
    bad = False
791
    feedback_fn("* Verifying global settings")
792
    for msg in self.cfg.VerifyConfig():
793
      feedback_fn("  - ERROR: %s" % msg)
794

    
795
    vg_name = self.cfg.GetVGName()
796
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
797
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
798
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
799
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
800
    i_non_redundant = [] # Non redundant instances
801
    i_non_a_balanced = [] # Non auto-balanced instances
802
    node_volume = {}
803
    node_instance = {}
804
    node_info = {}
805
    instance_cfg = {}
806

    
807
    # FIXME: verify OS list
808
    # do local checksums
809
    file_names = []
810
    file_names.append(constants.SSL_CERT_FILE)
811
    file_names.append(constants.CLUSTER_CONF_FILE)
812
    local_checksums = utils.FingerprintFiles(file_names)
813

    
814
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
815
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
816
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
817
    all_vglist = self.rpc.call_vg_list(nodelist)
818
    node_verify_param = {
819
      'filelist': file_names,
820
      'nodelist': nodelist,
821
      'hypervisor': hypervisors,
822
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
823
                        for node in nodeinfo]
824
      }
825
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
826
                                           self.cfg.GetClusterName())
827
    all_rversion = self.rpc.call_version(nodelist)
828
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
829
                                        self.cfg.GetHypervisorType())
830

    
831
    cluster = self.cfg.GetClusterInfo()
832
    for node in nodelist:
833
      feedback_fn("* Verifying node %s" % node)
834
      result = self._VerifyNode(node, file_names, local_checksums,
835
                                all_vglist[node], all_nvinfo[node],
836
                                all_rversion[node], feedback_fn)
837
      bad = bad or result
838

    
839
      # node_volume
840
      volumeinfo = all_volumeinfo[node]
841

    
842
      if isinstance(volumeinfo, basestring):
843
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
844
                    (node, volumeinfo[-400:].encode('string_escape')))
845
        bad = True
846
        node_volume[node] = {}
847
      elif not isinstance(volumeinfo, dict):
848
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
849
        bad = True
850
        continue
851
      else:
852
        node_volume[node] = volumeinfo
853

    
854
      # node_instance
855
      nodeinstance = all_instanceinfo[node]
856
      if type(nodeinstance) != list:
857
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
858
        bad = True
859
        continue
860

    
861
      node_instance[node] = nodeinstance
862

    
863
      # node_info
864
      nodeinfo = all_ninfo[node]
865
      if not isinstance(nodeinfo, dict):
866
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
867
        bad = True
868
        continue
869

    
870
      try:
871
        node_info[node] = {
872
          "mfree": int(nodeinfo['memory_free']),
873
          "dfree": int(nodeinfo['vg_free']),
874
          "pinst": [],
875
          "sinst": [],
876
          # dictionary holding all instances this node is secondary for,
877
          # grouped by their primary node. Each key is a cluster node, and each
878
          # value is a list of instances which have the key as primary and the
879
          # current node as secondary.  this is handy to calculate N+1 memory
880
          # availability if you can only failover from a primary to its
881
          # secondary.
882
          "sinst-by-pnode": {},
883
        }
884
      except ValueError:
885
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
886
        bad = True
887
        continue
888

    
889
    node_vol_should = {}
890

    
891
    for instance in instancelist:
892
      feedback_fn("* Verifying instance %s" % instance)
893
      inst_config = self.cfg.GetInstanceInfo(instance)
894
      result =  self._VerifyInstance(instance, inst_config, node_volume,
895
                                     node_instance, feedback_fn)
896
      bad = bad or result
897

    
898
      inst_config.MapLVsByNode(node_vol_should)
899

    
900
      instance_cfg[instance] = inst_config
901

    
902
      pnode = inst_config.primary_node
903
      if pnode in node_info:
904
        node_info[pnode]['pinst'].append(instance)
905
      else:
906
        feedback_fn("  - ERROR: instance %s, connection to primary node"
907
                    " %s failed" % (instance, pnode))
908
        bad = True
909

    
910
      # If the instance is non-redundant we cannot survive losing its primary
911
      # node, so we are not N+1 compliant. On the other hand we have no disk
912
      # templates with more than one secondary so that situation is not well
913
      # supported either.
914
      # FIXME: does not support file-backed instances
915
      if len(inst_config.secondary_nodes) == 0:
916
        i_non_redundant.append(instance)
917
      elif len(inst_config.secondary_nodes) > 1:
918
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
919
                    % instance)
920

    
921
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
922
        i_non_a_balanced.append(instance)
923

    
924
      for snode in inst_config.secondary_nodes:
925
        if snode in node_info:
926
          node_info[snode]['sinst'].append(instance)
927
          if pnode not in node_info[snode]['sinst-by-pnode']:
928
            node_info[snode]['sinst-by-pnode'][pnode] = []
929
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
930
        else:
931
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
932
                      " %s failed" % (instance, snode))
933

    
934
    feedback_fn("* Verifying orphan volumes")
935
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
936
                                       feedback_fn)
937
    bad = bad or result
938

    
939
    feedback_fn("* Verifying remaining instances")
940
    result = self._VerifyOrphanInstances(instancelist, node_instance,
941
                                         feedback_fn)
942
    bad = bad or result
943

    
944
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
945
      feedback_fn("* Verifying N+1 Memory redundancy")
946
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
947
      bad = bad or result
948

    
949
    feedback_fn("* Other Notes")
950
    if i_non_redundant:
951
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
952
                  % len(i_non_redundant))
953

    
954
    if i_non_a_balanced:
955
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
956
                  % len(i_non_a_balanced))
957

    
958
    return not bad
959

    
960
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
961
    """Analyze the post-hooks' result
962

963
    This method analyses the hook result, handles it, and sends some
964
    nicely-formatted feedback back to the user.
965

966
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
967
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
968
    @param hooks_results: the results of the multi-node hooks rpc call
969
    @param feedback_fn: function used to send feedback back to the caller
970
    @param lu_result: previous Exec result
971
    @return: the new Exec result, based on the previous result
972
        and hook results
973

974
    """
975
    # We only really run POST phase hooks, and are only interested in
976
    # their results
977
    if phase == constants.HOOKS_PHASE_POST:
978
      # Used to change hooks' output to proper indentation
979
      indent_re = re.compile('^', re.M)
980
      feedback_fn("* Hooks Results")
981
      if not hooks_results:
982
        feedback_fn("  - ERROR: general communication failure")
983
        lu_result = 1
984
      else:
985
        for node_name in hooks_results:
986
          show_node_header = True
987
          res = hooks_results[node_name]
988
          if res is False or not isinstance(res, list):
989
            feedback_fn("    Communication failure")
990
            lu_result = 1
991
            continue
992
          for script, hkr, output in res:
993
            if hkr == constants.HKR_FAIL:
994
              # The node header is only shown once, if there are
995
              # failing hooks on that node
996
              if show_node_header:
997
                feedback_fn("  Node %s:" % node_name)
998
                show_node_header = False
999
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1000
              output = indent_re.sub('      ', output)
1001
              feedback_fn("%s" % output)
1002
              lu_result = 1
1003

    
1004
      return lu_result
1005

    
1006

    
1007
class LUVerifyDisks(NoHooksLU):
1008
  """Verifies the cluster disks status.
1009

1010
  """
1011
  _OP_REQP = []
1012
  REQ_BGL = False
1013

    
1014
  def ExpandNames(self):
1015
    self.needed_locks = {
1016
      locking.LEVEL_NODE: locking.ALL_SET,
1017
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1018
    }
1019
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1020

    
1021
  def CheckPrereq(self):
1022
    """Check prerequisites.
1023

1024
    This has no prerequisites.
1025

1026
    """
1027
    pass
1028

    
1029
  def Exec(self, feedback_fn):
1030
    """Verify integrity of cluster disks.
1031

1032
    """
1033
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1034

    
1035
    vg_name = self.cfg.GetVGName()
1036
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1037
    instances = [self.cfg.GetInstanceInfo(name)
1038
                 for name in self.cfg.GetInstanceList()]
1039

    
1040
    nv_dict = {}
1041
    for inst in instances:
1042
      inst_lvs = {}
1043
      if (inst.status != "up" or
1044
          inst.disk_template not in constants.DTS_NET_MIRROR):
1045
        continue
1046
      inst.MapLVsByNode(inst_lvs)
1047
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1048
      for node, vol_list in inst_lvs.iteritems():
1049
        for vol in vol_list:
1050
          nv_dict[(node, vol)] = inst
1051

    
1052
    if not nv_dict:
1053
      return result
1054

    
1055
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1056

    
1057
    to_act = set()
1058
    for node in nodes:
1059
      # node_volume
1060
      lvs = node_lvs[node]
1061

    
1062
      if isinstance(lvs, basestring):
1063
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1064
        res_nlvm[node] = lvs
1065
      elif not isinstance(lvs, dict):
1066
        logging.warning("Connection to node %s failed or invalid data"
1067
                        " returned", node)
1068
        res_nodes.append(node)
1069
        continue
1070

    
1071
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1072
        inst = nv_dict.pop((node, lv_name), None)
1073
        if (not lv_online and inst is not None
1074
            and inst.name not in res_instances):
1075
          res_instances.append(inst.name)
1076

    
1077
    # any leftover items in nv_dict are missing LVs, let's arrange the
1078
    # data better
1079
    for key, inst in nv_dict.iteritems():
1080
      if inst.name not in res_missing:
1081
        res_missing[inst.name] = []
1082
      res_missing[inst.name].append(key)
1083

    
1084
    return result
1085

    
1086

    
1087
class LURenameCluster(LogicalUnit):
1088
  """Rename the cluster.
1089

1090
  """
1091
  HPATH = "cluster-rename"
1092
  HTYPE = constants.HTYPE_CLUSTER
1093
  _OP_REQP = ["name"]
1094

    
1095
  def BuildHooksEnv(self):
1096
    """Build hooks env.
1097

1098
    """
1099
    env = {
1100
      "OP_TARGET": self.cfg.GetClusterName(),
1101
      "NEW_NAME": self.op.name,
1102
      }
1103
    mn = self.cfg.GetMasterNode()
1104
    return env, [mn], [mn]
1105

    
1106
  def CheckPrereq(self):
1107
    """Verify that the passed name is a valid one.
1108

1109
    """
1110
    hostname = utils.HostInfo(self.op.name)
1111

    
1112
    new_name = hostname.name
1113
    self.ip = new_ip = hostname.ip
1114
    old_name = self.cfg.GetClusterName()
1115
    old_ip = self.cfg.GetMasterIP()
1116
    if new_name == old_name and new_ip == old_ip:
1117
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1118
                                 " cluster has changed")
1119
    if new_ip != old_ip:
1120
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1121
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1122
                                   " reachable on the network. Aborting." %
1123
                                   new_ip)
1124

    
1125
    self.op.name = new_name
1126

    
1127
  def Exec(self, feedback_fn):
1128
    """Rename the cluster.
1129

1130
    """
1131
    clustername = self.op.name
1132
    ip = self.ip
1133

    
1134
    # shutdown the master IP
1135
    master = self.cfg.GetMasterNode()
1136
    if not self.rpc.call_node_stop_master(master, False):
1137
      raise errors.OpExecError("Could not disable the master role")
1138

    
1139
    try:
1140
      # modify the sstore
1141
      # TODO: sstore
1142
      ss.SetKey(ss.SS_MASTER_IP, ip)
1143
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1144

    
1145
      # Distribute updated ss config to all nodes
1146
      myself = self.cfg.GetNodeInfo(master)
1147
      dist_nodes = self.cfg.GetNodeList()
1148
      if myself.name in dist_nodes:
1149
        dist_nodes.remove(myself.name)
1150

    
1151
      logging.debug("Copying updated ssconf data to all nodes")
1152
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1153
        fname = ss.KeyToFilename(keyname)
1154
        result = self.rpc.call_upload_file(dist_nodes, fname)
1155
        for to_node in dist_nodes:
1156
          if not result[to_node]:
1157
            self.LogWarning("Copy of file %s to node %s failed",
1158
                            fname, to_node)
1159
    finally:
1160
      if not self.rpc.call_node_start_master(master, False):
1161
        self.LogWarning("Could not re-enable the master role on"
1162
                        " the master, please restart manually.")
1163

    
1164

    
1165
def _RecursiveCheckIfLVMBased(disk):
1166
  """Check if the given disk or its children are lvm-based.
1167

1168
  @type disk: L{objects.Disk}
1169
  @param disk: the disk to check
1170
  @rtype: boolean
1171
  @return: boolean indicating whether a LD_LV dev_type was found or not
1172

1173
  """
1174
  if disk.children:
1175
    for chdisk in disk.children:
1176
      if _RecursiveCheckIfLVMBased(chdisk):
1177
        return True
1178
  return disk.dev_type == constants.LD_LV
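# Illustrative sketch (an assumption, not part of the original file): a DRBD
# device backed by two logical volumes is reported as lvm-based because the
# check recurses into its children:
#
#   lv_data = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
#                       children=[lv_data, lv_meta])
#   _RecursiveCheckIfLVMBased(drbd)  # evaluates to True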
1179

    
1180

    
1181
class LUSetClusterParams(LogicalUnit):
1182
  """Change the parameters of the cluster.
1183

1184
  """
1185
  HPATH = "cluster-modify"
1186
  HTYPE = constants.HTYPE_CLUSTER
1187
  _OP_REQP = []
1188
  REQ_BGL = False
1189

    
1190
  def CheckParameters(self):
1191
    """Check parameters
1192

1193
    """
1194
    if not hasattr(self.op, "candidate_pool_size"):
1195
      self.op.candidate_pool_size = None
1196
    if self.op.candidate_pool_size is not None:
1197
      try:
1198
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1199
      except ValueError, err:
1200
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1201
                                   str(err))
1202
      if self.op.candidate_pool_size < 1:
1203
        raise errors.OpPrereqError("At least one master candidate needed")
1204

    
1205
  def ExpandNames(self):
1206
    # FIXME: in the future maybe other cluster params won't require checking on
1207
    # all nodes to be modified.
1208
    self.needed_locks = {
1209
      locking.LEVEL_NODE: locking.ALL_SET,
1210
    }
1211
    self.share_locks[locking.LEVEL_NODE] = 1
1212

    
1213
  def BuildHooksEnv(self):
1214
    """Build hooks env.
1215

1216
    """
1217
    env = {
1218
      "OP_TARGET": self.cfg.GetClusterName(),
1219
      "NEW_VG_NAME": self.op.vg_name,
1220
      }
1221
    mn = self.cfg.GetMasterNode()
1222
    return env, [mn], [mn]
1223

    
1224
  def CheckPrereq(self):
1225
    """Check prerequisites.
1226

1227
    This checks whether the given params don't conflict and
1228
    if the given volume group is valid.
1229

1230
    """
1231
    # FIXME: This only works because there is only one parameter that can be
1232
    # changed or removed.
1233
    if self.op.vg_name is not None and not self.op.vg_name:
1234
      instances = self.cfg.GetAllInstancesInfo().values()
1235
      for inst in instances:
1236
        for disk in inst.disks:
1237
          if _RecursiveCheckIfLVMBased(disk):
1238
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1239
                                       " lvm-based instances exist")
1240

    
1241
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1242

    
1243
    # if vg_name not None, checks given volume group on all nodes
1244
    if self.op.vg_name:
1245
      vglist = self.rpc.call_vg_list(node_list)
1246
      for node in node_list:
1247
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1248
                                              constants.MIN_VG_SIZE)
1249
        if vgstatus:
1250
          raise errors.OpPrereqError("Error on node '%s': %s" %
1251
                                     (node, vgstatus))
1252

    
1253
    self.cluster = cluster = self.cfg.GetClusterInfo()
1254
    # beparams changes do not need validation (we can't validate?),
1255
    # but we still process here
1256
    if self.op.beparams:
1257
      self.new_beparams = cluster.FillDict(
1258
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1259

    
1260
    # hypervisor list/parameters
1261
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1262
    if self.op.hvparams:
1263
      if not isinstance(self.op.hvparams, dict):
1264
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1265
      for hv_name, hv_dict in self.op.hvparams.items():
1266
        if hv_name not in self.new_hvparams:
1267
          self.new_hvparams[hv_name] = hv_dict
1268
        else:
1269
          self.new_hvparams[hv_name].update(hv_dict)
1270

    
1271
    if self.op.enabled_hypervisors is not None:
1272
      self.hv_list = self.op.enabled_hypervisors
1273
    else:
1274
      self.hv_list = cluster.enabled_hypervisors
1275

    
1276
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1277
      # either the enabled list has changed, or the parameters have, validate
1278
      for hv_name, hv_params in self.new_hvparams.items():
1279
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1280
            (self.op.enabled_hypervisors and
1281
             hv_name in self.op.enabled_hypervisors)):
1282
          # either this is a new hypervisor, or its parameters have changed
1283
          hv_class = hypervisor.GetHypervisor(hv_name)
1284
          hv_class.CheckParameterSyntax(hv_params)
1285
          _CheckHVParams(self, node_list, hv_name, hv_params)
1286

    
1287
  def Exec(self, feedback_fn):
1288
    """Change the parameters of the cluster.
1289

1290
    """
1291
    if self.op.vg_name is not None:
1292
      if self.op.vg_name != self.cfg.GetVGName():
1293
        self.cfg.SetVGName(self.op.vg_name)
1294
      else:
1295
        feedback_fn("Cluster LVM configuration already in desired"
1296
                    " state, not changing")
1297
    if self.op.hvparams:
1298
      self.cluster.hvparams = self.new_hvparams
1299
    if self.op.enabled_hypervisors is not None:
1300
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1301
    if self.op.beparams:
1302
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1303
    if self.op.candidate_pool_size is not None:
1304
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1305

    
1306
    self.cfg.Update(self.cluster)
1307

    
1308
    # we want to update nodes after the cluster so that if any errors
1309
    # happen, we have recorded and saved the cluster info
1310
    if self.op.candidate_pool_size is not None:
1311
      node_info = self.cfg.GetAllNodesInfo().values()
1312
      num_candidates = len([node for node in node_info
1313
                            if node.master_candidate])
1314
      num_nodes = len(node_info)
1315
      if num_candidates < self.op.candidate_pool_size:
1316
        random.shuffle(node_info)
1317
        for node in node_info:
1318
          if num_candidates >= self.op.candidate_pool_size:
1319
            break
1320
          if node.master_candidate:
1321
            continue
1322
          node.master_candidate = True
1323
          self.LogInfo("Promoting node %s to master candidate", node.name)
1324
          self.cfg.Update(node)
1325
          self.context.ReaddNode(node)
1326
          num_candidates += 1
1327
      elif num_candidates > self.op.candidate_pool_size:
1328
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1329
                     " of candidate_pool_size (%d)" %
1330
                     (num_candidates, self.op.candidate_pool_size))
1331

    
1332

    
1333
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1334
  """Sleep and poll for an instance's disk to sync.
1335

1336
  """
1337
  if not instance.disks:
1338
    return True
1339

    
1340
  if not oneshot:
1341
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1342

    
1343
  node = instance.primary_node
1344

    
1345
  for dev in instance.disks:
1346
    lu.cfg.SetDiskID(dev, node)
1347

    
1348
  retries = 0
1349
  while True:
1350
    max_time = 0
1351
    done = True
1352
    cumul_degraded = False
1353
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1354
    if not rstats:
1355
      lu.LogWarning("Can't get any data from node %s", node)
1356
      retries += 1
1357
      if retries >= 10:
1358
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1359
                                 " aborting." % node)
1360
      time.sleep(6)
1361
      continue
1362
    retries = 0
1363
    for i in range(len(rstats)):
1364
      mstat = rstats[i]
1365
      if mstat is None:
1366
        lu.LogWarning("Can't compute data for node %s/%s",
1367
                           node, instance.disks[i].iv_name)
1368
        continue
1369
      # we ignore the ldisk parameter
1370
      perc_done, est_time, is_degraded, _ = mstat
1371
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1372
      if perc_done is not None:
1373
        done = False
1374
        if est_time is not None:
1375
          rem_time = "%d estimated seconds remaining" % est_time
1376
          max_time = est_time
1377
        else:
1378
          rem_time = "no time estimate"
1379
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1380
                        (instance.disks[i].iv_name, perc_done, rem_time))
1381
    if done or oneshot:
1382
      break
1383

    
1384
    time.sleep(min(60, max_time))
1385

    
1386
  if done:
1387
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1388
  return not cumul_degraded
1389

    
1390

    
1391
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1392
  """Check that mirrors are not degraded.
1393

1394
  The ldisk parameter, if True, will change the test from the
1395
  is_degraded attribute (which represents overall non-ok status for
1396
  the device(s)) to the ldisk (representing the local storage status).
1397

1398
  """
1399
  lu.cfg.SetDiskID(dev, node)
1400
  if ldisk:
1401
    idx = 6
1402
  else:
1403
    idx = 5
1404

    
1405
  result = True
1406
  if on_primary or dev.AssembleOnSecondary():
1407
    rstats = lu.rpc.call_blockdev_find(node, dev)
1408
    if not rstats:
1409
      logging.warning("Node %s: disk degraded, not found or node down", node)
1410
      result = False
1411
    else:
1412
      result = result and (not rstats[idx])
1413
  if dev.children:
1414
    for child in dev.children:
1415
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1416

    
1417
  return result
1418

    
1419

    
1420
class LUDiagnoseOS(NoHooksLU):
1421
  """Logical unit for OS diagnose/query.
1422

1423
  """
1424
  _OP_REQP = ["output_fields", "names"]
1425
  REQ_BGL = False
1426
  _FIELDS_STATIC = utils.FieldSet()
1427
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1428

    
1429
  def ExpandNames(self):
1430
    if self.op.names:
1431
      raise errors.OpPrereqError("Selective OS query not supported")
1432

    
1433
    _CheckOutputFields(static=self._FIELDS_STATIC,
1434
                       dynamic=self._FIELDS_DYNAMIC,
1435
                       selected=self.op.output_fields)
1436

    
1437
    # Lock all nodes, in shared mode
1438
    self.needed_locks = {}
1439
    self.share_locks[locking.LEVEL_NODE] = 1
1440
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1441

    
1442
  def CheckPrereq(self):
1443
    """Check prerequisites.
1444

1445
    """
1446

    
1447
  @staticmethod
1448
  def _DiagnoseByOS(node_list, rlist):
1449
    """Remaps a per-node return list into a per-os per-node dictionary
1450

1451
    @param node_list: a list with the names of all nodes
1452
    @param rlist: a map with node names as keys and OS objects as values
1453

1454
    @rtype: dict
1455
    @return: a dictionary with OS names as keys and as values another map, with
1456
        nodes as keys and lists of OS objects as values, e.g.::
1457

1458
          {"debian-etch": {"node1": [<object>,...],
1459
                           "node2": [<object>,]}
1460
          }
1461

1462
    """
1463
    all_os = {}
1464
    for node_name, nr in rlist.iteritems():
1465
      if not nr:
1466
        continue
1467
      for os_obj in nr:
1468
        if os_obj.name not in all_os:
1469
          # build a list of nodes for this os containing empty lists
1470
          # for each node in node_list
1471
          all_os[os_obj.name] = {}
1472
          for nname in node_list:
1473
            all_os[os_obj.name][nname] = []
1474
        all_os[os_obj.name][node_name].append(os_obj)
1475
    return all_os
1476

    
1477
  def Exec(self, feedback_fn):
1478
    """Compute the list of OSes.
1479

1480
    """
1481
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1482
    node_data = self.rpc.call_os_diagnose(node_list)
1483
    if node_data == False:
1484
      raise errors.OpExecError("Can't gather the list of OSes")
1485
    pol = self._DiagnoseByOS(node_list, node_data)
1486
    output = []
1487
    for os_name, os_data in pol.iteritems():
1488
      row = []
1489
      for field in self.op.output_fields:
1490
        if field == "name":
1491
          val = os_name
1492
        elif field == "valid":
1493
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1494
        elif field == "node_status":
1495
          val = {}
1496
          for node_name, nos_list in os_data.iteritems():
1497
            val[node_name] = [(v.status, v.path) for v in nos_list]
1498
        else:
1499
          raise errors.ParameterError(field)
1500
        row.append(val)
1501
      output.append(row)
1502

    
1503
    return output
1504

    
1505

    
1506
class LURemoveNode(LogicalUnit):
1507
  """Logical unit for removing a node.
1508

1509
  """
1510
  HPATH = "node-remove"
1511
  HTYPE = constants.HTYPE_NODE
1512
  _OP_REQP = ["node_name"]
1513

    
1514
  def BuildHooksEnv(self):
1515
    """Build hooks env.
1516

1517
    This doesn't run on the target node in the pre phase as a failed
1518
    node would then be impossible to remove.
1519

1520
    """
1521
    env = {
1522
      "OP_TARGET": self.op.node_name,
1523
      "NODE_NAME": self.op.node_name,
1524
      }
1525
    all_nodes = self.cfg.GetNodeList()
1526
    all_nodes.remove(self.op.node_name)
1527
    return env, all_nodes, all_nodes
1528

    
1529
  def CheckPrereq(self):
1530
    """Check prerequisites.
1531

1532
    This checks:
1533
     - the node exists in the configuration
1534
     - it does not have primary or secondary instances
1535
     - it's not the master
1536

1537
    Any errors are signalled by raising errors.OpPrereqError.
1538

1539
    """
1540
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1541
    if node is None:
1542
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1543

    
1544
    instance_list = self.cfg.GetInstanceList()
1545

    
1546
    masternode = self.cfg.GetMasterNode()
1547
    if node.name == masternode:
1548
      raise errors.OpPrereqError("Node is the master node,"
1549
                                 " you need to failover first.")
1550

    
1551
    for instance_name in instance_list:
1552
      instance = self.cfg.GetInstanceInfo(instance_name)
1553
      if node.name == instance.primary_node:
1554
        raise errors.OpPrereqError("Instance %s still running on the node,"
1555
                                   " please remove first." % instance_name)
1556
      if node.name in instance.secondary_nodes:
1557
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1558
                                   " please remove first." % instance_name)
1559
    self.op.node_name = node.name
1560
    self.node = node
1561

    
1562
  def Exec(self, feedback_fn):
1563
    """Removes the node from the cluster.
1564

1565
    """
1566
    node = self.node
1567
    logging.info("Stopping the node daemon and removing configs from node %s",
1568
                 node.name)
1569

    
1570
    self.context.RemoveNode(node.name)
1571

    
1572
    self.rpc.call_node_leave_cluster(node.name)
1573

    
1574

    
1575
class LUQueryNodes(NoHooksLU):
1576
  """Logical unit for querying nodes.
1577

1578
  """
1579
  _OP_REQP = ["output_fields", "names"]
1580
  REQ_BGL = False
1581
  _FIELDS_DYNAMIC = utils.FieldSet(
1582
    "dtotal", "dfree",
1583
    "mtotal", "mnode", "mfree",
1584
    "bootid",
1585
    "ctotal",
1586
    )
1587

    
1588
  _FIELDS_STATIC = utils.FieldSet(
1589
    "name", "pinst_cnt", "sinst_cnt",
1590
    "pinst_list", "sinst_list",
1591
    "pip", "sip", "tags",
1592
    "serial_no",
1593
    "master_candidate",
1594
    "master",
1595
    )
1596

    
1597
  def ExpandNames(self):
1598
    _CheckOutputFields(static=self._FIELDS_STATIC,
1599
                       dynamic=self._FIELDS_DYNAMIC,
1600
                       selected=self.op.output_fields)
1601

    
1602
    self.needed_locks = {}
1603
    self.share_locks[locking.LEVEL_NODE] = 1
1604

    
1605
    if self.op.names:
1606
      self.wanted = _GetWantedNodes(self, self.op.names)
1607
    else:
1608
      self.wanted = locking.ALL_SET
1609

    
1610
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1611
    if self.do_locking:
1612
      # if we don't request only static fields, we need to lock the nodes
1613
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1614

    
1615

    
1616
  def CheckPrereq(self):
1617
    """Check prerequisites.
1618

1619
    """
1620
    # The validation of the node list is done in _GetWantedNodes, if
    # non-empty; if empty, there's no validation to do
1622
    pass
1623

    
1624
  def Exec(self, feedback_fn):
1625
    """Computes the list of nodes and their attributes.
1626

1627
    """
1628
    all_info = self.cfg.GetAllNodesInfo()
1629
    if self.do_locking:
1630
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1631
    elif self.wanted != locking.ALL_SET:
1632
      nodenames = self.wanted
1633
      missing = set(nodenames).difference(all_info.keys())
1634
      if missing:
1635
        raise errors.OpExecError(
1636
          "Some nodes were removed before retrieving their data: %s" % missing)
1637
    else:
1638
      nodenames = all_info.keys()
1639

    
1640
    nodenames = utils.NiceSort(nodenames)
1641
    nodelist = [all_info[name] for name in nodenames]
1642

    
1643
    # begin data gathering
1644

    
1645
    if self.do_locking:
1646
      live_data = {}
1647
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1648
                                          self.cfg.GetHypervisorType())
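      # Illustrative (assumed) shape of one node_data entry; the keys below
      # are the ones read when filling live_data:
      #   {'memory_total': 4096, 'memory_dom0': 512, 'memory_free': 3072,
      #    'vg_size': 102400, 'vg_free': 51200, 'cpu_total': 4,
      #    'bootid': '0a1b2c3d-...'}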
      for name in nodenames:
1650
        nodeinfo = node_data.get(name, None)
1651
        if nodeinfo:
1652
          fn = utils.TryConvert
1653
          live_data[name] = {
1654
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1655
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1656
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1657
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1658
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1659
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1660
            "bootid": nodeinfo.get('bootid', None),
1661
            }
1662
        else:
1663
          live_data[name] = {}
1664
    else:
1665
      live_data = dict.fromkeys(nodenames, {})
1666

    
1667
    node_to_primary = dict([(name, set()) for name in nodenames])
1668
    node_to_secondary = dict([(name, set()) for name in nodenames])
1669

    
1670
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1671
                             "sinst_cnt", "sinst_list"))
1672
    if inst_fields & frozenset(self.op.output_fields):
1673
      instancelist = self.cfg.GetInstanceList()
1674

    
1675
      for instance_name in instancelist:
1676
        inst = self.cfg.GetInstanceInfo(instance_name)
1677
        if inst.primary_node in node_to_primary:
1678
          node_to_primary[inst.primary_node].add(inst.name)
1679
        for secnode in inst.secondary_nodes:
1680
          if secnode in node_to_secondary:
1681
            node_to_secondary[secnode].add(inst.name)
1682

    
1683
    master_node = self.cfg.GetMasterNode()
1684

    
1685
    # end data gathering
1686

    
1687
    output = []
1688
    for node in nodelist:
1689
      node_output = []
1690
      for field in self.op.output_fields:
1691
        if field == "name":
1692
          val = node.name
1693
        elif field == "pinst_list":
1694
          val = list(node_to_primary[node.name])
1695
        elif field == "sinst_list":
1696
          val = list(node_to_secondary[node.name])
1697
        elif field == "pinst_cnt":
1698
          val = len(node_to_primary[node.name])
1699
        elif field == "sinst_cnt":
1700
          val = len(node_to_secondary[node.name])
1701
        elif field == "pip":
1702
          val = node.primary_ip
1703
        elif field == "sip":
1704
          val = node.secondary_ip
1705
        elif field == "tags":
1706
          val = list(node.GetTags())
1707
        elif field == "serial_no":
1708
          val = node.serial_no
1709
        elif field == "master_candidate":
1710
          val = node.master_candidate
1711
        elif field == "master":
1712
          val = node.name == master_node
1713
        elif self._FIELDS_DYNAMIC.Matches(field):
1714
          val = live_data[node.name].get(field, None)
1715
        else:
1716
          raise errors.ParameterError(field)
1717
        node_output.append(val)
1718
      output.append(node_output)
1719

    
1720
    return output
1721

    
1722

    
1723
class LUQueryNodeVolumes(NoHooksLU):
1724
  """Logical unit for getting volumes on node(s).
1725

1726
  """
1727
  _OP_REQP = ["nodes", "output_fields"]
1728
  REQ_BGL = False
1729
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1730
  _FIELDS_STATIC = utils.FieldSet("node")
1731

    
1732
  def ExpandNames(self):
1733
    _CheckOutputFields(static=self._FIELDS_STATIC,
1734
                       dynamic=self._FIELDS_DYNAMIC,
1735
                       selected=self.op.output_fields)
1736

    
1737
    self.needed_locks = {}
1738
    self.share_locks[locking.LEVEL_NODE] = 1
1739
    if not self.op.nodes:
1740
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1741
    else:
1742
      self.needed_locks[locking.LEVEL_NODE] = \
1743
        _GetWantedNodes(self, self.op.nodes)
1744

    
1745
  def CheckPrereq(self):
1746
    """Check prerequisites.
1747

1748
    This checks that the fields required are valid output fields.
1749

1750
    """
1751
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1752

    
1753
  def Exec(self, feedback_fn):
1754
    """Computes the list of nodes and their attributes.
1755

1756
    """
1757
    nodenames = self.nodes
1758
    volumes = self.rpc.call_node_volumes(nodenames)
1759

    
1760
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1761
             in self.cfg.GetInstanceList()]
1762

    
1763
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1764

    
1765
    output = []
1766
    for node in nodenames:
1767
      if node not in volumes or not volumes[node]:
1768
        continue
1769

    
1770
      node_vols = volumes[node][:]
1771
      node_vols.sort(key=lambda vol: vol['dev'])
1772

    
1773
      for vol in node_vols:
1774
        node_output = []
1775
        for field in self.op.output_fields:
1776
          if field == "node":
1777
            val = node
1778
          elif field == "phys":
1779
            val = vol['dev']
1780
          elif field == "vg":
1781
            val = vol['vg']
1782
          elif field == "name":
1783
            val = vol['name']
1784
          elif field == "size":
1785
            val = int(float(vol['size']))
1786
          elif field == "instance":
            for inst in ilist:
1788
              if node not in lv_by_node[inst]:
1789
                continue
1790
              if vol['name'] in lv_by_node[inst][node]:
1791
                val = inst.name
1792
                break
1793
            else:
1794
              val = '-'
1795
          else:
1796
            raise errors.ParameterError(field)
1797
          node_output.append(str(val))
1798

    
1799
        output.append(node_output)
1800

    
1801
    return output
1802

    
1803

    
1804
class LUAddNode(LogicalUnit):
1805
  """Logical unit for adding node to the cluster.
1806

1807
  """
1808
  HPATH = "node-add"
1809
  HTYPE = constants.HTYPE_NODE
1810
  _OP_REQP = ["node_name"]
1811

    
1812
  def BuildHooksEnv(self):
1813
    """Build hooks env.
1814

1815
    This will run on all nodes before, and on all nodes + the new node after.
1816

1817
    """
1818
    env = {
1819
      "OP_TARGET": self.op.node_name,
1820
      "NODE_NAME": self.op.node_name,
1821
      "NODE_PIP": self.op.primary_ip,
1822
      "NODE_SIP": self.op.secondary_ip,
1823
      }
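    # Illustrative result (hypothetical names and addresses):
    #   env = {"OP_TARGET": "node3.example.com",
    #          "NODE_NAME": "node3.example.com",
    #          "NODE_PIP": "192.0.2.13", "NODE_SIP": "198.51.100.13"}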
    nodes_0 = self.cfg.GetNodeList()
1825
    nodes_1 = nodes_0 + [self.op.node_name, ]
1826
    return env, nodes_0, nodes_1
1827

    
1828
  def CheckPrereq(self):
1829
    """Check prerequisites.
1830

1831
    This checks:
1832
     - the new node is not already in the config
1833
     - it is resolvable
1834
     - its parameters (single/dual homed) matches the cluster
1835

1836
    Any errors are signalled by raising errors.OpPrereqError.
1837

1838
    """
1839
    node_name = self.op.node_name
1840
    cfg = self.cfg
1841

    
1842
    dns_data = utils.HostInfo(node_name)
1843

    
1844
    node = dns_data.name
1845
    primary_ip = self.op.primary_ip = dns_data.ip
1846
    secondary_ip = getattr(self.op, "secondary_ip", None)
1847
    if secondary_ip is None:
1848
      secondary_ip = primary_ip
1849
    if not utils.IsValidIP(secondary_ip):
1850
      raise errors.OpPrereqError("Invalid secondary IP given")
1851
    self.op.secondary_ip = secondary_ip
1852

    
1853
    node_list = cfg.GetNodeList()
1854
    if not self.op.readd and node in node_list:
1855
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1856
                                 node)
1857
    elif self.op.readd and node not in node_list:
1858
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1859

    
1860
    for existing_node_name in node_list:
1861
      existing_node = cfg.GetNodeInfo(existing_node_name)
1862

    
1863
      if self.op.readd and node == existing_node_name:
1864
        if (existing_node.primary_ip != primary_ip or
1865
            existing_node.secondary_ip != secondary_ip):
1866
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1867
                                     " address configuration as before")
1868
        continue
1869

    
1870
      if (existing_node.primary_ip == primary_ip or
1871
          existing_node.secondary_ip == primary_ip or
1872
          existing_node.primary_ip == secondary_ip or
1873
          existing_node.secondary_ip == secondary_ip):
1874
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1875
                                   " existing node %s" % existing_node.name)
1876

    
1877
    # check that the type of the node (single versus dual homed) is the
1878
    # same as for the master
1879
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1880
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1881
    newbie_singlehomed = secondary_ip == primary_ip
1882
    if master_singlehomed != newbie_singlehomed:
1883
      if master_singlehomed:
1884
        raise errors.OpPrereqError("The master has no private ip but the"
1885
                                   " new node has one")
1886
      else:
1887
        raise errors.OpPrereqError("The master has a private ip but the"
1888
                                   " new node doesn't have one")
1889

    
1890
    # checks reachability
1891
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1892
      raise errors.OpPrereqError("Node not reachable by ping")
1893

    
1894
    if not newbie_singlehomed:
1895
      # check reachability from my secondary ip to newbie's secondary ip
1896
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1897
                           source=myself.secondary_ip):
1898
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1899
                                   " based ping to noded port")
1900

    
1901
    self.new_node = objects.Node(name=node,
1902
                                 primary_ip=primary_ip,
1903
                                 secondary_ip=secondary_ip)
1904

    
1905
  def Exec(self, feedback_fn):
1906
    """Adds the new node to the cluster.
1907

1908
    """
1909
    new_node = self.new_node
1910
    node = new_node.name
1911

    
1912
    # check connectivity
1913
    result = self.rpc.call_version([node])[node]
1914
    if result:
1915
      if constants.PROTOCOL_VERSION == result:
1916
        logging.info("Communication to node %s fine, sw version %s match",
1917
                     node, result)
1918
      else:
1919
        raise errors.OpExecError("Version mismatch master version %s,"
1920
                                 " node version %s" %
1921
                                 (constants.PROTOCOL_VERSION, result))
1922
    else:
1923
      raise errors.OpExecError("Cannot get version from the new node")
1924

    
1925
    # setup ssh on node
1926
    logging.info("Copy ssh key to node %s", node)
1927
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1928
    keyarray = []
1929
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1930
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1931
                priv_key, pub_key]
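    # Read each key file into memory; the contents are passed verbatim to
    # the node_add RPC below, in list order: host DSA private/public,
    # host RSA private/public, then the pair returned by ssh.GetUserFiles.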
1932

    
1933
    for i in keyfiles:
1934
      f = open(i, 'r')
1935
      try:
1936
        keyarray.append(f.read())
1937
      finally:
1938
        f.close()
1939

    
1940
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1941
                                    keyarray[2],
1942
                                    keyarray[3], keyarray[4], keyarray[5])
1943

    
1944
    if not result:
1945
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1946

    
1947
    # Add node to our /etc/hosts, and add key to known_hosts
1948
    utils.AddHostToEtcHosts(new_node.name)
1949

    
1950
    if new_node.secondary_ip != new_node.primary_ip:
1951
      if not self.rpc.call_node_has_ip_address(new_node.name,
1952
                                               new_node.secondary_ip):
1953
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1954
                                 " you gave (%s). Please fix and re-run this"
1955
                                 " command." % new_node.secondary_ip)
1956

    
1957
    node_verify_list = [self.cfg.GetMasterNode()]
1958
    node_verify_param = {
1959
      'nodelist': [node],
1960
      # TODO: do a node-net-test as well?
1961
    }
1962

    
1963
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1964
                                       self.cfg.GetClusterName())
1965
    for verifier in node_verify_list:
1966
      if not result[verifier]:
1967
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1968
                                 " for remote verification" % verifier)
1969
      if result[verifier]['nodelist']:
1970
        for failed in result[verifier]['nodelist']:
1971
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1972
                      (verifier, result[verifier]['nodelist'][failed]))
1973
        raise errors.OpExecError("ssh/hostname verification failed.")
1974

    
1975
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1976
    # including the node just added
1977
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1978
    dist_nodes = self.cfg.GetNodeList()
1979
    if not self.op.readd:
1980
      dist_nodes.append(node)
1981
    if myself.name in dist_nodes:
1982
      dist_nodes.remove(myself.name)
1983

    
1984
    logging.debug("Copying hosts and known_hosts to all nodes")
1985
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1986
      result = self.rpc.call_upload_file(dist_nodes, fname)
1987
      for to_node in dist_nodes:
1988
        if not result[to_node]:
1989
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1990

    
1991
    to_copy = []
1992
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1993
      to_copy.append(constants.VNC_PASSWORD_FILE)
1994
    for fname in to_copy:
1995
      result = self.rpc.call_upload_file([node], fname)
1996
      if not result[node]:
1997
        logging.error("Could not copy file %s to node %s", fname, node)
1998

    
1999
    if self.op.readd:
2000
      self.context.ReaddNode(new_node)
2001
    else:
2002
      self.context.AddNode(new_node)
2003

    
2004

    
2005
class LUSetNodeParams(LogicalUnit):
2006
  """Modifies the parameters of a node.
2007

2008
  """
2009
  HPATH = "node-modify"
2010
  HTYPE = constants.HTYPE_NODE
2011
  _OP_REQP = ["node_name"]
2012
  REQ_BGL = False
2013

    
2014
  def CheckArguments(self):
2015
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2016
    if node_name is None:
2017
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2018
    self.op.node_name = node_name
2019
    if not hasattr(self.op, 'master_candidate'):
2020
      raise errors.OpPrereqError("Please pass at least one modification")
2021
    self.op.master_candidate = bool(self.op.master_candidate)
2022

    
2023
  def ExpandNames(self):
2024
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2025

    
2026
  def BuildHooksEnv(self):
2027
    """Build hooks env.
2028

2029
    This runs on the master node.
2030

2031
    """
2032
    env = {
2033
      "OP_TARGET": self.op.node_name,
2034
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2035
      }
2036
    nl = [self.cfg.GetMasterNode(),
2037
          self.op.node_name]
2038
    return env, nl, nl
2039

    
2040
  def CheckPrereq(self):
2041
    """Check prerequisites.
2042

2043
    This only stores the force flag from the opcode for later use.
2044

2045
    """
2046
    force = self.force = self.op.force
2047

    
2048
    return
2049

    
2050
  def Exec(self, feedback_fn):
2051
    """Modifies a node.
2052

2053
    """
2054
    node = self.cfg.GetNodeInfo(self.op.node_name)
2055

    
2056
    result = []
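    # "result" collects (parameter name, new value) pairs for everything
    # that was actually modified; it is returned to the caller below.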
2057

    
2058
    if self.op.master_candidate is not None:
2059
      node.master_candidate = self.op.master_candidate
2060
      result.append(("master_candidate", str(self.op.master_candidate)))
2061

    
2062
    # this will trigger configuration file update, if needed
2063
    self.cfg.Update(node)
2064
    # this will trigger job queue propagation or cleanup
2065
    self.context.ReaddNode(node)
2066

    
2067
    return result
2068

    
2069

    
2070
class LUQueryClusterInfo(NoHooksLU):
2071
  """Query cluster configuration.
2072

2073
  """
2074
  _OP_REQP = []
2075
  REQ_BGL = False
2076

    
2077
  def ExpandNames(self):
2078
    self.needed_locks = {}
2079

    
2080
  def CheckPrereq(self):
2081
    """No prerequsites needed for this LU.
2082

2083
    """
2084
    pass
2085

    
2086
  def Exec(self, feedback_fn):
2087
    """Return cluster config.
2088

2089
    """
2090
    cluster = self.cfg.GetClusterInfo()
2091
    result = {
2092
      "software_version": constants.RELEASE_VERSION,
2093
      "protocol_version": constants.PROTOCOL_VERSION,
2094
      "config_version": constants.CONFIG_VERSION,
2095
      "os_api_version": constants.OS_API_VERSION,
2096
      "export_version": constants.EXPORT_VERSION,
2097
      "architecture": (platform.architecture()[0], platform.machine()),
2098
      "name": cluster.cluster_name,
2099
      "master": cluster.master_node,
2100
      "default_hypervisor": cluster.default_hypervisor,
2101
      "enabled_hypervisors": cluster.enabled_hypervisors,
2102
      "hvparams": cluster.hvparams,
2103
      "beparams": cluster.beparams,
2104
      "candidate_pool_size": cluster.candidate_pool_size,
2105
      }
2106

    
2107
    return result
2108

    
2109

    
2110
class LUQueryConfigValues(NoHooksLU):
2111
  """Return configuration values.
2112

2113
  """
2114
  _OP_REQP = []
2115
  REQ_BGL = False
2116
  _FIELDS_DYNAMIC = utils.FieldSet()
2117
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2118

    
2119
  def ExpandNames(self):
2120
    self.needed_locks = {}
2121

    
2122
    _CheckOutputFields(static=self._FIELDS_STATIC,
2123
                       dynamic=self._FIELDS_DYNAMIC,
2124
                       selected=self.op.output_fields)
2125

    
2126
  def CheckPrereq(self):
2127
    """No prerequisites.
2128

2129
    """
2130
    pass
2131

    
2132
  def Exec(self, feedback_fn):
2133
    """Dump a representation of the cluster config to the standard output.
2134

2135
    """
2136
    values = []
2137
    for field in self.op.output_fields:
2138
      if field == "cluster_name":
2139
        entry = self.cfg.GetClusterName()
2140
      elif field == "master_node":
2141
        entry = self.cfg.GetMasterNode()
2142
      elif field == "drain_flag":
2143
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2144
      else:
2145
        raise errors.ParameterError(field)
2146
      values.append(entry)
2147
    return values
2148

    
2149

    
2150
class LUActivateInstanceDisks(NoHooksLU):
2151
  """Bring up an instance's disks.
2152

2153
  """
2154
  _OP_REQP = ["instance_name"]
2155
  REQ_BGL = False
2156

    
2157
  def ExpandNames(self):
2158
    self._ExpandAndLockInstance()
2159
    self.needed_locks[locking.LEVEL_NODE] = []
2160
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2161

    
2162
  def DeclareLocks(self, level):
2163
    if level == locking.LEVEL_NODE:
2164
      self._LockInstancesNodes()
2165

    
2166
  def CheckPrereq(self):
2167
    """Check prerequisites.
2168

2169
    This checks that the instance is in the cluster.
2170

2171
    """
2172
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2173
    assert self.instance is not None, \
2174
      "Cannot retrieve locked instance %s" % self.op.instance_name
2175

    
2176
  def Exec(self, feedback_fn):
2177
    """Activate the disks.
2178

2179
    """
2180
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2181
    if not disks_ok:
2182
      raise errors.OpExecError("Cannot activate block devices")
2183

    
2184
    return disks_info
2185

    
2186

    
2187
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2188
  """Prepare the block devices for an instance.
2189

2190
  This sets up the block devices on all nodes.
2191

2192
  @type lu: L{LogicalUnit}
2193
  @param lu: the logical unit on whose behalf we execute
2194
  @type instance: L{objects.Instance}
2195
  @param instance: the instance for whose disks we assemble
2196
  @type ignore_secondaries: boolean
2197
  @param ignore_secondaries: if true, errors on secondary nodes
2198
      won't result in an error return from the function
2199
  @return: False if the operation failed, otherwise a list of
2200
      (host, instance_visible_name, node_visible_name)
2201
      with the mapping from node devices to instance devices
2202

2203
  """
2204
  device_info = []
2205
  disks_ok = True
2206
  iname = instance.name
2207
  # With the two passes mechanism we try to reduce the window of
2208
  # opportunity for the race condition of switching DRBD to primary
2209
  # before handshaking occurred, but we do not eliminate it
2210

    
2211
  # The proper fix would be to wait (with some limits) until the
2212
  # connection has been made and drbd transitions from WFConnection
2213
  # into any other network-connected state (Connected, SyncTarget,
2214
  # SyncSource, etc.)
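  # Rough sketch of the intended ordering for a hypothetical DRBD disk with
  # primary node A and secondary node B:
  #   pass 1: assemble on both A and B with is_primary=False
  #   pass 2: re-assemble on A only with is_primary=True
  # so that both halves exist before either side is promoted to primary.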
2215

    
2216
  # 1st pass, assemble on all nodes in secondary mode
2217
  for inst_disk in instance.disks:
2218
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2219
      lu.cfg.SetDiskID(node_disk, node)
2220
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2221
      if not result:
2222
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2223
                           " (is_primary=False, pass=1)",
2224
                           inst_disk.iv_name, node)
2225
        if not ignore_secondaries:
2226
          disks_ok = False
2227

    
2228
  # FIXME: race condition on drbd migration to primary
2229

    
2230
  # 2nd pass, do only the primary node
2231
  for inst_disk in instance.disks:
2232
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2233
      if node != instance.primary_node:
2234
        continue
2235
      lu.cfg.SetDiskID(node_disk, node)
2236
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2237
      if not result:
2238
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2239
                           " (is_primary=True, pass=2)",
2240
                           inst_disk.iv_name, node)
2241
        disks_ok = False
2242
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2243

    
2244
  # leave the disks configured for the primary node
2245
  # this is a workaround that would be fixed better by
2246
  # improving the logical/physical id handling
2247
  for disk in instance.disks:
2248
    lu.cfg.SetDiskID(disk, instance.primary_node)
2249

    
2250
  return disks_ok, device_info
2251

    
2252

    
2253
def _StartInstanceDisks(lu, instance, force):
2254
  """Start the disks of an instance.
2255

2256
  """
2257
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2258
                                           ignore_secondaries=force)
2259
  if not disks_ok:
2260
    _ShutdownInstanceDisks(lu, instance)
2261
    if force is not None and not force:
2262
      lu.proc.LogWarning("", hint="If the message above refers to a"
2263
                         " secondary node,"
2264
                         " you can retry the operation using '--force'.")
2265
    raise errors.OpExecError("Disk consistency error")
2266

    
2267

    
2268
class LUDeactivateInstanceDisks(NoHooksLU):
2269
  """Shutdown an instance's disks.
2270

2271
  """
2272
  _OP_REQP = ["instance_name"]
2273
  REQ_BGL = False
2274

    
2275
  def ExpandNames(self):
2276
    self._ExpandAndLockInstance()
2277
    self.needed_locks[locking.LEVEL_NODE] = []
2278
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2279

    
2280
  def DeclareLocks(self, level):
2281
    if level == locking.LEVEL_NODE:
2282
      self._LockInstancesNodes()
2283

    
2284
  def CheckPrereq(self):
2285
    """Check prerequisites.
2286

2287
    This checks that the instance is in the cluster.
2288

2289
    """
2290
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2291
    assert self.instance is not None, \
2292
      "Cannot retrieve locked instance %s" % self.op.instance_name
2293

    
2294
  def Exec(self, feedback_fn):
2295
    """Deactivate the disks
2296

2297
    """
2298
    instance = self.instance
2299
    _SafeShutdownInstanceDisks(self, instance)
2300

    
2301

    
2302
def _SafeShutdownInstanceDisks(lu, instance):
2303
  """Shutdown block devices of an instance.
2304

2305
  This function checks if an instance is running, before calling
2306
  _ShutdownInstanceDisks.
2307

2308
  """
2309
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2310
                                      [instance.hypervisor])
2311
  ins_l = ins_l[instance.primary_node]
2312
  if type(ins_l) is not list:
2313
    raise errors.OpExecError("Can't contact node '%s'" %
2314
                             instance.primary_node)
2315

    
2316
  if instance.name in ins_l:
2317
    raise errors.OpExecError("Instance is running, can't shutdown"
2318
                             " block devices.")
2319

    
2320
  _ShutdownInstanceDisks(lu, instance)
2321

    
2322

    
2323
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2324
  """Shutdown block devices of an instance.
2325

2326
  This does the shutdown on all nodes of the instance.
2327

2328
  If ignore_primary is true, errors on the primary node are
  ignored.
2330

2331
  """
2332
  result = True
2333
  for disk in instance.disks:
2334
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2335
      lu.cfg.SetDiskID(top_disk, node)
2336
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2337
        logging.error("Could not shutdown block device %s on node %s",
2338
                      disk.iv_name, node)
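        # Only treat this as an overall failure if it is not a primary-node
        # error that the caller asked us to ignore.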
        if not ignore_primary or node != instance.primary_node:
2340
          result = False
2341
  return result
2342

    
2343

    
2344
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2345
  """Checks if a node has enough free memory.
2346

2347
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2351

2352
  @type lu: C{LogicalUnit}
2353
  @param lu: a logical unit from which we get configuration data
2354
  @type node: C{str}
2355
  @param node: the node to check
2356
  @type reason: C{str}
2357
  @param reason: string to use in the error message
2358
  @type requested: C{int}
2359
  @param requested: the amount of memory in MiB to check for
2360
  @type hypervisor: C{str}
2361
  @param hypervisor: the hypervisor to ask for memory stats
2362
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2363
      we cannot check the node
2364

2365
  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2367
  if not nodeinfo or not isinstance(nodeinfo, dict):
2368
    raise errors.OpPrereqError("Could not contact node %s for resource"
2369
                             " information" % (node,))
2370

    
2371
  free_mem = nodeinfo[node].get('memory_free')
2372
  if not isinstance(free_mem, int):
2373
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2374
                             " was '%s'" % (node, free_mem))
2375
  if requested > free_mem:
2376
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2377
                             " needed %s MiB, available %s MiB" %
2378
                             (node, reason, requested, free_mem))
2379

    
2380

    
2381
class LUStartupInstance(LogicalUnit):
2382
  """Starts an instance.
2383

2384
  """
2385
  HPATH = "instance-start"
2386
  HTYPE = constants.HTYPE_INSTANCE
2387
  _OP_REQP = ["instance_name", "force"]
2388
  REQ_BGL = False
2389

    
2390
  def ExpandNames(self):
2391
    self._ExpandAndLockInstance()
2392

    
2393
  def BuildHooksEnv(self):
2394
    """Build hooks env.
2395

2396
    This runs on master, primary and secondary nodes of the instance.
2397

2398
    """
2399
    env = {
2400
      "FORCE": self.op.force,
2401
      }
2402
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2403
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2404
          list(self.instance.secondary_nodes))
2405
    return env, nl, nl
2406

    
2407
  def CheckPrereq(self):
2408
    """Check prerequisites.
2409

2410
    This checks that the instance is in the cluster.
2411

2412
    """
2413
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2414
    assert self.instance is not None, \
2415
      "Cannot retrieve locked instance %s" % self.op.instance_name
2416

    
2417
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2418
    # check that the bridges exist
2419
    _CheckInstanceBridgesExist(self, instance)
2420

    
2421
    _CheckNodeFreeMemory(self, instance.primary_node,
2422
                         "starting instance %s" % instance.name,
2423
                         bep[constants.BE_MEMORY], instance.hypervisor)
2424

    
2425
  def Exec(self, feedback_fn):
2426
    """Start the instance.
2427

2428
    """
2429
    instance = self.instance
2430
    force = self.op.force
2431
    extra_args = getattr(self.op, "extra_args", "")
2432

    
2433
    self.cfg.MarkInstanceUp(instance.name)
2434

    
2435
    node_current = instance.primary_node
2436

    
2437
    _StartInstanceDisks(self, instance, force)
2438

    
2439
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2440
      _ShutdownInstanceDisks(self, instance)
2441
      raise errors.OpExecError("Could not start instance")
2442

    
2443

    
2444
class LURebootInstance(LogicalUnit):
2445
  """Reboot an instance.
2446

2447
  """
2448
  HPATH = "instance-reboot"
2449
  HTYPE = constants.HTYPE_INSTANCE
2450
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2451
  REQ_BGL = False
2452

    
2453
  def ExpandNames(self):
2454
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2455
                                   constants.INSTANCE_REBOOT_HARD,
2456
                                   constants.INSTANCE_REBOOT_FULL]:
2457
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2458
                                  (constants.INSTANCE_REBOOT_SOFT,
2459
                                   constants.INSTANCE_REBOOT_HARD,
2460
                                   constants.INSTANCE_REBOOT_FULL))
2461
    self._ExpandAndLockInstance()
2462

    
2463
  def BuildHooksEnv(self):
2464
    """Build hooks env.
2465

2466
    This runs on master, primary and secondary nodes of the instance.
2467

2468
    """
2469
    env = {
2470
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2471
      }
2472
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2473
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2474
          list(self.instance.secondary_nodes))
2475
    return env, nl, nl
2476

    
2477
  def CheckPrereq(self):
2478
    """Check prerequisites.
2479

2480
    This checks that the instance is in the cluster.
2481

2482
    """
2483
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2484
    assert self.instance is not None, \
2485
      "Cannot retrieve locked instance %s" % self.op.instance_name
2486

    
2487
    # check that the bridges exist
2488
    _CheckInstanceBridgesExist(self, instance)
2489

    
2490
  def Exec(self, feedback_fn):
2491
    """Reboot the instance.
2492

2493
    """
2494
    instance = self.instance
2495
    ignore_secondaries = self.op.ignore_secondaries
2496
    reboot_type = self.op.reboot_type
2497
    extra_args = getattr(self.op, "extra_args", "")
2498

    
2499
    node_current = instance.primary_node
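    # SOFT and HARD reboots are delegated to the node daemon in one call;
    # a FULL reboot is emulated below as shutdown, disk restart and start.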
2500

    
2501
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2502
                       constants.INSTANCE_REBOOT_HARD]:
2503
      if not self.rpc.call_instance_reboot(node_current, instance,
2504
                                           reboot_type, extra_args):
2505
        raise errors.OpExecError("Could not reboot instance")
2506
    else:
2507
      if not self.rpc.call_instance_shutdown(node_current, instance):
2508
        raise errors.OpExecError("could not shutdown instance for full reboot")
2509
      _ShutdownInstanceDisks(self, instance)
2510
      _StartInstanceDisks(self, instance, ignore_secondaries)
2511
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2512
        _ShutdownInstanceDisks(self, instance)
2513
        raise errors.OpExecError("Could not start instance for full reboot")
2514

    
2515
    self.cfg.MarkInstanceUp(instance.name)
2516

    
2517

    
2518
class LUShutdownInstance(LogicalUnit):
2519
  """Shutdown an instance.
2520

2521
  """
2522
  HPATH = "instance-stop"
2523
  HTYPE = constants.HTYPE_INSTANCE
2524
  _OP_REQP = ["instance_name"]
2525
  REQ_BGL = False
2526

    
2527
  def ExpandNames(self):
2528
    self._ExpandAndLockInstance()
2529

    
2530
  def BuildHooksEnv(self):
2531
    """Build hooks env.
2532

2533
    This runs on master, primary and secondary nodes of the instance.
2534

2535
    """
2536
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2537
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2538
          list(self.instance.secondary_nodes))
2539
    return env, nl, nl
2540

    
2541
  def CheckPrereq(self):
2542
    """Check prerequisites.
2543

2544
    This checks that the instance is in the cluster.
2545

2546
    """
2547
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2548
    assert self.instance is not None, \
2549
      "Cannot retrieve locked instance %s" % self.op.instance_name
2550

    
2551
  def Exec(self, feedback_fn):
2552
    """Shutdown the instance.
2553

2554
    """
2555
    instance = self.instance
2556
    node_current = instance.primary_node
2557
    self.cfg.MarkInstanceDown(instance.name)
2558
    if not self.rpc.call_instance_shutdown(node_current, instance):
2559
      self.proc.LogWarning("Could not shutdown instance")
2560

    
2561
    _ShutdownInstanceDisks(self, instance)
2562

    
2563

    
2564
class LUReinstallInstance(LogicalUnit):
2565
  """Reinstall an instance.
2566

2567
  """
2568
  HPATH = "instance-reinstall"
2569
  HTYPE = constants.HTYPE_INSTANCE
2570
  _OP_REQP = ["instance_name"]
2571
  REQ_BGL = False
2572

    
2573
  def ExpandNames(self):
2574
    self._ExpandAndLockInstance()
2575

    
2576
  def BuildHooksEnv(self):
2577
    """Build hooks env.
2578

2579
    This runs on master, primary and secondary nodes of the instance.
2580

2581
    """
2582
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2583
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2584
          list(self.instance.secondary_nodes))
2585
    return env, nl, nl
2586

    
2587
  def CheckPrereq(self):
2588
    """Check prerequisites.
2589

2590
    This checks that the instance is in the cluster and is not running.
2591

2592
    """
2593
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2594
    assert instance is not None, \
2595
      "Cannot retrieve locked instance %s" % self.op.instance_name
2596

    
2597
    if instance.disk_template == constants.DT_DISKLESS:
2598
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2599
                                 self.op.instance_name)
2600
    if instance.status != "down":
2601
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2602
                                 self.op.instance_name)
2603
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2604
                                              instance.name,
2605
                                              instance.hypervisor)
2606
    if remote_info:
2607
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2608
                                 (self.op.instance_name,
2609
                                  instance.primary_node))
2610

    
2611
    self.op.os_type = getattr(self.op, "os_type", None)
2612
    if self.op.os_type is not None:
2613
      # OS verification
2614
      pnode = self.cfg.GetNodeInfo(
2615
        self.cfg.ExpandNodeName(instance.primary_node))
2616
      if pnode is None:
2617
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2618
                                   self.op.pnode)
2619
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2620
      if not os_obj:
2621
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2622
                                   " primary node"  % self.op.os_type)
2623

    
2624
    self.instance = instance
2625

    
2626
  def Exec(self, feedback_fn):
2627
    """Reinstall the instance.
2628

2629
    """
2630
    inst = self.instance
2631

    
2632
    if self.op.os_type is not None:
2633
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2634
      inst.os = self.op.os_type
2635
      self.cfg.Update(inst)
2636

    
2637
    _StartInstanceDisks(self, inst, None)
2638
    try:
2639
      feedback_fn("Running the instance OS create scripts...")
2640
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2641
        raise errors.OpExecError("Could not install OS for instance %s"
2642
                                 " on node %s" %
2643
                                 (inst.name, inst.primary_node))
2644
    finally:
2645
      _ShutdownInstanceDisks(self, inst)
2646

    
2647

    
2648
class LURenameInstance(LogicalUnit):
2649
  """Rename an instance.
2650

2651
  """
2652
  HPATH = "instance-rename"
2653
  HTYPE = constants.HTYPE_INSTANCE
2654
  _OP_REQP = ["instance_name", "new_name"]
2655

    
2656
  def BuildHooksEnv(self):
2657
    """Build hooks env.
2658

2659
    This runs on master, primary and secondary nodes of the instance.
2660

2661
    """
2662
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2663
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2664
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2665
          list(self.instance.secondary_nodes))
2666
    return env, nl, nl
2667

    
2668
  def CheckPrereq(self):
2669
    """Check prerequisites.
2670

2671
    This checks that the instance is in the cluster and is not running.
2672

2673
    """
2674
    instance = self.cfg.GetInstanceInfo(
2675
      self.cfg.ExpandInstanceName(self.op.instance_name))
2676
    if instance is None:
2677
      raise errors.OpPrereqError("Instance '%s' not known" %
2678
                                 self.op.instance_name)
2679
    if instance.status != "down":
2680
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2681
                                 self.op.instance_name)
2682
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2683
                                              instance.name,
2684
                                              instance.hypervisor)
2685
    if remote_info:
2686
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2687
                                 (self.op.instance_name,
2688
                                  instance.primary_node))
2689
    self.instance = instance
2690

    
2691
    # new name verification
2692
    name_info = utils.HostInfo(self.op.new_name)
2693

    
2694
    self.op.new_name = new_name = name_info.name
2695
    instance_list = self.cfg.GetInstanceList()
2696
    if new_name in instance_list:
2697
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2698
                                 new_name)
2699

    
2700
    if not getattr(self.op, "ignore_ip", False):
2701
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2702
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2703
                                   (name_info.ip, new_name))
2704

    
2705

    
2706
  def Exec(self, feedback_fn):
2707
    """Reinstall the instance.
2708

2709
    """
2710
    inst = self.instance
2711
    old_name = inst.name
2712

    
2713
    if inst.disk_template == constants.DT_FILE:
2714
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2715

    
2716
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2717
    # Change the instance lock. This is definitely safe while we hold the BGL
2718
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2719
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2720

    
2721
    # re-read the instance from the configuration after rename
2722
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2723

    
2724
    if inst.disk_template == constants.DT_FILE:
2725
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2726
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2727
                                                     old_file_storage_dir,
2728
                                                     new_file_storage_dir)
2729

    
2730
      if not result:
2731
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2732
                                 " directory '%s' to '%s' (but the instance"
2733
                                 " has been renamed in Ganeti)" % (
2734
                                 inst.primary_node, old_file_storage_dir,
2735
                                 new_file_storage_dir))
2736

    
2737
      if not result[0]:
2738
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2739
                                 " (but the instance has been renamed in"
2740
                                 " Ganeti)" % (old_file_storage_dir,
2741
                                               new_file_storage_dir))
2742

    
2743
    _StartInstanceDisks(self, inst, None)
2744
    try:
2745
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2746
                                               old_name):
2747
        msg = ("Could not run OS rename script for instance %s on node %s"
2748
               " (but the instance has been renamed in Ganeti)" %
2749
               (inst.name, inst.primary_node))
2750
        self.proc.LogWarning(msg)
2751
    finally:
2752
      _ShutdownInstanceDisks(self, inst)
2753

    
2754

    
2755
class LURemoveInstance(LogicalUnit):
2756
  """Remove an instance.
2757

2758
  """
2759
  HPATH = "instance-remove"
2760
  HTYPE = constants.HTYPE_INSTANCE
2761
  _OP_REQP = ["instance_name", "ignore_failures"]
2762
  REQ_BGL = False
2763

    
2764
  def ExpandNames(self):
2765
    self._ExpandAndLockInstance()
2766
    self.needed_locks[locking.LEVEL_NODE] = []
2767
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2768

    
2769
  def DeclareLocks(self, level):
2770
    if level == locking.LEVEL_NODE:
2771
      self._LockInstancesNodes()
2772

    
2773
  def BuildHooksEnv(self):
2774
    """Build hooks env.
2775

2776
    This runs on master, primary and secondary nodes of the instance.
2777

2778
    """
2779
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2780
    nl = [self.cfg.GetMasterNode()]
2781
    return env, nl, nl
2782

    
2783
  def CheckPrereq(self):
2784
    """Check prerequisites.
2785

2786
    This checks that the instance is in the cluster.
2787

2788
    """
2789
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2790
    assert self.instance is not None, \
2791
      "Cannot retrieve locked instance %s" % self.op.instance_name
2792

    
2793
  def Exec(self, feedback_fn):
2794
    """Remove the instance.
2795

2796
    """
2797
    instance = self.instance
2798
    logging.info("Shutting down instance %s on node %s",
2799
                 instance.name, instance.primary_node)
2800

    
2801
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2802
      if self.op.ignore_failures:
2803
        feedback_fn("Warning: can't shutdown instance")
2804
      else:
2805
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2806
                                 (instance.name, instance.primary_node))
2807

    
2808
    logging.info("Removing block devices for instance %s", instance.name)
2809

    
2810
    if not _RemoveDisks(self, instance):
2811
      if self.op.ignore_failures:
2812
        feedback_fn("Warning: can't remove instance's disks")
2813
      else:
2814
        raise errors.OpExecError("Can't remove instance's disks")
2815

    
2816
    logging.info("Removing instance %s out of cluster config", instance.name)
2817

    
2818
    self.cfg.RemoveInstance(instance.name)
2819
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2820

    
2821

    
2822
class LUQueryInstances(NoHooksLU):
2823
  """Logical unit for querying instances.
2824

2825
  """
2826
  _OP_REQP = ["output_fields", "names"]
2827
  REQ_BGL = False
2828
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2829
                                    "admin_state", "admin_ram",
2830
                                    "disk_template", "ip", "mac", "bridge",
2831
                                    "sda_size", "sdb_size", "vcpus", "tags",
2832
                                    "network_port", "beparams",
2833
                                    "(disk).(size)/([0-9]+)",
2834
                                    "(disk).(sizes)",
2835
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2836
                                    "(nic).(macs|ips|bridges)",
2837
                                    "(disk|nic).(count)",
2838
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()

    instance_names = utils.NiceSort(instance_names)
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
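
# Illustrative note (not part of the original module): the "status" field
# computed in Exec() above combines the live state with the configured state
# roughly as follows, assuming the primary node answered the RPC:
#
#   running on node?   instance.status   reported value
#   yes                != "down"         "running"
#   yes                == "down"         "ERROR_up"
#   no                 != "down"         "ERROR_down"
#   no                 == "down"         "ADMIN_down"
#
# If the primary node could not be queried at all, "ERROR_nodedown" is
# reported instead.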


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
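
# Illustrative summary (not part of the original module) of the failover
# sequence implemented in LUFailoverInstance.Exec above:
#
#   1. check disk consistency on the target (secondary) node, unless
#      ignore_consistency was requested
#   2. shut the instance down on the source (primary) node
#   3. deactivate the instance's disks on the source node
#   4. point instance.primary_node at the old secondary and save the config
#   5. if the instance was marked "up": activate its disks on the new
#      primary and start it there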


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
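
# Illustrative note (not part of the original module): the 'force' flag above
# propagates downwards.  Once a device reports that it must exist on the
# secondary node (as a DRBD8 device does), its children -- the data and
# metadata LVs -- are created there as well, children first, then the device
# itself.  _CreateBlockDevOnPrimary, by contrast, always creates the whole
# tree unconditionally.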


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable set of LV names.

  This will generate unique logical volume names, one for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
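
# Illustrative example (not part of the original module; the exact format of
# the IDs returned by cfg.GenerateUniqueID() is an implementation detail):
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#   => ["<id1>.disk0_data", "<id2>.disk0_meta"]
#
# i.e. one "<unique-id><extension>" string per requested extension.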


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
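
# Illustrative sketch (not part of the original module) of the object built
# by _GenerateDRBD8Branch above, e.g. for a 1024 MB disk:
#
#   Disk(LD_DRBD8, size=1024,
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret),
#        children=[Disk(LD_LV, size=1024,  # data volume
#                       logical_id=(vgname, names[0])),
#                  Disk(LD_LV, size=128,   # DRBD metadata volume
#                       logical_id=(vgname, names[1]))],
#        iv_name=iv_name)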


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
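
# Illustrative example (not part of the original module; all argument values
# below are made up).  A plain, LVM-only layout for two disks could be
# requested as:
#
#   _GenerateDiskTemplate(lu, constants.DT_PLAIN,
#                         "inst1.example.com", "node1.example.com",
#                         [],                        # no secondaries for plain
#                         [{"size": 1024}, {"size": 512}],
#                         None, None,                # no file storage
#                         0)
#
# which returns two LD_LV disk objects with iv_names "disk/0" and "disk/1".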


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True
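
# Illustrative note (not part of the original module): _CreateDisks and
# _RemoveDisks below are used as a pair in the instance-creation path, with
# a rollback on failure; from LUCreateInstance.Exec further down:
#
#   if not _CreateDisks(self, iobj):
#     _RemoveDisks(self, iobj)
#     self.cfg.ReleaseDRBDMinors(instance)
#     raise errors.OpExecError("Device creation failed, reverting...")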


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of the requested disk layout
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]
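
# Illustrative example (not part of the original module): for a DRBD8 layout
# with two disks of 1024 MB and 512 MB, _ComputeDiskSize returns
#
#   (1024 + 128) + (512 + 128) = 1792   # MB, incl. 128 MB of DRBD metadata
#                                       # per disk
#
# while the diskless and file-based templates need no volume group space
# (None).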


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])
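
# Illustrative note (not part of the original module): the helper above is
# called from both the create and the modify paths; e.g. in
# LUCreateInstance.CheckPrereq below it is invoked as
#
#   _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
#
# where nodenames is the primary node plus any secondary nodes.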

    
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
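
  # Illustrative note (not part of the original module): placement input for
  # this LU, as enforced in ExpandNames above -- exactly one of "pnode" and
  # "iallocator" may be set on the opcode.  Node and allocator names below
  # are made up:
  #
  #   self.op.pnode = "node1.example.com"    # manual placement (plus snode
  #                                          # for mirrored templates)
  #   self.op.iallocator = "my-allocator"    # automated placement; then
  #                                          # _RunAllocator fills pnode (and
  #                                          # snode when two nodes are
  #                                          # required)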

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
                                 " exist on"
                                 " destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
3901
    self.cfg.ReleaseDRBDMinors(instance)
3902
    # Unlock all the nodes
3903
    self.context.glm.release(locking.LEVEL_NODE)
3904
    del self.acquired_locks[locking.LEVEL_NODE]
3905

    
3906
    if self.op.wait_for_sync:
3907
      disk_abort = not _WaitForSync(self, iobj)
3908
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3909
      # make sure the disks are not degraded (still sync-ing is ok)
3910
      time.sleep(15)
3911
      feedback_fn("* checking mirrors status")
3912
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3913
    else:
3914
      disk_abort = False
3915

    
3916
    if disk_abort:
3917
      _RemoveDisks(self, iobj)
3918
      self.cfg.RemoveInstance(iobj.name)
3919
      # Make sure the instance lock gets removed
3920
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3921
      raise errors.OpExecError("There are some degraded disks for"
3922
                               " this instance")
3923

    
3924
    feedback_fn("creating os for instance %s on node %s" %
3925
                (instance, pnode_name))
3926

    
3927
    if iobj.disk_template != constants.DT_DISKLESS:
3928
      if self.op.mode == constants.INSTANCE_CREATE:
3929
        feedback_fn("* running the instance OS create scripts...")
3930
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
3931
          raise errors.OpExecError("could not add os for instance %s"
3932
                                   " on node %s" %
3933
                                   (instance, pnode_name))
3934

    
3935
      elif self.op.mode == constants.INSTANCE_IMPORT:
3936
        feedback_fn("* running the instance OS import scripts...")
3937
        src_node = self.op.src_node
3938
        src_images = self.src_images
3939
        cluster_name = self.cfg.GetClusterName()
3940
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3941
                                                         src_node, src_images,
3942
                                                         cluster_name)
3943
        for idx, result in enumerate(import_result):
3944
          if not result:
3945
            self.LogWarning("Could not import the image %s for instance"
3946
                            " %s, disk %d, on node %s" %
3947
                            (src_images[idx], instance, idx, pnode_name))
3948
      else:
3949
        # also checked in the prereq part
3950
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3951
                                     % self.op.mode)
3952

    
3953
    if self.op.start:
3954
      logging.info("Starting instance %s on node %s", instance, pnode_name)
3955
      feedback_fn("* starting instance...")
3956
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3957
        raise errors.OpExecError("Could not start instance")
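
# Illustrative summary (not part of the original module) of the creation flow
# in LUCreateInstance.Exec above:
#
#   1. generate MAC addresses, a network port and the file storage path as
#      needed
#   2. build the disk objects (_GenerateDiskTemplate) and the Instance object
#   3. create the disks (_CreateDisks), rolling back on failure
#   4. add the instance to the configuration and release the node locks
#   5. wait for (or at least check) disk synchronisation
#   6. run the OS create or import scripts, unless the instance is diskless
#   7. optionally start the instance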


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)
4182

4183
    Failures are not very well handled.
4184

4185
    """
4186
    steps_total = 6
4187
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4188
    instance = self.instance
4189
    iv_names = {}
4190
    vgname = self.cfg.GetVGName()
4191
    # start of work
4192
    cfg = self.cfg
4193
    tgt_node = self.tgt_node
4194
    oth_node = self.oth_node
4195

    
4196
    # Step: check device activation
4197
    self.proc.LogStep(1, steps_total, "check device existence")
4198
    info("checking volume groups")
4199
    my_vg = cfg.GetVGName()
4200
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4201
    if not results:
4202
      raise errors.OpExecError("Can't list volume groups on the nodes")
4203
    for node in oth_node, tgt_node:
4204
      res = results.get(node, False)
4205
      if not res or my_vg not in res:
4206
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4207
                                 (my_vg, node))
4208
    for idx, dev in enumerate(instance.disks):
4209
      if idx not in self.op.disks:
4210
        continue
4211
      for node in tgt_node, oth_node:
4212
        info("checking disk/%d on %s" % (idx, node))
4213
        cfg.SetDiskID(dev, node)
4214
        if not self.rpc.call_blockdev_find(node, dev):
4215
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4216
                                   (idx, node))
4217

    
4218
    # Step: check other node consistency
4219
    self.proc.LogStep(2, steps_total, "check peer consistency")
4220
    for idx, dev in enumerate(instance.disks):
4221
      if idx not in self.op.disks:
4222
        continue
4223
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4224
      if not _CheckDiskConsistency(self, dev, oth_node,
4225
                                   oth_node==instance.primary_node):
4226
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4227
                                 " to replace disks on this node (%s)" %
4228
                                 (oth_node, tgt_node))
4229

    
4230
    # Step: create new storage
4231
    self.proc.LogStep(3, steps_total, "allocate new storage")
4232
    for idx, dev in enumerate(instance.disks):
4233
      if idx not in self.op.disks:
4234
        continue
4235
      size = dev.size
4236
      cfg.SetDiskID(dev, tgt_node)
4237
      lv_names = [".disk%d_%s" % (idx, suf)
4238
                  for suf in ["data", "meta"]]
4239
      names = _GenerateUniqueNames(self, lv_names)
4240
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4241
                             logical_id=(vgname, names[0]))
4242
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4243
                             logical_id=(vgname, names[1]))
4244
      new_lvs = [lv_data, lv_meta]
4245
      old_lvs = dev.children
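      # remember the (drbd device, old LVs, new LVs) triple; the detach,
      # rename, attach and cleanup steps below all work off this mapping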
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

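      # reflect the on-disk rename in the configuration objects: the new LVs
      # take over the old logical ids, the old LVs keep the temporary suffix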
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find disk/%d on node %s" %
                                 (idx, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      size = dev.size
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node
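      # the new logical_id keeps the primary node, port and remaining fields,
      # replacing only the old secondary node and its drbd minor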
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[idx] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for idx, dev in enumerate(instance.disks):
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd disk/%d from network, unusual case" %
                idx)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for idx, dev in enumerate(instance.disks):
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd disk/%d to new secondary!" % idx,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

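    # select the replacement routine: same-node LV replacement or a full
    # secondary node change, depending on whether a remote node was given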
    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
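    # grow the block device on every node (secondaries first, then the
    # primary) and only afterwards record the new size in the configuration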
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" %
                                     name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

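    # recurse into the child devices (e.g. the LVs backing a DRBD disk)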
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

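    # memory and vcpu values must be integers, so convert them early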
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
      val = self.op.beparams.get(item, None)
      if val is not None:
        try:
          val = int(val)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
        self.op.beparams[item] = val
    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == "none":
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
      # we can only check None bridges and assign the default one
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is None:
        nic_dict['bridge'] = self.cfg.GetDefBridge()
      # but we can validate MACs
      nic_mac = nic_dict.get('mac', None)
      if nic_mac is not None:
        if self.cfg.IsMacInUse(nic_mac):
          raise errors.OpPrereqError("MAC address %s already in use"
                                     " in cluster" % nic_mac)
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # FIXME: readd disk/nic changes
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val is None:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val is None:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
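        # memory still missing on the primary: what the instance will need,
        # minus what it currently uses, minus what the node has free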
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode]['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node in instance.secondary_nodes:
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is not None:
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if not isinstance(ins_l, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          if not self.rpc.call_blockdev_remove(node, disk):
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                 " continuing anyway", device_idx, node)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        new_disk.mode = disk_dict['mode']
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for secondary_node in instance.secondary_nodes:
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
                                            new_disk, False, info):
            self.LogWarning("Failed to create volume %s (%s) on"
                            " secondary node %s!",
                            new_disk.iv_name, new_disk, secondary_node)
        #HARDCODE
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
                                        instance, new_disk, info):
          self.LogWarning("Failed to create volume %s on primary!",
                          new_disk.iv_name)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # add a new nic
        if 'mac' not in nic_dict:
          mac = constants.VALUE_GENERATE
        else:
          mac = nic_dict['mac']
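        # 'auto'/'generate' means we pick a fresh MAC address ourselves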
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          mac = self.cfg.GenerateMAC()
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=nic_dict.get('bridge', None))
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    return self.rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

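    # snapshot every disk; a failed snapshot is recorded as False so the
    # export loop below can skip it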
    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an LVM leaf of the disk we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

        if not new_dev_name:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name),
                                 physical_id=(vgname, new_dev_name),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                             instance, cluster_name, idx):
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        if not self.rpc.call_blockdev_remove(src_node, dev):
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in_text, in_data, out_text, out_data), that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in both formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
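
  # Illustrative construction only (hypothetical values; see
  # LUTestAllocator.Exec below for how both modes are built from an opcode):
  #   IAllocator(self, mode=constants.IALLOCATOR_MODE_ALLOC,
  #              name="instance1.example.com", mem_size=512, vcpus=1,
  #              disks=[{"size": 1024, "mode": "w"},
  #                     {"size": 4096, "mode": "w"}],
  #              disk_template=constants.DT_DRBD8, os="debian-etch",
  #              tags=[], nics=[{"mac": "auto", "ip": None,
  #                              "bridge": "xen-br0"}],
  #              hypervisor="xen-pvm")
  # In relocate mode only name and relocate_from are required.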

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
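    # the "nodes" and "instances" sections are filled in below; the "request"
    # section is added afterwards by _AddNewInstance/_AddRelocateInstance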
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
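      # (relocations carry no "hypervisor" input key, so the hypervisor of
      # the existing instance is used for the node queries below)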

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # build the per-node result for the allocator input
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
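    # in_text is what Run() passes to the external allocator script via the
    # iallocator runner RPC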

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")
    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result