#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods don't have to worry about missing parameters.

    """
    pass
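
  # Illustrative sketch only (not used by any LU in this module): a subclass
  # could default an optional opcode attribute in CheckArguments, e.g. a
  # hypothetical "force" flag:
  #
  #   def CheckArguments(self):
  #     if not hasattr(self.op, "force"):
  #       self.op.force = False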

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If the hook should not run on any nodes, this must be expressed as an
    empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
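
  # A minimal sketch of a concrete implementation (the "target_name" opcode
  # attribute is an assumption for the example; see LURenameCluster below for
  # a real one). Keys are given without the GANETI_ prefix, which the hooks
  # runner adds itself:
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.target_name}
  #     mn = self.cfg.GetMasterNode()
  #     return env, [mn], [mn]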

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called from DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
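
# Illustrative sketch (commented out, not a real LU in this module) of how a
# concurrent LU typically combines the helpers above; the LU name and opcode
# fields are assumptions for the example only:
#
#   class LUExampleRebootInstance(LogicalUnit):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()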


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is of a wrong type
  @raise errors.ProgrammerError: if the nodes parameter is empty

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of a wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
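
# For reference, a call like the following (hypothetical values) would yield
# the keys OP_TARGET, INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
# INSTANCE_OS_TYPE, INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS,
# INSTANCE_NIC0_IP, INSTANCE_NIC0_BRIDGE, INSTANCE_NIC0_HWADDR and
# INSTANCE_NIC_COUNT:
#
#   _BuildInstanceHookEnv("inst1.example.tld", "node1.example.tld", [],
#                         "debian-etch", "up", 128, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])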


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list::

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type node: string
    @param node: the name of the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @type vglist: dict
    @param vglist: dictionary of volume group names and their size
    @param node_result: the results from the node
    @param remote_version: the RPC version from the remote node
    @param feedback_fn: function used to accumulate results

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase, and their failure
    makes the output be logged in the verify output and the verification
    fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            self.LogWarning("Copy of file %s to node %s failed",
                            fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
1377
class LUDiagnoseOS(NoHooksLU):
1378
  """Logical unit for OS diagnose/query.
1379

1380
  """
1381
  _OP_REQP = ["output_fields", "names"]
1382
  REQ_BGL = False
1383
  _FIELDS_STATIC = utils.FieldSet()
1384
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1385

    
1386
  def ExpandNames(self):
1387
    if self.op.names:
1388
      raise errors.OpPrereqError("Selective OS query not supported")
1389

    
1390
    _CheckOutputFields(static=self._FIELDS_STATIC,
1391
                       dynamic=self._FIELDS_DYNAMIC,
1392
                       selected=self.op.output_fields)
1393

    
1394
    # Lock all nodes, in shared mode
1395
    self.needed_locks = {}
1396
    self.share_locks[locking.LEVEL_NODE] = 1
1397
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1398

    
1399
  def CheckPrereq(self):
1400
    """Check prerequisites.
1401

1402
    """
1403

    
1404
  @staticmethod
1405
  def _DiagnoseByOS(node_list, rlist):
1406
    """Remaps a per-node return list into an a per-os per-node dictionary
1407

1408
    @param node_list: a list with the names of all nodes
1409
    @param rlist: a map with node names as keys and OS objects as values
1410

1411
    @rtype: dict
1412
    @returns: a dictionary with osnames as keys and as value another map, with
1413
        nodes as keys and list of OS objects as values, eg::
1414

1415
          {"debian-etch": {"node1": [<object>,...],
1416
                           "node2": [<object>,]}
1417
          }
1418

1419
    """
1420
    all_os = {}
1421
    for node_name, nr in rlist.iteritems():
1422
      if not nr:
1423
        continue
1424
      for os_obj in nr:
1425
        if os_obj.name not in all_os:
1426
          # build a list of nodes for this os containing empty lists
1427
          # for each node in node_list
1428
          all_os[os_obj.name] = {}
1429
          for nname in node_list:
1430
            all_os[os_obj.name][nname] = []
1431
        all_os[os_obj.name][node_name].append(os_obj)
1432
    return all_os
1433

    
1434
  def Exec(self, feedback_fn):
1435
    """Compute the list of OSes.
1436

1437
    """
1438
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1439
    node_data = self.rpc.call_os_diagnose(node_list)
1440
    if node_data == False:
1441
      raise errors.OpExecError("Can't gather the list of OSes")
1442
    pol = self._DiagnoseByOS(node_list, node_data)
1443
    output = []
1444
    for os_name, os_data in pol.iteritems():
1445
      row = []
1446
      for field in self.op.output_fields:
1447
        if field == "name":
1448
          val = os_name
1449
        elif field == "valid":
1450
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1451
        elif field == "node_status":
1452
          val = {}
1453
          for node_name, nos_list in os_data.iteritems():
1454
            val[node_name] = [(v.status, v.path) for v in nos_list]
1455
        else:
1456
          raise errors.ParameterError(field)
1457
        row.append(val)
1458
      output.append(row)
1459

    
1460
    return output
1461

    
1462

    
1463
class LURemoveNode(LogicalUnit):
1464
  """Logical unit for removing a node.
1465

1466
  """
1467
  HPATH = "node-remove"
1468
  HTYPE = constants.HTYPE_NODE
1469
  _OP_REQP = ["node_name"]
1470

    
1471
  def BuildHooksEnv(self):
1472
    """Build hooks env.
1473

1474
    This doesn't run on the target node in the pre phase as a failed
1475
    node would then be impossible to remove.
1476

1477
    """
1478
    env = {
1479
      "OP_TARGET": self.op.node_name,
1480
      "NODE_NAME": self.op.node_name,
1481
      }
1482
    all_nodes = self.cfg.GetNodeList()
1483
    all_nodes.remove(self.op.node_name)
1484
    return env, all_nodes, all_nodes
1485

    
1486
  def CheckPrereq(self):
1487
    """Check prerequisites.
1488

1489
    This checks:
1490
     - the node exists in the configuration
1491
     - it does not have primary or secondary instances
1492
     - it's not the master
1493

1494
    Any errors are signalled by raising errors.OpPrereqError.
1495

1496
    """
1497
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1498
    if node is None:
1499
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1500

    
1501
    instance_list = self.cfg.GetInstanceList()
1502

    
1503
    masternode = self.cfg.GetMasterNode()
1504
    if node.name == masternode:
1505
      raise errors.OpPrereqError("Node is the master node,"
1506
                                 " you need to failover first.")
1507

    
1508
    for instance_name in instance_list:
1509
      instance = self.cfg.GetInstanceInfo(instance_name)
1510
      if node.name == instance.primary_node:
1511
        raise errors.OpPrereqError("Instance %s still running on the node,"
1512
                                   " please remove first." % instance_name)
1513
      if node.name in instance.secondary_nodes:
1514
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1515
                                   " please remove first." % instance_name)
1516
    self.op.node_name = node.name
1517
    self.node = node
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Removes the node from the cluster.
1521

1522
    """
1523
    node = self.node
1524
    logging.info("Stopping the node daemon and removing configs from node %s",
1525
                 node.name)
1526

    
1527
    self.context.RemoveNode(node.name)
1528

    
1529
    self.rpc.call_node_leave_cluster(node.name)
1530

    
1531

    
1532
class LUQueryNodes(NoHooksLU):
1533
  """Logical unit for querying nodes.
1534

1535
  """
1536
  _OP_REQP = ["output_fields", "names"]
1537
  REQ_BGL = False
1538
  _FIELDS_DYNAMIC = utils.FieldSet(
1539
    "dtotal", "dfree",
1540
    "mtotal", "mnode", "mfree",
1541
    "bootid",
1542
    "ctotal",
1543
    )
1544

    
1545
  _FIELDS_STATIC = utils.FieldSet(
1546
    "name", "pinst_cnt", "sinst_cnt",
1547
    "pinst_list", "sinst_list",
1548
    "pip", "sip", "tags",
1549
    "serial_no",
1550
    "master_candidate",
1551
    "master",
1552
    )
1553

    
1554
  def ExpandNames(self):
1555
    _CheckOutputFields(static=self._FIELDS_STATIC,
1556
                       dynamic=self._FIELDS_DYNAMIC,
1557
                       selected=self.op.output_fields)
1558

    
1559
    self.needed_locks = {}
1560
    self.share_locks[locking.LEVEL_NODE] = 1
1561

    
1562
    if self.op.names:
1563
      self.wanted = _GetWantedNodes(self, self.op.names)
1564
    else:
1565
      self.wanted = locking.ALL_SET
1566

    
1567
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
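    # For example (sketch, not an exhaustive list): a query for only
    # static fields such as "name" or "pip" can be answered from the
    # configuration alone, while dynamic fields such as "mfree" or
    # "dtotal" need the node RPC below and therefore node locks.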
1568
    if self.do_locking:
1569
      # if we don't request only static fields, we need to lock the nodes
1570
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1571

    
1572

    
1573
  def CheckPrereq(self):
1574
    """Check prerequisites.
1575

1576
    """
1577
    # The validation of the node list is done in _GetWantedNodes, if
1578
    # non-empty; if empty, there is no validation to do
1579
    pass
1580

    
1581
  def Exec(self, feedback_fn):
1582
    """Computes the list of nodes and their attributes.
1583

1584
    """
1585
    all_info = self.cfg.GetAllNodesInfo()
1586
    if self.do_locking:
1587
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1588
    elif self.wanted != locking.ALL_SET:
1589
      nodenames = self.wanted
1590
      missing = set(nodenames).difference(all_info.keys())
1591
      if missing:
1592
        raise errors.OpExecError(
1593
          "Some nodes were removed before retrieving their data: %s" % missing)
1594
    else:
1595
      nodenames = all_info.keys()
1596

    
1597
    nodenames = utils.NiceSort(nodenames)
1598
    nodelist = [all_info[name] for name in nodenames]
1599

    
1600
    # begin data gathering
1601

    
1602
    if self.do_locking:
1603
      live_data = {}
1604
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1605
                                          self.cfg.GetHypervisorType())
1606
      for name in nodenames:
1607
        nodeinfo = node_data.get(name, None)
1608
        if nodeinfo:
1609
          fn = utils.TryConvert
1610
          live_data[name] = {
1611
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1612
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1613
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1614
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1615
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1616
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1617
            "bootid": nodeinfo.get('bootid', None),
1618
            }
1619
        else:
1620
          live_data[name] = {}
1621
    else:
1622
      live_data = dict.fromkeys(nodenames, {})
1623

    
1624
    node_to_primary = dict([(name, set()) for name in nodenames])
1625
    node_to_secondary = dict([(name, set()) for name in nodenames])
1626

    
1627
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1628
                             "sinst_cnt", "sinst_list"))
1629
    if inst_fields & frozenset(self.op.output_fields):
1630
      instancelist = self.cfg.GetInstanceList()
1631

    
1632
      for instance_name in instancelist:
1633
        inst = self.cfg.GetInstanceInfo(instance_name)
1634
        if inst.primary_node in node_to_primary:
1635
          node_to_primary[inst.primary_node].add(inst.name)
1636
        for secnode in inst.secondary_nodes:
1637
          if secnode in node_to_secondary:
1638
            node_to_secondary[secnode].add(inst.name)
1639

    
1640
    master_node = self.cfg.GetMasterNode()
1641

    
1642
    # end data gathering
1643

    
1644
    output = []
1645
    for node in nodelist:
1646
      node_output = []
1647
      for field in self.op.output_fields:
1648
        if field == "name":
1649
          val = node.name
1650
        elif field == "pinst_list":
1651
          val = list(node_to_primary[node.name])
1652
        elif field == "sinst_list":
1653
          val = list(node_to_secondary[node.name])
1654
        elif field == "pinst_cnt":
1655
          val = len(node_to_primary[node.name])
1656
        elif field == "sinst_cnt":
1657
          val = len(node_to_secondary[node.name])
1658
        elif field == "pip":
1659
          val = node.primary_ip
1660
        elif field == "sip":
1661
          val = node.secondary_ip
1662
        elif field == "tags":
1663
          val = list(node.GetTags())
1664
        elif field == "serial_no":
1665
          val = node.serial_no
1666
        elif field == "master_candidate":
1667
          val = node.master_candidate
1668
        elif field == "master":
1669
          val = node.name == master_node
1670
        elif self._FIELDS_DYNAMIC.Matches(field):
1671
          val = live_data[node.name].get(field, None)
1672
        else:
1673
          raise errors.ParameterError(field)
1674
        node_output.append(val)
1675
      output.append(node_output)
1676

    
1677
    return output
1678

    
1679

    
1680
class LUQueryNodeVolumes(NoHooksLU):
1681
  """Logical unit for getting volumes on node(s).
1682

1683
  """
1684
  _OP_REQP = ["nodes", "output_fields"]
1685
  REQ_BGL = False
1686
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1687
  _FIELDS_STATIC = utils.FieldSet("node")
1688

    
1689
  def ExpandNames(self):
1690
    _CheckOutputFields(static=self._FIELDS_STATIC,
1691
                       dynamic=self._FIELDS_DYNAMIC,
1692
                       selected=self.op.output_fields)
1693

    
1694
    self.needed_locks = {}
1695
    self.share_locks[locking.LEVEL_NODE] = 1
1696
    if not self.op.nodes:
1697
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1698
    else:
1699
      self.needed_locks[locking.LEVEL_NODE] = \
1700
        _GetWantedNodes(self, self.op.nodes)
1701

    
1702
  def CheckPrereq(self):
1703
    """Check prerequisites.
1704

1705
    This checks that the fields required are valid output fields.
1706

1707
    """
1708
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1709

    
1710
  def Exec(self, feedback_fn):
1711
    """Computes the list of nodes and their attributes.
1712

1713
    """
1714
    nodenames = self.nodes
1715
    volumes = self.rpc.call_node_volumes(nodenames)
1716

    
1717
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1718
             in self.cfg.GetInstanceList()]
1719

    
1720
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1721

    
1722
    output = []
1723
    for node in nodenames:
1724
      if node not in volumes or not volumes[node]:
1725
        continue
1726

    
1727
      node_vols = volumes[node][:]
1728
      node_vols.sort(key=lambda vol: vol['dev'])
1729

    
1730
      for vol in node_vols:
1731
        node_output = []
1732
        for field in self.op.output_fields:
1733
          if field == "node":
1734
            val = node
1735
          elif field == "phys":
1736
            val = vol['dev']
1737
          elif field == "vg":
1738
            val = vol['vg']
1739
          elif field == "name":
1740
            val = vol['name']
1741
          elif field == "size":
1742
            val = int(float(vol['size']))
1743
          elif field == "instance":
1744
            for inst in ilist:
1745
              if node not in lv_by_node[inst]:
1746
                continue
1747
              if vol['name'] in lv_by_node[inst][node]:
1748
                val = inst.name
1749
                break
1750
            else:
1751
              val = '-'
1752
          else:
1753
            raise errors.ParameterError(field)
1754
          node_output.append(str(val))
1755

    
1756
        output.append(node_output)
1757

    
1758
    return output
1759

    
1760

    
1761
class LUAddNode(LogicalUnit):
1762
  """Logical unit for adding node to the cluster.
1763

1764
  """
1765
  HPATH = "node-add"
1766
  HTYPE = constants.HTYPE_NODE
1767
  _OP_REQP = ["node_name"]
1768

    
1769
  def BuildHooksEnv(self):
1770
    """Build hooks env.
1771

1772
    This will run on all nodes before, and on all nodes + the new node after.
1773

1774
    """
1775
    env = {
1776
      "OP_TARGET": self.op.node_name,
1777
      "NODE_NAME": self.op.node_name,
1778
      "NODE_PIP": self.op.primary_ip,
1779
      "NODE_SIP": self.op.secondary_ip,
1780
      }
1781
    nodes_0 = self.cfg.GetNodeList()
1782
    nodes_1 = nodes_0 + [self.op.node_name, ]
1783
    return env, nodes_0, nodes_1
1784

    
1785
  def CheckPrereq(self):
1786
    """Check prerequisites.
1787

1788
    This checks:
1789
     - the new node is not already in the config
1790
     - it is resolvable
1791
     - its parameters (single/dual homed) matches the cluster
1792

1793
    Any errors are signalled by raising errors.OpPrereqError.
1794

1795
    """
1796
    node_name = self.op.node_name
1797
    cfg = self.cfg
1798

    
1799
    dns_data = utils.HostInfo(node_name)
1800

    
1801
    node = dns_data.name
1802
    primary_ip = self.op.primary_ip = dns_data.ip
1803
    secondary_ip = getattr(self.op, "secondary_ip", None)
1804
    if secondary_ip is None:
1805
      secondary_ip = primary_ip
1806
    if not utils.IsValidIP(secondary_ip):
1807
      raise errors.OpPrereqError("Invalid secondary IP given")
1808
    self.op.secondary_ip = secondary_ip
1809

    
1810
    node_list = cfg.GetNodeList()
1811
    if not self.op.readd and node in node_list:
1812
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1813
                                 node)
1814
    elif self.op.readd and node not in node_list:
1815
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1816

    
1817
    for existing_node_name in node_list:
1818
      existing_node = cfg.GetNodeInfo(existing_node_name)
1819

    
1820
      if self.op.readd and node == existing_node_name:
1821
        if (existing_node.primary_ip != primary_ip or
1822
            existing_node.secondary_ip != secondary_ip):
1823
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1824
                                     " address configuration as before")
1825
        continue
1826

    
1827
      if (existing_node.primary_ip == primary_ip or
1828
          existing_node.secondary_ip == primary_ip or
1829
          existing_node.primary_ip == secondary_ip or
1830
          existing_node.secondary_ip == secondary_ip):
1831
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1832
                                   " existing node %s" % existing_node.name)
1833

    
1834
    # check that the type of the node (single versus dual homed) is the
1835
    # same as for the master
1836
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1837
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1838
    newbie_singlehomed = secondary_ip == primary_ip
1839
    if master_singlehomed != newbie_singlehomed:
1840
      if master_singlehomed:
1841
        raise errors.OpPrereqError("The master has no private ip but the"
1842
                                   " new node has one")
1843
      else:
1844
        raise errors.OpPrereqError("The master has a private ip but the"
1845
                                   " new node doesn't have one")
1846

    
1847
    # check reachability
1848
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1849
      raise errors.OpPrereqError("Node not reachable by ping")
1850

    
1851
    if not newbie_singlehomed:
1852
      # check reachability from my secondary ip to newbie's secondary ip
1853
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1854
                           source=myself.secondary_ip):
1855
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1856
                                   " based ping to noded port")
1857

    
1858
    self.new_node = objects.Node(name=node,
1859
                                 primary_ip=primary_ip,
1860
                                 secondary_ip=secondary_ip)
1861

    
1862
  def Exec(self, feedback_fn):
1863
    """Adds the new node to the cluster.
1864

1865
    """
1866
    new_node = self.new_node
1867
    node = new_node.name
1868

    
1869
    # check connectivity
1870
    result = self.rpc.call_version([node])[node]
1871
    if result:
1872
      if constants.PROTOCOL_VERSION == result:
1873
        logging.info("Communication to node %s fine, sw version %s match",
1874
                     node, result)
1875
      else:
1876
        raise errors.OpExecError("Version mismatch master version %s,"
1877
                                 " node version %s" %
1878
                                 (constants.PROTOCOL_VERSION, result))
1879
    else:
1880
      raise errors.OpExecError("Cannot get version from the new node")
1881

    
1882
    # setup ssh on node
1883
    logging.info("Copy ssh key to node %s", node)
1884
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1885
    keyarray = []
1886
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1887
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1888
                priv_key, pub_key]
1889

    
1890
    for i in keyfiles:
1891
      f = open(i, 'r')
1892
      try:
1893
        keyarray.append(f.read())
1894
      finally:
1895
        f.close()
1896

    
1897
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1898
                                    keyarray[2],
1899
                                    keyarray[3], keyarray[4], keyarray[5])
1900

    
1901
    if not result:
1902
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1903

    
1904
    # Add node to our /etc/hosts, and add key to known_hosts
1905
    utils.AddHostToEtcHosts(new_node.name)
1906

    
1907
    if new_node.secondary_ip != new_node.primary_ip:
1908
      if not self.rpc.call_node_has_ip_address(new_node.name,
1909
                                               new_node.secondary_ip):
1910
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1911
                                 " you gave (%s). Please fix and re-run this"
1912
                                 " command." % new_node.secondary_ip)
1913

    
1914
    node_verify_list = [self.cfg.GetMasterNode()]
1915
    node_verify_param = {
1916
      'nodelist': [node],
1917
      # TODO: do a node-net-test as well?
1918
    }
1919

    
1920
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1921
                                       self.cfg.GetClusterName())
1922
    for verifier in node_verify_list:
1923
      if not result[verifier]:
1924
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1925
                                 " for remote verification" % verifier)
1926
      if result[verifier]['nodelist']:
1927
        for failed in result[verifier]['nodelist']:
1928
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1929
                      (verifier, result[verifier]['nodelist'][failed]))
1930
        raise errors.OpExecError("ssh/hostname verification failed.")
1931

    
1932
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1933
    # including the node just added
1934
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1935
    dist_nodes = self.cfg.GetNodeList()
1936
    if not self.op.readd:
1937
      dist_nodes.append(node)
1938
    if myself.name in dist_nodes:
1939
      dist_nodes.remove(myself.name)
1940

    
1941
    logging.debug("Copying hosts and known_hosts to all nodes")
1942
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1943
      result = self.rpc.call_upload_file(dist_nodes, fname)
1944
      for to_node in dist_nodes:
1945
        if not result[to_node]:
1946
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1947

    
1948
    to_copy = []
1949
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1950
      to_copy.append(constants.VNC_PASSWORD_FILE)
1951
    for fname in to_copy:
1952
      result = self.rpc.call_upload_file([node], fname)
1953
      if not result[node]:
1954
        logging.error("Could not copy file %s to node %s", fname, node)
1955

    
1956
    if self.op.readd:
1957
      self.context.ReaddNode(new_node)
1958
    else:
1959
      self.context.AddNode(new_node)
1960

    
1961

    
1962
class LUSetNodeParams(LogicalUnit):
1963
  """Modifies the parameters of a node.
1964

1965
  """
1966
  HPATH = "node-modify"
1967
  HTYPE = constants.HTYPE_NODE
1968
  _OP_REQP = ["node_name"]
1969
  REQ_BGL = False
1970

    
1971
  def CheckArguments(self):
1972
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
1973
    if node_name is None:
1974
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
1975
    self.op.node_name = node_name
1976
    if not hasattr(self.op, 'master_candidate'):
1977
      raise errors.OpPrereqError("Please pass at least one modification")
1978
    self.op.master_candidate = bool(self.op.master_candidate)
1979

    
1980
  def ExpandNames(self):
1981
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
1982

    
1983
  def BuildHooksEnv(self):
1984
    """Build hooks env.
1985

1986
    This runs on the master node.
1987

1988
    """
1989
    env = {
1990
      "OP_TARGET": self.op.node_name,
1991
      "MASTER_CANDIDATE": str(self.op.master_candidate),
1992
      }
1993
    nl = [self.cfg.GetMasterNode(),
1994
          self.op.node_name]
1995
    return env, nl, nl
1996

    
1997
  def CheckPrereq(self):
1998
    """Check prerequisites.
1999

2000
    This only stores the force flag, as the node name has already been
    validated in CheckArguments.
2001

2002
    """
2003
    force = self.force = self.op.force
2004

    
2005
    return
2006

    
2007
  def Exec(self, feedback_fn):
2008
    """Modifies a node.
2009

2010
    """
2011
    node = self.cfg.GetNodeInfo(self.op.node_name)
2012

    
2013
    result = []
2014

    
2015
    if self.op.master_candidate is not None:
2016
      node.master_candidate = self.op.master_candidate
2017
      result.append(("master_candidate", str(self.op.master_candidate)))
2018

    
2019
    # this will trigger configuration file update, if needed
2020
    self.cfg.Update(node)
2021
    # this will trigger job queue propagation or cleanup
2022
    self.context.ReaddNode(node)
2023

    
2024
    return result
2025

    
2026

    
2027
class LUQueryClusterInfo(NoHooksLU):
2028
  """Query cluster configuration.
2029

2030
  """
2031
  _OP_REQP = []
2032
  REQ_BGL = False
2033

    
2034
  def ExpandNames(self):
2035
    self.needed_locks = {}
2036

    
2037
  def CheckPrereq(self):
2038
    """No prerequsites needed for this LU.
2039

2040
    """
2041
    pass
2042

    
2043
  def Exec(self, feedback_fn):
2044
    """Return cluster config.
2045

2046
    """
2047
    cluster = self.cfg.GetClusterInfo()
2048
    result = {
2049
      "software_version": constants.RELEASE_VERSION,
2050
      "protocol_version": constants.PROTOCOL_VERSION,
2051
      "config_version": constants.CONFIG_VERSION,
2052
      "os_api_version": constants.OS_API_VERSION,
2053
      "export_version": constants.EXPORT_VERSION,
2054
      "architecture": (platform.architecture()[0], platform.machine()),
2055
      "name": cluster.cluster_name,
2056
      "master": cluster.master_node,
2057
      "default_hypervisor": cluster.default_hypervisor,
2058
      "enabled_hypervisors": cluster.enabled_hypervisors,
2059
      "hvparams": cluster.hvparams,
2060
      "beparams": cluster.beparams,
2061
      }
2062

    
2063
    return result
2064

    
2065

    
2066
class LUQueryConfigValues(NoHooksLU):
2067
  """Return configuration values.
2068

2069
  """
2070
  _OP_REQP = []
2071
  REQ_BGL = False
2072
  _FIELDS_DYNAMIC = utils.FieldSet()
2073
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2074

    
2075
  def ExpandNames(self):
2076
    self.needed_locks = {}
2077

    
2078
    _CheckOutputFields(static=self._FIELDS_STATIC,
2079
                       dynamic=self._FIELDS_DYNAMIC,
2080
                       selected=self.op.output_fields)
2081

    
2082
  def CheckPrereq(self):
2083
    """No prerequisites.
2084

2085
    """
2086
    pass
2087

    
2088
  def Exec(self, feedback_fn):
2089
    """Dump a representation of the cluster config to the standard output.
2090

2091
    """
2092
    values = []
2093
    for field in self.op.output_fields:
2094
      if field == "cluster_name":
2095
        entry = self.cfg.GetClusterName()
2096
      elif field == "master_node":
2097
        entry = self.cfg.GetMasterNode()
2098
      elif field == "drain_flag":
2099
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2100
      else:
2101
        raise errors.ParameterError(field)
2102
      values.append(entry)
2103
    return values
2104

    
2105

    
2106
class LUActivateInstanceDisks(NoHooksLU):
2107
  """Bring up an instance's disks.
2108

2109
  """
2110
  _OP_REQP = ["instance_name"]
2111
  REQ_BGL = False
2112

    
2113
  def ExpandNames(self):
2114
    self._ExpandAndLockInstance()
2115
    self.needed_locks[locking.LEVEL_NODE] = []
2116
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2117

    
2118
  def DeclareLocks(self, level):
2119
    if level == locking.LEVEL_NODE:
2120
      self._LockInstancesNodes()
2121

    
2122
  def CheckPrereq(self):
2123
    """Check prerequisites.
2124

2125
    This checks that the instance is in the cluster.
2126

2127
    """
2128
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2129
    assert self.instance is not None, \
2130
      "Cannot retrieve locked instance %s" % self.op.instance_name
2131

    
2132
  def Exec(self, feedback_fn):
2133
    """Activate the disks.
2134

2135
    """
2136
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2137
    if not disks_ok:
2138
      raise errors.OpExecError("Cannot activate block devices")
2139

    
2140
    return disks_info
2141

    
2142

    
2143
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2144
  """Prepare the block devices for an instance.
2145

2146
  This sets up the block devices on all nodes.
2147

2148
  @type lu: L{LogicalUnit}
2149
  @param lu: the logical unit on whose behalf we execute
2150
  @type instance: L{objects.Instance}
2151
  @param instance: the instance for whose disks we assemble
2152
  @type ignore_secondaries: boolean
2153
  @param ignore_secondaries: if true, errors on secondary nodes
2154
      won't result in an error return from the function
2155
  @return: a tuple of (disks_ok, device_info); device_info is a list of
2156
      (host, instance_visible_name, node_visible_name)
2157
      with the mapping from node devices to instance devices
2158

2159
  """
2160
  device_info = []
2161
  disks_ok = True
2162
  iname = instance.name
2163
  # With the two passes mechanism we try to reduce the window of
2164
  # opportunity for the race condition of switching DRBD to primary
2165
  # before handshaking occurred, but we do not eliminate it
2166

    
2167
  # The proper fix would be to wait (with some limits) until the
2168
  # connection has been made and drbd transitions from WFConnection
2169
  # into any other network-connected state (Connected, SyncTarget,
2170
  # SyncSource, etc.)
2171

    
2172
  # 1st pass, assemble on all nodes in secondary mode
2173
  for inst_disk in instance.disks:
2174
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2175
      lu.cfg.SetDiskID(node_disk, node)
2176
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2177
      if not result:
2178
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2179
                           " (is_primary=False, pass=1)",
2180
                           inst_disk.iv_name, node)
2181
        if not ignore_secondaries:
2182
          disks_ok = False
2183

    
2184
  # FIXME: race condition on drbd migration to primary
2185

    
2186
  # 2nd pass, do only the primary node
2187
  for inst_disk in instance.disks:
2188
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2189
      if node != instance.primary_node:
2190
        continue
2191
      lu.cfg.SetDiskID(node_disk, node)
2192
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2193
      if not result:
2194
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2195
                           " (is_primary=True, pass=2)",
2196
                           inst_disk.iv_name, node)
2197
        disks_ok = False
2198
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2199

    
2200
  # leave the disks configured for the primary node
2201
  # this is a workaround that would be fixed better by
2202
  # improving the logical/physical id handling
2203
  for disk in instance.disks:
2204
    lu.cfg.SetDiskID(disk, instance.primary_node)
2205

    
2206
  return disks_ok, device_info
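# Illustrative shape of the return value (hypothetical names): for a
# healthy single-disk instance this could be
#   (True, [("node1.example.com", "sda", <node-visible device>)])
# matching the (host, instance_visible_name, node_visible_name) tuples
# documented in the docstring above.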
2207

    
2208

    
2209
def _StartInstanceDisks(lu, instance, force):
2210
  """Start the disks of an instance.
2211

2212
  """
2213
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2214
                                           ignore_secondaries=force)
2215
  if not disks_ok:
2216
    _ShutdownInstanceDisks(lu, instance)
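    # force=None (as passed e.g. by LUReinstallInstance) suppresses
    # the hint below; only an explicit force=False triggers it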
2217
    if force is not None and not force:
2218
      lu.proc.LogWarning("", hint="If the message above refers to a"
2219
                         " secondary node,"
2220
                         " you can retry the operation using '--force'.")
2221
    raise errors.OpExecError("Disk consistency error")
2222

    
2223

    
2224
class LUDeactivateInstanceDisks(NoHooksLU):
2225
  """Shutdown an instance's disks.
2226

2227
  """
2228
  _OP_REQP = ["instance_name"]
2229
  REQ_BGL = False
2230

    
2231
  def ExpandNames(self):
2232
    self._ExpandAndLockInstance()
2233
    self.needed_locks[locking.LEVEL_NODE] = []
2234
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2235

    
2236
  def DeclareLocks(self, level):
2237
    if level == locking.LEVEL_NODE:
2238
      self._LockInstancesNodes()
2239

    
2240
  def CheckPrereq(self):
2241
    """Check prerequisites.
2242

2243
    This checks that the instance is in the cluster.
2244

2245
    """
2246
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2247
    assert self.instance is not None, \
2248
      "Cannot retrieve locked instance %s" % self.op.instance_name
2249

    
2250
  def Exec(self, feedback_fn):
2251
    """Deactivate the disks
2252

2253
    """
2254
    instance = self.instance
2255
    _SafeShutdownInstanceDisks(self, instance)
2256

    
2257

    
2258
def _SafeShutdownInstanceDisks(lu, instance):
2259
  """Shutdown block devices of an instance.
2260

2261
  This function checks if an instance is running, before calling
2262
  _ShutdownInstanceDisks.
2263

2264
  """
2265
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2266
                                      [instance.hypervisor])
2267
  ins_l = ins_l[instance.primary_node]
2268
  if not isinstance(ins_l, list):
2269
    raise errors.OpExecError("Can't contact node '%s'" %
2270
                             instance.primary_node)
2271

    
2272
  if instance.name in ins_l:
2273
    raise errors.OpExecError("Instance is running, can't shutdown"
2274
                             " block devices.")
2275

    
2276
  _ShutdownInstanceDisks(lu, instance)
2277

    
2278

    
2279
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2280
  """Shutdown block devices of an instance.
2281

2282
  This does the shutdown on all nodes of the instance.
2283

2284
  If ignore_primary is true, errors on the primary node are
2285
  ignored.
2286

2287
  """
2288
  result = True
2289
  for disk in instance.disks:
2290
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2291
      lu.cfg.SetDiskID(top_disk, node)
2292
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2293
        logging.error("Could not shutdown block device %s on node %s",
2294
                      disk.iv_name, node)
2295
        if not ignore_primary or node != instance.primary_node:
2296
          result = False
2297
  return result
2298

    
2299

    
2300
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2301
  """Checks if a node has enough free memory.
2302

2303
  This function checks if a given node has the needed amount of free
2304
  memory. In case the node has less memory or we cannot get the
2305
  information from the node, this function raises an OpPrereqError
2306
  exception.
2307

2308
  @type lu: L{LogicalUnit}
2309
  @param lu: a logical unit from which we get configuration data
2310
  @type node: C{str}
2311
  @param node: the node to check
2312
  @type reason: C{str}
2313
  @param reason: string to use in the error message
2314
  @type requested: C{int}
2315
  @param requested: the amount of memory in MiB to check for
2316
  @type hypervisor: C{str}
2317
  @param hypervisor: the hypervisor to ask for memory stats
2318
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2319
      we cannot check the node
2320

2321
  """
2322
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2323
  if not nodeinfo or not isinstance(nodeinfo, dict):
2324
    raise errors.OpPrereqError("Could not contact node %s for resource"
2325
                             " information" % (node,))
2326

    
2327
  free_mem = nodeinfo[node].get('memory_free')
2328
  if not isinstance(free_mem, int):
2329
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2330
                             " was '%s'" % (node, free_mem))
2331
  if requested > free_mem:
2332
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2333
                             " needed %s MiB, available %s MiB" %
2334
                             (node, reason, requested, free_mem))
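# Typical usage (see LUStartupInstance.CheckPrereq below): the requested
# amount comes from the instance's filled BE parameters, e.g.
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)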
2335

    
2336

    
2337
class LUStartupInstance(LogicalUnit):
2338
  """Starts an instance.
2339

2340
  """
2341
  HPATH = "instance-start"
2342
  HTYPE = constants.HTYPE_INSTANCE
2343
  _OP_REQP = ["instance_name", "force"]
2344
  REQ_BGL = False
2345

    
2346
  def ExpandNames(self):
2347
    self._ExpandAndLockInstance()
2348

    
2349
  def BuildHooksEnv(self):
2350
    """Build hooks env.
2351

2352
    This runs on master, primary and secondary nodes of the instance.
2353

2354
    """
2355
    env = {
2356
      "FORCE": self.op.force,
2357
      }
2358
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2359
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2360
          list(self.instance.secondary_nodes))
2361
    return env, nl, nl
2362

    
2363
  def CheckPrereq(self):
2364
    """Check prerequisites.
2365

2366
    This checks that the instance is in the cluster.
2367

2368
    """
2369
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2370
    assert self.instance is not None, \
2371
      "Cannot retrieve locked instance %s" % self.op.instance_name
2372

    
2373
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2374
    # check bridge existence
2375
    _CheckInstanceBridgesExist(self, instance)
2376

    
2377
    _CheckNodeFreeMemory(self, instance.primary_node,
2378
                         "starting instance %s" % instance.name,
2379
                         bep[constants.BE_MEMORY], instance.hypervisor)
2380

    
2381
  def Exec(self, feedback_fn):
2382
    """Start the instance.
2383

2384
    """
2385
    instance = self.instance
2386
    force = self.op.force
2387
    extra_args = getattr(self.op, "extra_args", "")
2388

    
2389
    self.cfg.MarkInstanceUp(instance.name)
2390

    
2391
    node_current = instance.primary_node
2392

    
2393
    _StartInstanceDisks(self, instance, force)
2394

    
2395
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2396
      _ShutdownInstanceDisks(self, instance)
2397
      raise errors.OpExecError("Could not start instance")
2398

    
2399

    
2400
class LURebootInstance(LogicalUnit):
2401
  """Reboot an instance.
2402

2403
  """
2404
  HPATH = "instance-reboot"
2405
  HTYPE = constants.HTYPE_INSTANCE
2406
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2407
  REQ_BGL = False
2408

    
2409
  def ExpandNames(self):
2410
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2411
                                   constants.INSTANCE_REBOOT_HARD,
2412
                                   constants.INSTANCE_REBOOT_FULL]:
2413
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2414
                                  (constants.INSTANCE_REBOOT_SOFT,
2415
                                   constants.INSTANCE_REBOOT_HARD,
2416
                                   constants.INSTANCE_REBOOT_FULL))
2417
    self._ExpandAndLockInstance()
2418

    
2419
  def BuildHooksEnv(self):
2420
    """Build hooks env.
2421

2422
    This runs on master, primary and secondary nodes of the instance.
2423

2424
    """
2425
    env = {
2426
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2427
      }
2428
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2429
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2430
          list(self.instance.secondary_nodes))
2431
    return env, nl, nl
2432

    
2433
  def CheckPrereq(self):
2434
    """Check prerequisites.
2435

2436
    This checks that the instance is in the cluster.
2437

2438
    """
2439
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2440
    assert self.instance is not None, \
2441
      "Cannot retrieve locked instance %s" % self.op.instance_name
2442

    
2443
    # check bridge existence
2444
    _CheckInstanceBridgesExist(self, instance)
2445

    
2446
  def Exec(self, feedback_fn):
2447
    """Reboot the instance.
2448

2449
    """
2450
    instance = self.instance
2451
    ignore_secondaries = self.op.ignore_secondaries
2452
    reboot_type = self.op.reboot_type
2453
    extra_args = getattr(self.op, "extra_args", "")
2454

    
2455
    node_current = instance.primary_node
2456

    
2457
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2458
                       constants.INSTANCE_REBOOT_HARD]:
2459
      if not self.rpc.call_instance_reboot(node_current, instance,
2460
                                           reboot_type, extra_args):
2461
        raise errors.OpExecError("Could not reboot instance")
2462
    else:
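      # INSTANCE_REBOOT_FULL: shut the instance down, recycle its
      # disks and start everything up again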
2463
      if not self.rpc.call_instance_shutdown(node_current, instance):
2464
        raise errors.OpExecError("could not shutdown instance for full reboot")
2465
      _ShutdownInstanceDisks(self, instance)
2466
      _StartInstanceDisks(self, instance, ignore_secondaries)
2467
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2468
        _ShutdownInstanceDisks(self, instance)
2469
        raise errors.OpExecError("Could not start instance for full reboot")
2470

    
2471
    self.cfg.MarkInstanceUp(instance.name)
2472

    
2473

    
2474
class LUShutdownInstance(LogicalUnit):
2475
  """Shutdown an instance.
2476

2477
  """
2478
  HPATH = "instance-stop"
2479
  HTYPE = constants.HTYPE_INSTANCE
2480
  _OP_REQP = ["instance_name"]
2481
  REQ_BGL = False
2482

    
2483
  def ExpandNames(self):
2484
    self._ExpandAndLockInstance()
2485

    
2486
  def BuildHooksEnv(self):
2487
    """Build hooks env.
2488

2489
    This runs on master, primary and secondary nodes of the instance.
2490

2491
    """
2492
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2493
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2494
          list(self.instance.secondary_nodes))
2495
    return env, nl, nl
2496

    
2497
  def CheckPrereq(self):
2498
    """Check prerequisites.
2499

2500
    This checks that the instance is in the cluster.
2501

2502
    """
2503
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2504
    assert self.instance is not None, \
2505
      "Cannot retrieve locked instance %s" % self.op.instance_name
2506

    
2507
  def Exec(self, feedback_fn):
2508
    """Shutdown the instance.
2509

2510
    """
2511
    instance = self.instance
2512
    node_current = instance.primary_node
2513
    self.cfg.MarkInstanceDown(instance.name)
2514
    if not self.rpc.call_instance_shutdown(node_current, instance):
2515
      self.proc.LogWarning("Could not shutdown instance")
2516

    
2517
    _ShutdownInstanceDisks(self, instance)
2518

    
2519

    
2520
class LUReinstallInstance(LogicalUnit):
2521
  """Reinstall an instance.
2522

2523
  """
2524
  HPATH = "instance-reinstall"
2525
  HTYPE = constants.HTYPE_INSTANCE
2526
  _OP_REQP = ["instance_name"]
2527
  REQ_BGL = False
2528

    
2529
  def ExpandNames(self):
2530
    self._ExpandAndLockInstance()
2531

    
2532
  def BuildHooksEnv(self):
2533
    """Build hooks env.
2534

2535
    This runs on master, primary and secondary nodes of the instance.
2536

2537
    """
2538
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2539
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2540
          list(self.instance.secondary_nodes))
2541
    return env, nl, nl
2542

    
2543
  def CheckPrereq(self):
2544
    """Check prerequisites.
2545

2546
    This checks that the instance is in the cluster and is not running.
2547

2548
    """
2549
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2550
    assert instance is not None, \
2551
      "Cannot retrieve locked instance %s" % self.op.instance_name
2552

    
2553
    if instance.disk_template == constants.DT_DISKLESS:
2554
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2555
                                 self.op.instance_name)
2556
    if instance.status != "down":
2557
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2558
                                 self.op.instance_name)
2559
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2560
                                              instance.name,
2561
                                              instance.hypervisor)
2562
    if remote_info:
2563
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2564
                                 (self.op.instance_name,
2565
                                  instance.primary_node))
2566

    
2567
    self.op.os_type = getattr(self.op, "os_type", None)
2568
    if self.op.os_type is not None:
2569
      # OS verification
2570
      pnode = self.cfg.GetNodeInfo(
2571
        self.cfg.ExpandNodeName(instance.primary_node))
2572
      if pnode is None:
2573
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2574
                                   self.op.pnode)
2575
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2576
      if not os_obj:
2577
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2578
                                   " primary node"  % self.op.os_type)
2579

    
2580
    self.instance = instance
2581

    
2582
  def Exec(self, feedback_fn):
2583
    """Reinstall the instance.
2584

2585
    """
2586
    inst = self.instance
2587

    
2588
    if self.op.os_type is not None:
2589
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2590
      inst.os = self.op.os_type
2591
      self.cfg.Update(inst)
2592

    
2593
    _StartInstanceDisks(self, inst, None)
2594
    try:
2595
      feedback_fn("Running the instance OS create scripts...")
2596
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2597
        raise errors.OpExecError("Could not install OS for instance %s"
2598
                                 " on node %s" %
2599
                                 (inst.name, inst.primary_node))
2600
    finally:
2601
      _ShutdownInstanceDisks(self, inst)
2602

    
2603

    
2604
class LURenameInstance(LogicalUnit):
2605
  """Rename an instance.
2606

2607
  """
2608
  HPATH = "instance-rename"
2609
  HTYPE = constants.HTYPE_INSTANCE
2610
  _OP_REQP = ["instance_name", "new_name"]
2611

    
2612
  def BuildHooksEnv(self):
2613
    """Build hooks env.
2614

2615
    This runs on master, primary and secondary nodes of the instance.
2616

2617
    """
2618
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2619
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2620
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2621
          list(self.instance.secondary_nodes))
2622
    return env, nl, nl
2623

    
2624
  def CheckPrereq(self):
2625
    """Check prerequisites.
2626

2627
    This checks that the instance is in the cluster and is not running.
2628

2629
    """
2630
    instance = self.cfg.GetInstanceInfo(
2631
      self.cfg.ExpandInstanceName(self.op.instance_name))
2632
    if instance is None:
2633
      raise errors.OpPrereqError("Instance '%s' not known" %
2634
                                 self.op.instance_name)
2635
    if instance.status != "down":
2636
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2637
                                 self.op.instance_name)
2638
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2639
                                              instance.name,
2640
                                              instance.hypervisor)
2641
    if remote_info:
2642
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2643
                                 (self.op.instance_name,
2644
                                  instance.primary_node))
2645
    self.instance = instance
2646

    
2647
    # new name verification
2648
    name_info = utils.HostInfo(self.op.new_name)
2649

    
2650
    self.op.new_name = new_name = name_info.name
2651
    instance_list = self.cfg.GetInstanceList()
2652
    if new_name in instance_list:
2653
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2654
                                 new_name)
2655

    
2656
    if not getattr(self.op, "ignore_ip", False):
2657
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2658
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2659
                                   (name_info.ip, new_name))
2660

    
2661

    
2662
  def Exec(self, feedback_fn):
2663
    """Reinstall the instance.
2664

2665
    """
2666
    inst = self.instance
2667
    old_name = inst.name
2668

    
2669
    if inst.disk_template == constants.DT_FILE:
2670
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2671

    
2672
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2673
    # Change the instance lock. This is definitely safe while we hold the BGL
2674
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2675
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2676

    
2677
    # re-read the instance from the configuration after rename
2678
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2679

    
2680
    if inst.disk_template == constants.DT_FILE:
2681
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2682
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2683
                                                     old_file_storage_dir,
2684
                                                     new_file_storage_dir)
2685

    
2686
      if not result:
2687
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2688
                                 " directory '%s' to '%s' (but the instance"
2689
                                 " has been renamed in Ganeti)" % (
2690
                                 inst.primary_node, old_file_storage_dir,
2691
                                 new_file_storage_dir))
2692

    
2693
      if not result[0]:
2694
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2695
                                 " (but the instance has been renamed in"
2696
                                 " Ganeti)" % (old_file_storage_dir,
2697
                                               new_file_storage_dir))
2698

    
2699
    _StartInstanceDisks(self, inst, None)
2700
    try:
2701
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2702
                                               old_name):
2703
        msg = ("Could not run OS rename script for instance %s on node %s"
2704
               " (but the instance has been renamed in Ganeti)" %
2705
               (inst.name, inst.primary_node))
2706
        self.proc.LogWarning(msg)
2707
    finally:
2708
      _ShutdownInstanceDisks(self, inst)
2709

    
2710

    
2711
class LURemoveInstance(LogicalUnit):
2712
  """Remove an instance.
2713

2714
  """
2715
  HPATH = "instance-remove"
2716
  HTYPE = constants.HTYPE_INSTANCE
2717
  _OP_REQP = ["instance_name", "ignore_failures"]
2718
  REQ_BGL = False
2719

    
2720
  def ExpandNames(self):
2721
    self._ExpandAndLockInstance()
2722
    self.needed_locks[locking.LEVEL_NODE] = []
2723
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2724

    
2725
  def DeclareLocks(self, level):
2726
    if level == locking.LEVEL_NODE:
2727
      self._LockInstancesNodes()
2728

    
2729
  def BuildHooksEnv(self):
2730
    """Build hooks env.
2731

2732
    This runs on master, primary and secondary nodes of the instance.
2733

2734
    """
2735
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2736
    nl = [self.cfg.GetMasterNode()]
2737
    return env, nl, nl
2738

    
2739
  def CheckPrereq(self):
2740
    """Check prerequisites.
2741

2742
    This checks that the instance is in the cluster.
2743

2744
    """
2745
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2746
    assert self.instance is not None, \
2747
      "Cannot retrieve locked instance %s" % self.op.instance_name
2748

    
2749
  def Exec(self, feedback_fn):
2750
    """Remove the instance.
2751

2752
    """
2753
    instance = self.instance
2754
    logging.info("Shutting down instance %s on node %s",
2755
                 instance.name, instance.primary_node)
2756

    
2757
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2758
      if self.op.ignore_failures:
2759
        feedback_fn("Warning: can't shutdown instance")
2760
      else:
2761
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2762
                                 (instance.name, instance.primary_node))
2763

    
2764
    logging.info("Removing block devices for instance %s", instance.name)
2765

    
2766
    if not _RemoveDisks(self, instance):
2767
      if self.op.ignore_failures:
2768
        feedback_fn("Warning: can't remove instance's disks")
2769
      else:
2770
        raise errors.OpExecError("Can't remove instance's disks")
2771

    
2772
    logging.info("Removing instance %s out of cluster config", instance.name)
2773

    
2774
    self.cfg.RemoveInstance(instance.name)
2775
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2776

    
2777

    
2778
class LUQueryInstances(NoHooksLU):
2779
  """Logical unit for querying instances.
2780

2781
  """
2782
  _OP_REQP = ["output_fields", "names"]
2783
  REQ_BGL = False
2784
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2785
                                    "admin_state", "admin_ram",
2786
                                    "disk_template", "ip", "mac", "bridge",
2787
                                    "sda_size", "sdb_size", "vcpus", "tags",
2788
                                    "network_port", "beparams",
2789
                                    "(disk).(size)/([0-9]+)",
2790
                                    "(disk).(sizes)",
2791
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2792
                                    "(nic).(macs|ips|bridges)",
2793
                                    "(disk|nic).(count)",
2794
                                    "serial_no", "hypervisor", "hvparams",] +
2795
                                  ["hv/%s" % name
2796
                                   for name in constants.HVS_PARAMETERS] +
2797
                                  ["be/%s" % name
2798
                                   for name in constants.BES_PARAMETERS])
2799
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2800

    
2801

    
2802
  def ExpandNames(self):
2803
    _CheckOutputFields(static=self._FIELDS_STATIC,
2804
                       dynamic=self._FIELDS_DYNAMIC,
2805
                       selected=self.op.output_fields)
2806

    
2807
    self.needed_locks = {}
2808
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2809
    self.share_locks[locking.LEVEL_NODE] = 1
2810

    
2811
    if self.op.names:
2812
      self.wanted = _GetWantedInstances(self, self.op.names)
2813
    else:
2814
      self.wanted = locking.ALL_SET
2815

    
2816
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2817
    if self.do_locking:
2818
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2819
      self.needed_locks[locking.LEVEL_NODE] = []
2820
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2821

    
2822
  def DeclareLocks(self, level):
2823
    if level == locking.LEVEL_NODE and self.do_locking:
2824
      self._LockInstancesNodes()
2825

    
2826
  def CheckPrereq(self):
2827
    """Check prerequisites.
2828

2829
    """
2830
    pass
2831

    
2832
  def Exec(self, feedback_fn):
2833
    """Computes the list of nodes and their attributes.
2834

2835
    """
2836
    all_info = self.cfg.GetAllInstancesInfo()
2837
    if self.do_locking:
2838
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2839
    elif self.wanted != locking.ALL_SET:
2840
      instance_names = self.wanted
2841
      missing = set(instance_names).difference(all_info.keys())
2842
      if missing:
2843
        raise errors.OpExecError(
2844
          "Some instances were removed before retrieving their data: %s"
2845
          % missing)
2846
    else:
2847
      instance_names = all_info.keys()
2848

    
2849
    instance_names = utils.NiceSort(instance_names)
2850
    instance_list = [all_info[iname] for iname in instance_names]
2851

    
2852
    # begin data gathering
2853

    
2854
    nodes = frozenset([inst.primary_node for inst in instance_list])
2855
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2856

    
2857
    bad_nodes = []
2858
    if self.do_locking:
2859
      live_data = {}
2860
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2861
      for name in nodes:
2862
        result = node_data[name]
2863
        if result:
2864
          live_data.update(result)
2865
        elif result == False:
2866
          bad_nodes.append(name)
2867
        # else no instance is alive
2868
    else:
2869
      live_data = dict([(name, {}) for name in instance_names])
2870

    
2871
    # end data gathering
2872

    
2873
    HVPREFIX = "hv/"
2874
    BEPREFIX = "be/"
2875
    output = []
2876
    for instance in instance_list:
2877
      iout = []
2878
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2879
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2880
      for field in self.op.output_fields:
2881
        st_match = self._FIELDS_STATIC.Matches(field)
2882
        if field == "name":
2883
          val = instance.name
2884
        elif field == "os":
2885
          val = instance.os
2886
        elif field == "pnode":
2887
          val = instance.primary_node
2888
        elif field == "snodes":
2889
          val = list(instance.secondary_nodes)
2890
        elif field == "admin_state":
2891
          val = (instance.status != "down")
2892
        elif field == "oper_state":
2893
          if instance.primary_node in bad_nodes:
2894
            val = None
2895
          else:
2896
            val = bool(live_data.get(instance.name))
2897
        elif field == "status":
2898
          if instance.primary_node in bad_nodes:
2899
            val = "ERROR_nodedown"
2900
          else:
2901
            running = bool(live_data.get(instance.name))
2902
            if running:
2903
              if instance.status != "down":
2904
                val = "running"
2905
              else:
2906
                val = "ERROR_up"
2907
            else:
2908
              if instance.status != "down":
2909
                val = "ERROR_down"
2910
              else:
2911
                val = "ADMIN_down"
2912
        elif field == "oper_ram":
2913
          if instance.primary_node in bad_nodes:
2914
            val = None
2915
          elif instance.name in live_data:
2916
            val = live_data[instance.name].get("memory", "?")
2917
          else:
2918
            val = "-"
2919
        elif field == "disk_template":
2920
          val = instance.disk_template
2921
        elif field == "ip":
2922
          val = instance.nics[0].ip
2923
        elif field == "bridge":
2924
          val = instance.nics[0].bridge
2925
        elif field == "mac":
2926
          val = instance.nics[0].mac
2927
        elif field == "sda_size" or field == "sdb_size":
2928
          idx = ord(field[2]) - ord('a')
2929
          try:
2930
            val = instance.FindDisk(idx).size
2931
          except errors.OpPrereqError:
2932
            val = None
2933
        elif field == "tags":
2934
          val = list(instance.GetTags())
2935
        elif field == "serial_no":
2936
          val = instance.serial_no
2937
        elif field == "network_port":
2938
          val = instance.network_port
2939
        elif field == "hypervisor":
2940
          val = instance.hypervisor
2941
        elif field == "hvparams":
2942
          val = i_hv
2943
        elif (field.startswith(HVPREFIX) and
2944
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2945
          val = i_hv.get(field[len(HVPREFIX):], None)
2946
        elif field == "beparams":
2947
          val = i_be
2948
        elif (field.startswith(BEPREFIX) and
2949
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2950
          val = i_be.get(field[len(BEPREFIX):], None)
2951
        elif st_match and st_match.groups():
2952
          # matches a variable list
2953
          st_groups = st_match.groups()
2954
          if st_groups and st_groups[0] == "disk":
2955
            if st_groups[1] == "count":
2956
              val = len(instance.disks)
2957
            elif st_groups[1] == "sizes":
2958
              val = [disk.size for disk in instance.disks]
2959
            elif st_groups[1] == "size":
2960
              try:
2961
                val = instance.FindDisk(st_groups[2]).size
2962
              except errors.OpPrereqError:
2963
                val = None
2964
            else:
2965
              assert False, "Unhandled disk parameter"
2966
          elif st_groups[0] == "nic":
2967
            if st_groups[1] == "count":
2968
              val = len(instance.nics)
2969
            elif st_groups[1] == "macs":
2970
              val = [nic.mac for nic in instance.nics]
2971
            elif st_groups[1] == "ips":
2972
              val = [nic.ip for nic in instance.nics]
2973
            elif st_groups[1] == "bridges":
2974
              val = [nic.bridge for nic in instance.nics]
2975
            else:
2976
              # index-based item
2977
              nic_idx = int(st_groups[2])
2978
              if nic_idx >= len(instance.nics):
2979
                val = None
2980
              else:
2981
                if st_groups[1] == "mac":
2982
                  val = instance.nics[nic_idx].mac
2983
                elif st_groups[1] == "ip":
2984
                  val = instance.nics[nic_idx].ip
2985
                elif st_groups[1] == "bridge":
2986
                  val = instance.nics[nic_idx].bridge
2987
                else:
2988
                  assert False, "Unhandled NIC parameter"
2989
          else:
2990
            assert False, "Unhandled variable parameter"
2991
        else:
2992
          raise errors.ParameterError(field)
2993
        iout.append(val)
2994
      output.append(iout)
2995

    
2996
    return output
2997

    
2998

    
2999
class LUFailoverInstance(LogicalUnit):
3000
  """Failover an instance.
3001

3002
  """
3003
  HPATH = "instance-failover"
3004
  HTYPE = constants.HTYPE_INSTANCE
3005
  _OP_REQP = ["instance_name", "ignore_consistency"]
3006
  REQ_BGL = False
3007

    
3008
  def ExpandNames(self):
3009
    self._ExpandAndLockInstance()
3010
    self.needed_locks[locking.LEVEL_NODE] = []
3011
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3012

    
3013
  def DeclareLocks(self, level):
3014
    if level == locking.LEVEL_NODE:
3015
      self._LockInstancesNodes()
3016

    
3017
  def BuildHooksEnv(self):
3018
    """Build hooks env.
3019

3020
    This runs on master, primary and secondary nodes of the instance.
3021

3022
    """
3023
    env = {
3024
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3025
      }
3026
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3027
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3028
    return env, nl, nl
3029

    
3030
  def CheckPrereq(self):
3031
    """Check prerequisites.
3032

3033
    This checks that the instance is in the cluster.
3034

3035
    """
3036
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3037
    assert self.instance is not None, \
3038
      "Cannot retrieve locked instance %s" % self.op.instance_name
3039

    
3040
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3041
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3042
      raise errors.OpPrereqError("Instance's disk layout is not"
3043
                                 " network mirrored, cannot failover.")
3044

    
3045
    secondary_nodes = instance.secondary_nodes
3046
    if not secondary_nodes:
3047
      raise errors.ProgrammerError("no secondary node but using "
3048
                                   "a mirrored disk template")
3049

    
3050
    target_node = secondary_nodes[0]
3051
    # check memory requirements on the secondary node
3052
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3053
                         instance.name, bep[constants.BE_MEMORY],
3054
                         instance.hypervisor)
3055

    
3056
    # check bridge existence
3057
    brlist = [nic.bridge for nic in instance.nics]
3058
    if not self.rpc.call_bridges_exist(target_node, brlist):
3059
      raise errors.OpPrereqError("One or more target bridges %s does not"
3060
                                 " exist on destination node '%s'" %
3061
                                 (brlist, target_node))
3062

    
3063
  def Exec(self, feedback_fn):
3064
    """Failover an instance.
3065

3066
    The failover is done by shutting it down on its present node and
3067
    starting it on the secondary.
3068

3069
    """
3070
    instance = self.instance
3071

    
3072
    source_node = instance.primary_node
3073
    target_node = instance.secondary_nodes[0]
3074

    
3075
    feedback_fn("* checking disk consistency between source and target")
3076
    for dev in instance.disks:
3077
      # for drbd, these are drbd over lvm
3078
      if not _CheckDiskConsistency(self, dev, target_node, False):
3079
        if instance.status == "up" and not self.op.ignore_consistency:
3080
          raise errors.OpExecError("Disk %s is degraded on target node,"
3081
                                   " aborting failover." % dev.iv_name)
3082

    
3083
    feedback_fn("* shutting down instance on source node")
3084
    logging.info("Shutting down instance %s on node %s",
3085
                 instance.name, source_node)
3086

    
3087
    if not self.rpc.call_instance_shutdown(source_node, instance):
3088
      if self.op.ignore_consistency:
3089
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3090
                             " Proceeding"
3091
                             " anyway. Please make sure node %s is down",
3092
                             instance.name, source_node, source_node)
3093
      else:
3094
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3095
                                 (instance.name, source_node))
3096

    
3097
    feedback_fn("* deactivating the instance's disks on source node")
3098
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3099
      raise errors.OpExecError("Can't shut down the instance's disks.")
3100

    
3101
    instance.primary_node = target_node
3102
    # distribute new instance config to the other nodes
3103
    self.cfg.Update(instance)
3104

    
3105
    # Only start the instance if it's marked as up
3106
    if instance.status == "up":
3107
      feedback_fn("* activating the instance's disks on target node")
3108
      logging.info("Starting instance %s on node %s",
3109
                   instance.name, target_node)
3110

    
3111
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3112
                                               ignore_secondaries=True)
3113
      if not disks_ok:
3114
        _ShutdownInstanceDisks(self, instance)
3115
        raise errors.OpExecError("Can't activate the instance's disks")
3116

    
3117
      feedback_fn("* starting the instance on the target node")
3118
      if not self.rpc.call_instance_start(target_node, instance, None):
3119
        _ShutdownInstanceDisks(self, instance)
3120
        raise errors.OpExecError("Could not start instance %s on node %s." %
3121
                                 (instance.name, target_node))
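# Usage sketch (an assumption, not taken from this file): a failover is
# normally requested by submitting the matching opcode, along the lines of
#   op = opcodes.OpFailoverInstance(instance_name="instance1.example.com",
#                                   ignore_consistency=False)
# where the field names mirror _OP_REQP above and the instance name is purely
# illustrative.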
3122

    
3123

    
3124
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3125
  """Create a tree of block devices on the primary node.
3126

3127
  This always creates all devices.
3128

3129
  """
3130
  if device.children:
3131
    for child in device.children:
3132
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3133
        return False
3134

    
3135
  lu.cfg.SetDiskID(device, node)
3136
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3137
                                       instance.name, True, info)
3138
  if not new_id:
3139
    return False
3140
  if device.physical_id is None:
3141
    device.physical_id = new_id
3142
  return True
3143

    
3144

    
3145
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3146
  """Create a tree of block devices on a secondary node.
3147

3148
  If this device type has to be created on secondaries, create it and
3149
  all its children.
3150

3151
  If not, just recurse to children keeping the same 'force' value.
3152

3153
  """
3154
  if device.CreateOnSecondary():
3155
    force = True
3156
  if device.children:
3157
    for child in device.children:
3158
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3159
                                        child, force, info):
3160
        return False
3161

    
3162
  if not force:
3163
    return True
3164
  lu.cfg.SetDiskID(device, node)
3165
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3166
                                       instance.name, False, info)
3167
  if not new_id:
3168
    return False
3169
  if device.physical_id is None:
3170
    device.physical_id = new_id
3171
  return True
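# Note the asymmetry between the two helpers above: on the primary node every
# device in the tree is created unconditionally, while on a secondary node a
# device is only created once it, or one of its ancestors, reports
# CreateOnSecondary(), which is what the 'force' flag propagates down the
# recursion.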
3172

    
3173

    
3174
def _GenerateUniqueNames(lu, exts):
3175
  """Generate a suitable LV name.
3176

3177
  This will generate a logical volume name for the given instance.
3178

3179
  """
3180
  results = []
3181
  for val in exts:
3182
    new_id = lu.cfg.GenerateUniqueID()
3183
    results.append("%s%s" % (new_id, val))
3184
  return results
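# Example (with a hypothetical unique id): the DRBD callers below request a
# data and a meta name per disk, e.g.
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
# which returns something like
#   ["<unique-id>.disk0_data", "<unique-id>.disk0_meta"]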
3185

    
3186

    
3187
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3188
                         p_minor, s_minor):
3189
  """Generate a drbd8 device complete with its children.
3190

3191
  """
3192
  port = lu.cfg.AllocatePort()
3193
  vgname = lu.cfg.GetVGName()
3194
  shared_secret = lu.cfg.GenerateDRBDSecret()
3195
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3196
                          logical_id=(vgname, names[0]))
3197
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3198
                          logical_id=(vgname, names[1]))
3199
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3200
                          logical_id=(primary, secondary, port,
3201
                                      p_minor, s_minor,
3202
                                      shared_secret),
3203
                          children=[dev_data, dev_meta],
3204
                          iv_name=iv_name)
3205
  return drbd_dev
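# The returned device tree for a single disk looks like this (sizes in MB):
#   LD_DRBD8, size=<size>, logical_id=(primary, secondary, port,
#                                      p_minor, s_minor, shared_secret)
#     +- LD_LV data volume, size=<size>, logical_id=(vgname, names[0])
#     +- LD_LV meta volume, size=128,    logical_id=(vgname, names[1])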
3206

    
3207

    
3208
def _GenerateDiskTemplate(lu, template_name,
3209
                          instance_name, primary_node,
3210
                          secondary_nodes, disk_info,
3211
                          file_storage_dir, file_driver,
3212
                          base_index):
3213
  """Generate the entire disk layout for a given template type.
3214

3215
  """
3216
  #TODO: compute space requirements
3217

    
3218
  vgname = lu.cfg.GetVGName()
3219
  disk_count = len(disk_info)
3220
  disks = []
3221
  if template_name == constants.DT_DISKLESS:
3222
    pass
3223
  elif template_name == constants.DT_PLAIN:
3224
    if len(secondary_nodes) != 0:
3225
      raise errors.ProgrammerError("Wrong template configuration")
3226

    
3227
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3228
                                      for i in range(disk_count)])
3229
    for idx, disk in enumerate(disk_info):
3230
      disk_index = idx + base_index
3231
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3232
                              logical_id=(vgname, names[idx]),
3233
                              iv_name="disk/%d" % disk_index)
3234
      disks.append(disk_dev)
3235
  elif template_name == constants.DT_DRBD8:
3236
    if len(secondary_nodes) != 1:
3237
      raise errors.ProgrammerError("Wrong template configuration")
3238
    remote_node = secondary_nodes[0]
3239
    minors = lu.cfg.AllocateDRBDMinor(
3240
      [primary_node, remote_node] * len(disk_info), instance_name)
3241

    
3242
    names = _GenerateUniqueNames(lu,
3243
                                 [".disk%d_%s" % (i, s)
3244
                                  for i in range(disk_count)
3245
                                  for s in ("data", "meta")
3246
                                  ])
3247
    for idx, disk in enumerate(disk_info):
3248
      disk_index = idx + base_index
3249
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3250
                                      disk["size"], names[idx*2:idx*2+2],
3251
                                      "disk/%d" % disk_index,
3252
                                      minors[idx*2], minors[idx*2+1])
3253
      disks.append(disk_dev)
3254
  elif template_name == constants.DT_FILE:
3255
    if len(secondary_nodes) != 0:
3256
      raise errors.ProgrammerError("Wrong template configuration")
3257

    
3258
    for idx, disk in enumerate(disk_info):
3259
      disk_index = idx + base_index
3260
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3261
                              iv_name="disk/%d" % disk_index,
3262
                              logical_id=(file_driver,
3263
                                          "%s/disk%d" % (file_storage_dir,
3264
                                                         idx)))
3265
      disks.append(disk_dev)
3266
  else:
3267
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3268
  return disks
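# Minimal usage sketch (never called from this module); the instance and node
# names are hypothetical and only illustrate the expected argument shapes.
def _ExampleGenerateDiskTemplate(lu):
  """Build the disk layout for two plain LVM disks of 1 GB and 2 GB.

  """
  # DT_PLAIN takes no secondary nodes and ignores the file storage arguments
  return _GenerateDiskTemplate(lu, constants.DT_PLAIN,
                               "instance1.example.com", "node1.example.com",
                               [], [{"size": 1024}, {"size": 2048}],
                               None, None, 0)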
3269

    
3270

    
3271
def _GetInstanceInfoText(instance):
3272
  """Compute that text that should be added to the disk's metadata.
3273

3274
  """
3275
  return "originstname+%s" % instance.name
3276

    
3277

    
3278
def _CreateDisks(lu, instance):
3279
  """Create all disks for an instance.
3280

3281
  This abstracts away some work from AddInstance.
3282

3283
  @type lu: L{LogicalUnit}
3284
  @param lu: the logical unit on whose behalf we execute
3285
  @type instance: L{objects.Instance}
3286
  @param instance: the instance whose disks we should create
3287
  @rtype: boolean
3288
  @return: the success of the creation
3289

3290
  """
3291
  info = _GetInstanceInfoText(instance)
3292

    
3293
  if instance.disk_template == constants.DT_FILE:
3294
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3295
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3296
                                                 file_storage_dir)
3297

    
3298
    if not result:
3299
      logging.error("Could not connect to node '%s'", instance.primary_node)
3300
      return False
3301

    
3302
    if not result[0]:
3303
      logging.error("Failed to create directory '%s'", file_storage_dir)
3304
      return False
3305

    
3306
  # Note: this needs to be kept in sync with adding of disks in
3307
  # LUSetInstanceParams
3308
  for device in instance.disks:
3309
    logging.info("Creating volume %s for instance %s",
3310
                 device.iv_name, instance.name)
3311
    #HARDCODE
3312
    for secondary_node in instance.secondary_nodes:
3313
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3314
                                        device, False, info):
3315
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3316
                      device.iv_name, device, secondary_node)
3317
        return False
3318
    #HARDCODE
3319
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3320
                                    instance, device, info):
3321
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3322
      return False
3323

    
3324
  return True
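# Callers are expected to clean up after a failed creation; see
# LUCreateInstance.Exec below, which calls _RemoveDisks and releases the
# temporary DRBD minors when this function returns False.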
3325

    
3326

    
3327
def _RemoveDisks(lu, instance):
3328
  """Remove all disks for an instance.
3329

3330
  This abstracts away some work from `AddInstance()` and
3331
  `RemoveInstance()`. Note that in case some of the devices couldn't
3332
  be removed, the removal will continue with the other ones (compare
3333
  with `_CreateDisks()`).
3334

3335
  @type lu: L{LogicalUnit}
3336
  @param lu: the logical unit on whose behalf we execute
3337
  @type instance: L{objects.Instance}
3338
  @param instance: the instance whose disks we should remove
3339
  @rtype: boolean
3340
  @return: the success of the removal
3341

3342
  """
3343
  logging.info("Removing block devices for instance %s", instance.name)
3344

    
3345
  result = True
3346
  for device in instance.disks:
3347
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3348
      lu.cfg.SetDiskID(disk, node)
3349
      if not lu.rpc.call_blockdev_remove(node, disk):
3350
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3351
                           " continuing anyway", device.iv_name, node)
3352
        result = False
3353

    
3354
  if instance.disk_template == constants.DT_FILE:
3355
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3356
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3357
                                               file_storage_dir):
3358
      logging.error("Could not remove directory '%s'", file_storage_dir)
3359
      result = False
3360

    
3361
  return result
3362

    
3363

    
3364
def _ComputeDiskSize(disk_template, disks):
3365
  """Compute disk size requirements in the volume group
3366

3367
  """
3368
  # Required free disk space as a function of the disk template and disk sizes
3369
  req_size_dict = {
3370
    constants.DT_DISKLESS: None,
3371
    constants.DT_PLAIN: sum(d["size"] for d in disks),
3372
    # 128 MB are added for drbd metadata for each disk
3373
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3374
    constants.DT_FILE: None,
3375
  }
3376

    
3377
  if disk_template not in req_size_dict:
3378
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3379
                                 " is unknown" %  disk_template)
3380

    
3381
  return req_size_dict[disk_template]
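# Worked example for two disks of 1024 MB and 2048 MB:
#   DT_PLAIN -> 1024 + 2048                 = 3072 MB
#   DT_DRBD8 -> (1024 + 128) + (2048 + 128) = 3328 MB
#   DT_DISKLESS and DT_FILE -> None (no volume group space is needed)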
3382

    
3383

    
3384
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3385
  """Hypervisor parameter validation.
3386

3387
  This function abstracts the hypervisor parameter validation to be
3388
  used in both instance create and instance modify.
3389

3390
  @type lu: L{LogicalUnit}
3391
  @param lu: the logical unit for which we check
3392
  @type nodenames: list
3393
  @param nodenames: the list of nodes on which we should check
3394
  @type hvname: string
3395
  @param hvname: the name of the hypervisor we should use
3396
  @type hvparams: dict
3397
  @param hvparams: the parameters which we need to check
3398
  @raise errors.OpPrereqError: if the parameters are not valid
3399

3400
  """
3401
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3402
                                                  hvname,
3403
                                                  hvparams)
3404
  for node in nodenames:
3405
    info = hvinfo.get(node, None)
3406
    if not info or not isinstance(info, (tuple, list)):
3407
      raise errors.OpPrereqError("Cannot get current information"
3408
                                 " from node '%s' (%s)" % (node, info))
3409
    if not info[0]:
3410
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3411
                                 " %s" % info[1])
3412

    
3413

    
3414
class LUCreateInstance(LogicalUnit):
3415
  """Create an instance.
3416

3417
  """
3418
  HPATH = "instance-add"
3419
  HTYPE = constants.HTYPE_INSTANCE
3420
  _OP_REQP = ["instance_name", "disks", "disk_template",
3421
              "mode", "start",
3422
              "wait_for_sync", "ip_check", "nics",
3423
              "hvparams", "beparams"]
3424
  REQ_BGL = False
3425

    
3426
  def _ExpandNode(self, node):
3427
    """Expands and checks one node name.
3428

3429
    """
3430
    node_full = self.cfg.ExpandNodeName(node)
3431
    if node_full is None:
3432
      raise errors.OpPrereqError("Unknown node %s" % node)
3433
    return node_full
3434

    
3435
  def ExpandNames(self):
3436
    """ExpandNames for CreateInstance.
3437

3438
    Figure out the right locks for instance creation.
3439

3440
    """
3441
    self.needed_locks = {}
3442

    
3443
    # set optional parameters to None if they don't exist
3444
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3445
      if not hasattr(self.op, attr):
3446
        setattr(self.op, attr, None)
3447

    
3448
    # cheap checks, mostly valid constants given
3449

    
3450
    # verify creation mode
3451
    if self.op.mode not in (constants.INSTANCE_CREATE,
3452
                            constants.INSTANCE_IMPORT):
3453
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3454
                                 self.op.mode)
3455

    
3456
    # disk template and mirror node verification
3457
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3458
      raise errors.OpPrereqError("Invalid disk template name")
3459

    
3460
    if self.op.hypervisor is None:
3461
      self.op.hypervisor = self.cfg.GetHypervisorType()
3462

    
3463
    cluster = self.cfg.GetClusterInfo()
3464
    enabled_hvs = cluster.enabled_hypervisors
3465
    if self.op.hypervisor not in enabled_hvs:
3466
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3467
                                 " cluster (%s)" % (self.op.hypervisor,
3468
                                  ",".join(enabled_hvs)))
3469

    
3470
    # check hypervisor parameter syntax (locally)
3471

    
3472
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3473
                                  self.op.hvparams)
3474
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3475
    hv_type.CheckParameterSyntax(filled_hvp)
3476

    
3477
    # fill and remember the beparams dict
3478
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3479
                                    self.op.beparams)
3480

    
3481
    #### instance parameters check
3482

    
3483
    # instance name verification
3484
    hostname1 = utils.HostInfo(self.op.instance_name)
3485
    self.op.instance_name = instance_name = hostname1.name
3486

    
3487
    # this is just a preventive check, but someone might still add this
3488
    # instance in the meantime, and creation will fail at lock-add time
3489
    if instance_name in self.cfg.GetInstanceList():
3490
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3491
                                 instance_name)
3492

    
3493
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3494

    
3495
    # NIC buildup
3496
    self.nics = []
3497
    for nic in self.op.nics:
3498
      # ip validity checks
3499
      ip = nic.get("ip", None)
3500
      if ip is None or ip.lower() == "none":
3501
        nic_ip = None
3502
      elif ip.lower() == constants.VALUE_AUTO:
3503
        nic_ip = hostname1.ip
3504
      else:
3505
        if not utils.IsValidIP(ip):
3506
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3507
                                     " like a valid IP" % ip)
3508
        nic_ip = ip
3509

    
3510
      # MAC address verification
3511
      mac = nic.get("mac", constants.VALUE_AUTO)
3512
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3513
        if not utils.IsValidMac(mac.lower()):
3514
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3515
                                     mac)
3516
      # bridge verification
3517
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3518
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3519

    
3520
    # disk checks/pre-build
3521
    self.disks = []
3522
    for disk in self.op.disks:
3523
      mode = disk.get("mode", constants.DISK_RDWR)
3524
      if mode not in constants.DISK_ACCESS_SET:
3525
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3526
                                   mode)
3527
      size = disk.get("size", None)
3528
      if size is None:
3529
        raise errors.OpPrereqError("Missing disk size")
3530
      try:
3531
        size = int(size)
3532
      except ValueError:
3533
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3534
      self.disks.append({"size": size, "mode": mode})
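    # at this point self.nics is a list of objects.NIC instances and
    # self.disks a list of {"size": <MB>, "mode": <access mode>} dicts, the
    # shapes later consumed by the allocator and _GenerateDiskTemplate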
3535

    
3536
    # used in CheckPrereq for ip ping check
3537
    self.check_ip = hostname1.ip
3538

    
3539
    # file storage checks
3540
    if (self.op.file_driver and
3541
        not self.op.file_driver in constants.FILE_DRIVER):
3542
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3543
                                 self.op.file_driver)
3544

    
3545
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3546
      raise errors.OpPrereqError("File storage directory path not absolute")
3547

    
3548
    ### Node/iallocator related checks
3549
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3550
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3551
                                 " node must be given")
3552

    
3553
    if self.op.iallocator:
3554
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3555
    else:
3556
      self.op.pnode = self._ExpandNode(self.op.pnode)
3557
      nodelist = [self.op.pnode]
3558
      if self.op.snode is not None:
3559
        self.op.snode = self._ExpandNode(self.op.snode)
3560
        nodelist.append(self.op.snode)
3561
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3562

    
3563
    # in case of import lock the source node too
3564
    if self.op.mode == constants.INSTANCE_IMPORT:
3565
      src_node = getattr(self.op, "src_node", None)
3566
      src_path = getattr(self.op, "src_path", None)
3567

    
3568
      if src_node is None or src_path is None:
3569
        raise errors.OpPrereqError("Importing an instance requires source"
3570
                                   " node and path options")
3571

    
3572
      if not os.path.isabs(src_path):
3573
        raise errors.OpPrereqError("The source path must be absolute")
3574

    
3575
      self.op.src_node = src_node = self._ExpandNode(src_node)
3576
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3577
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3578

    
3579
    else: # INSTANCE_CREATE
3580
      if getattr(self.op, "os_type", None) is None:
3581
        raise errors.OpPrereqError("No guest OS specified")
3582

    
3583
  def _RunAllocator(self):
3584
    """Run the allocator based on input opcode.
3585

3586
    """
3587
    nics = [n.ToDict() for n in self.nics]
3588
    ial = IAllocator(self,
3589
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3590
                     name=self.op.instance_name,
3591
                     disk_template=self.op.disk_template,
3592
                     tags=[],
3593
                     os=self.op.os_type,
3594
                     vcpus=self.be_full[constants.BE_VCPUS],
3595
                     mem_size=self.be_full[constants.BE_MEMORY],
3596
                     disks=self.disks,
3597
                     nics=nics,
3598
                     hypervisor=self.op.hypervisor,
3599
                     )
3600

    
3601
    ial.Run(self.op.iallocator)
3602

    
3603
    if not ial.success:
3604
      raise errors.OpPrereqError("Can't compute nodes using"
3605
                                 " iallocator '%s': %s" % (self.op.iallocator,
3606
                                                           ial.info))
3607
    if len(ial.nodes) != ial.required_nodes:
3608
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3609
                                 " of nodes (%s), required %s" %
3610
                                 (self.op.iallocator, len(ial.nodes),
3611
                                  ial.required_nodes))
3612
    self.op.pnode = ial.nodes[0]
3613
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3614
                 self.op.instance_name, self.op.iallocator,
3615
                 ", ".join(ial.nodes))
3616
    if ial.required_nodes == 2:
3617
      self.op.snode = ial.nodes[1]
3618

    
3619
  def BuildHooksEnv(self):
3620
    """Build hooks env.
3621

3622
    This runs on master, primary and secondary nodes of the instance.
3623

3624
    """
3625
    env = {
3626
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3627
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3628
      "INSTANCE_ADD_MODE": self.op.mode,
3629
      }
3630
    if self.op.mode == constants.INSTANCE_IMPORT:
3631
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3632
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3633
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3634

    
3635
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3636
      primary_node=self.op.pnode,
3637
      secondary_nodes=self.secondaries,
3638
      status=self.instance_status,
3639
      os_type=self.op.os_type,
3640
      memory=self.be_full[constants.BE_MEMORY],
3641
      vcpus=self.be_full[constants.BE_VCPUS],
3642
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3643
    ))
3644

    
3645
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3646
          self.secondaries)
3647
    return env, nl, nl
3648

    
3649

    
3650
  def CheckPrereq(self):
3651
    """Check prerequisites.
3652

3653
    """
3654
    if (not self.cfg.GetVGName() and
3655
        self.op.disk_template not in constants.DTS_NOT_LVM):
3656
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3657
                                 " instances")
3658

    
3659

    
3660
    if self.op.mode == constants.INSTANCE_IMPORT:
3661
      src_node = self.op.src_node
3662
      src_path = self.op.src_path
3663

    
3664
      export_info = self.rpc.call_export_info(src_node, src_path)
3665

    
3666
      if not export_info:
3667
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3668

    
3669
      if not export_info.has_section(constants.INISECT_EXP):
3670
        raise errors.ProgrammerError("Corrupted export config")
3671

    
3672
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3673
      if (int(ei_version) != constants.EXPORT_VERSION):
3674
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3675
                                   (ei_version, constants.EXPORT_VERSION))
3676

    
3677
      # Check that the new instance doesn't have less disks than the export
3678
      instance_disks = len(self.disks)
3679
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3680
      if instance_disks < export_disks:
3681
        raise errors.OpPrereqError("Not enough disks to import."
3682
                                   " (instance: %d, export: %d)" %
3683
                                   (instance_disks, export_disks))
3684

    
3685
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3686
      disk_images = []
3687
      for idx in range(export_disks):
3688
        option = 'disk%d_dump' % idx
3689
        if export_info.has_option(constants.INISECT_INS, option):
3690
          # FIXME: are the old OSes, disk sizes, etc. useful?
3691
          export_name = export_info.get(constants.INISECT_INS, option)
3692
          image = os.path.join(src_path, export_name)
3693
          disk_images.append(image)
3694
        else:
3695
          disk_images.append(False)
3696

    
3697
      self.src_images = disk_images
3698

    
3699
      old_name = export_info.get(constants.INISECT_INS, 'name')
3700
      # FIXME: int() here could throw a ValueError on broken exports
3701
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3702
      if self.op.instance_name == old_name:
3703
        for idx, nic in enumerate(self.nics):
3704
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3705
            nic_mac_ini = 'nic%d_mac' % idx
3706
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3707

    
3708
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3709
    if self.op.start and not self.op.ip_check:
3710
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3711
                                 " adding an instance in start mode")
3712

    
3713
    if self.op.ip_check:
3714
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3715
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3716
                                   (self.check_ip, self.op.instance_name))
3717

    
3718
    #### allocator run
3719

    
3720
    if self.op.iallocator is not None:
3721
      self._RunAllocator()
3722

    
3723
    #### node related checks
3724

    
3725
    # check primary node
3726
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3727
    assert self.pnode is not None, \
3728
      "Cannot retrieve locked node %s" % self.op.pnode
3729
    self.secondaries = []
3730

    
3731
    # mirror node verification
3732
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3733
      if self.op.snode is None:
3734
        raise errors.OpPrereqError("The networked disk templates need"
3735
                                   " a mirror node")
3736
      if self.op.snode == pnode.name:
3737
        raise errors.OpPrereqError("The secondary node cannot be"
3738
                                   " the primary node.")
3739
      self.secondaries.append(self.op.snode)
3740

    
3741
    nodenames = [pnode.name] + self.secondaries
3742

    
3743
    req_size = _ComputeDiskSize(self.op.disk_template,
3744
                                self.disks)
3745

    
3746
    # Check lv size requirements
3747
    if req_size is not None:
3748
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3749
                                         self.op.hypervisor)
3750
      for node in nodenames:
3751
        info = nodeinfo.get(node, None)
3752
        if not info:
3753
          raise errors.OpPrereqError("Cannot get current information"
3754
                                     " from node '%s'" % node)
3755
        vg_free = info.get('vg_free', None)
3756
        if not isinstance(vg_free, int):
3757
          raise errors.OpPrereqError("Can't compute free disk space on"
3758
                                     " node %s" % node)
3759
        if req_size > info['vg_free']:
3760
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3761
                                     " %d MB available, %d MB required" %
3762
                                     (node, info['vg_free'], req_size))
3763

    
3764
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3765

    
3766
    # os verification
3767
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3768
    if not os_obj:
3769
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3770
                                 " primary node"  % self.op.os_type)
3771

    
3772
    # bridge check on primary node
3773
    bridges = [n.bridge for n in self.nics]
3774
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
3775
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
3776
                                 " exist on"
3777
                                 " destination node '%s'" %
3778
                                 (",".join(bridges), pnode.name))
3779

    
3780
    # memory check on primary node
3781
    if self.op.start:
3782
      _CheckNodeFreeMemory(self, self.pnode.name,
3783
                           "creating instance %s" % self.op.instance_name,
3784
                           self.be_full[constants.BE_MEMORY],
3785
                           self.op.hypervisor)
3786

    
3787
    if self.op.start:
3788
      self.instance_status = 'up'
3789
    else:
3790
      self.instance_status = 'down'
3791

    
3792
  def Exec(self, feedback_fn):
3793
    """Create and add the instance to the cluster.
3794

3795
    """
3796
    instance = self.op.instance_name
3797
    pnode_name = self.pnode.name
3798

    
3799
    for nic in self.nics:
3800
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3801
        nic.mac = self.cfg.GenerateMAC()
3802

    
3803
    ht_kind = self.op.hypervisor
3804
    if ht_kind in constants.HTS_REQ_PORT:
3805
      network_port = self.cfg.AllocatePort()
3806
    else:
3807
      network_port = None
3808

    
3809
    ##if self.op.vnc_bind_address is None:
3810
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3811

    
3812
    # this is needed because os.path.join does not accept None arguments
3813
    if self.op.file_storage_dir is None:
3814
      string_file_storage_dir = ""
3815
    else:
3816
      string_file_storage_dir = self.op.file_storage_dir
3817

    
3818
    # build the full file storage dir path
3819
    file_storage_dir = os.path.normpath(os.path.join(
3820
                                        self.cfg.GetFileStorageDir(),
3821
                                        string_file_storage_dir, instance))
3822

    
3823

    
3824
    disks = _GenerateDiskTemplate(self,
3825
                                  self.op.disk_template,
3826
                                  instance, pnode_name,
3827
                                  self.secondaries,
3828
                                  self.disks,
3829
                                  file_storage_dir,
3830
                                  self.op.file_driver,
3831
                                  0)
3832

    
3833
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3834
                            primary_node=pnode_name,
3835
                            nics=self.nics, disks=disks,
3836
                            disk_template=self.op.disk_template,
3837
                            status=self.instance_status,
3838
                            network_port=network_port,
3839
                            beparams=self.op.beparams,
3840
                            hvparams=self.op.hvparams,
3841
                            hypervisor=self.op.hypervisor,
3842
                            )
3843

    
3844
    feedback_fn("* creating instance disks...")
3845
    if not _CreateDisks(self, iobj):
3846
      _RemoveDisks(self, iobj)
3847
      self.cfg.ReleaseDRBDMinors(instance)
3848
      raise errors.OpExecError("Device creation failed, reverting...")
3849

    
3850
    feedback_fn("adding instance %s to cluster config" % instance)
3851

    
3852
    self.cfg.AddInstance(iobj)
3853
    # Declare that we don't want to remove the instance lock anymore, as we've
3854
    # added the instance to the config
3855
    del self.remove_locks[locking.LEVEL_INSTANCE]
3856
    # Remove the temp. assignments for the instance's drbds
3857
    self.cfg.ReleaseDRBDMinors(instance)
3858
    # Unlock all the nodes
3859
    self.context.glm.release(locking.LEVEL_NODE)
3860
    del self.acquired_locks[locking.LEVEL_NODE]
3861

    
3862
    if self.op.wait_for_sync:
3863
      disk_abort = not _WaitForSync(self, iobj)
3864
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3865
      # make sure the disks are not degraded (still sync-ing is ok)
3866
      time.sleep(15)
3867
      feedback_fn("* checking mirrors status")
3868
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3869
    else:
3870
      disk_abort = False
3871

    
3872
    if disk_abort:
3873
      _RemoveDisks(self, iobj)
3874
      self.cfg.RemoveInstance(iobj.name)
3875
      # Make sure the instance lock gets removed
3876
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3877
      raise errors.OpExecError("There are some degraded disks for"
3878
                               " this instance")
3879

    
3880
    feedback_fn("creating os for instance %s on node %s" %
3881
                (instance, pnode_name))
3882

    
3883
    if iobj.disk_template != constants.DT_DISKLESS:
3884
      if self.op.mode == constants.INSTANCE_CREATE:
3885
        feedback_fn("* running the instance OS create scripts...")
3886
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
3887
          raise errors.OpExecError("could not add os for instance %s"
3888
                                   " on node %s" %
3889
                                   (instance, pnode_name))
3890

    
3891
      elif self.op.mode == constants.INSTANCE_IMPORT:
3892
        feedback_fn("* running the instance OS import scripts...")
3893
        src_node = self.op.src_node
3894
        src_images = self.src_images
3895
        cluster_name = self.cfg.GetClusterName()
3896
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3897
                                                         src_node, src_images,
3898
                                                         cluster_name)
3899
        for idx, result in enumerate(import_result):
3900
          if not result:
3901
            self.LogWarning("Could not import the image %s for instance"
3902
                            " %s, disk %d, on node %s" %
3903
                            (src_images[idx], instance, idx, pnode_name))
3904
      else:
3905
        # also checked in the prereq part
3906
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3907
                                     % self.op.mode)
3908

    
3909
    if self.op.start:
3910
      logging.info("Starting instance %s on node %s", instance, pnode_name)
3911
      feedback_fn("* starting instance...")
3912
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3913
        raise errors.OpExecError("Could not start instance")
3914

    
3915

    
3916
class LUConnectConsole(NoHooksLU):
3917
  """Connect to an instance's console.
3918

3919
  This is somewhat special in that it returns the command line that
3920
  you need to run on the master node in order to connect to the
3921
  console.
3922

3923
  """
3924
  _OP_REQP = ["instance_name"]
3925
  REQ_BGL = False
3926

    
3927
  def ExpandNames(self):
3928
    self._ExpandAndLockInstance()
3929

    
3930
  def CheckPrereq(self):
3931
    """Check prerequisites.
3932

3933
    This checks that the instance is in the cluster.
3934

3935
    """
3936
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3937
    assert self.instance is not None, \
3938
      "Cannot retrieve locked instance %s" % self.op.instance_name
3939

    
3940
  def Exec(self, feedback_fn):
3941
    """Connect to the console of an instance
3942

3943
    """
3944
    instance = self.instance
3945
    node = instance.primary_node
3946

    
3947
    node_insts = self.rpc.call_instance_list([node],
3948
                                             [instance.hypervisor])[node]
3949
    if node_insts is False:
3950
      raise errors.OpExecError("Can't connect to node %s." % node)
3951

    
3952
    if instance.name not in node_insts:
3953
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3954

    
3955
    logging.debug("Connecting to console of %s on %s", instance.name, node)
3956

    
3957
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3958
    console_cmd = hyper.GetShellCommandForConsole(instance)
3959

    
3960
    # build ssh cmdline
3961
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3962

    
3963

    
3964
class LUReplaceDisks(LogicalUnit):
3965
  """Replace the disks of an instance.
3966

3967
  """
3968
  HPATH = "mirrors-replace"
3969
  HTYPE = constants.HTYPE_INSTANCE
3970
  _OP_REQP = ["instance_name", "mode", "disks"]
3971
  REQ_BGL = False
3972

    
3973
  def ExpandNames(self):
3974
    self._ExpandAndLockInstance()
3975

    
3976
    if not hasattr(self.op, "remote_node"):
3977
      self.op.remote_node = None
3978

    
3979
    ia_name = getattr(self.op, "iallocator", None)
3980
    if ia_name is not None:
3981
      if self.op.remote_node is not None:
3982
        raise errors.OpPrereqError("Give either the iallocator or the new"
3983
                                   " secondary, not both")
3984
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3985
    elif self.op.remote_node is not None:
3986
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3987
      if remote_node is None:
3988
        raise errors.OpPrereqError("Node '%s' not known" %
3989
                                   self.op.remote_node)
3990
      self.op.remote_node = remote_node
3991
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3992
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3993
    else:
3994
      self.needed_locks[locking.LEVEL_NODE] = []
3995
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3996

    
3997
  def DeclareLocks(self, level):
3998
    # If we're not already locking all nodes in the set we have to declare the
3999
    # instance's primary/secondary nodes.
4000
    if (level == locking.LEVEL_NODE and
4001
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4002
      self._LockInstancesNodes()
4003

    
4004
  def _RunAllocator(self):
4005
    """Compute a new secondary node using an IAllocator.
4006

4007
    """
4008
    ial = IAllocator(self,
4009
                     mode=constants.IALLOCATOR_MODE_RELOC,
4010
                     name=self.op.instance_name,
4011
                     relocate_from=[self.sec_node])
4012

    
4013
    ial.Run(self.op.iallocator)
4014

    
4015
    if not ial.success:
4016
      raise errors.OpPrereqError("Can't compute nodes using"
4017
                                 " iallocator '%s': %s" % (self.op.iallocator,
4018
                                                           ial.info))
4019
    if len(ial.nodes) != ial.required_nodes:
4020
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4021
                                 " of nodes (%s), required %s" %
4022
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
4023
    self.op.remote_node = ial.nodes[0]
4024
    self.LogInfo("Selected new secondary for the instance: %s",
4025
                 self.op.remote_node)
4026

    
4027
  def BuildHooksEnv(self):
4028
    """Build hooks env.
4029

4030
    This runs on the master, the primary and all the secondaries.
4031

4032
    """
4033
    env = {
4034
      "MODE": self.op.mode,
4035
      "NEW_SECONDARY": self.op.remote_node,
4036
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4037
      }
4038
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4039
    nl = [
4040
      self.cfg.GetMasterNode(),
4041
      self.instance.primary_node,
4042
      ]
4043
    if self.op.remote_node is not None:
4044
      nl.append(self.op.remote_node)
4045
    return env, nl, nl
4046

    
4047
  def CheckPrereq(self):
4048
    """Check prerequisites.
4049

4050
    This checks that the instance is in the cluster.
4051

4052
    """
4053
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4054
    assert instance is not None, \
4055
      "Cannot retrieve locked instance %s" % self.op.instance_name
4056
    self.instance = instance
4057

    
4058
    if instance.disk_template not in constants.DTS_NET_MIRROR:
4059
      raise errors.OpPrereqError("Instance's disk layout is not"
4060
                                 " network mirrored.")
4061

    
4062
    if len(instance.secondary_nodes) != 1:
4063
      raise errors.OpPrereqError("The instance has a strange layout,"
4064
                                 " expected one secondary but found %d" %
4065
                                 len(instance.secondary_nodes))
4066

    
4067
    self.sec_node = instance.secondary_nodes[0]
4068

    
4069
    ia_name = getattr(self.op, "iallocator", None)
4070
    if ia_name is not None:
4071
      self._RunAllocator()
4072

    
4073
    remote_node = self.op.remote_node
4074
    if remote_node is not None:
4075
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4076
      assert self.remote_node_info is not None, \
4077
        "Cannot retrieve locked node %s" % remote_node
4078
    else:
4079
      self.remote_node_info = None
4080
    if remote_node == instance.primary_node:
4081
      raise errors.OpPrereqError("The specified node is the primary node of"
4082
                                 " the instance.")
4083
    elif remote_node == self.sec_node:
4084
      if self.op.mode == constants.REPLACE_DISK_SEC:
4085
        # this is for DRBD8, where we can't execute the same mode of
4086
        # replacement as for drbd7 (no different port allocated)
4087
        raise errors.OpPrereqError("Same secondary given, cannot execute"
4088
                                   " replacement")
4089
    if instance.disk_template == constants.DT_DRBD8:
4090
      if (self.op.mode == constants.REPLACE_DISK_ALL and
4091
          remote_node is not None):
4092
        # switch to replace secondary mode
4093
        self.op.mode = constants.REPLACE_DISK_SEC
4094

    
4095
      if self.op.mode == constants.REPLACE_DISK_ALL:
4096
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4097
                                   " secondary disk replacement, not"
4098
                                   " both at once")
4099
      elif self.op.mode == constants.REPLACE_DISK_PRI:
4100
        if remote_node is not None:
4101
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4102
                                     " the secondary while doing a primary"
4103
                                     " node disk replacement")
4104
        self.tgt_node = instance.primary_node
4105
        self.oth_node = instance.secondary_nodes[0]
4106
      elif self.op.mode == constants.REPLACE_DISK_SEC:
4107
        self.new_node = remote_node # this can be None, in which case
4108
                                    # we don't change the secondary
4109
        self.tgt_node = instance.secondary_nodes[0]
4110
        self.oth_node = instance.primary_node
4111
      else:
4112
        raise errors.ProgrammerError("Unhandled disk replace mode")
4113

    
4114
    if not self.op.disks:
4115
      self.op.disks = range(len(instance.disks))
4116

    
4117
    for disk_idx in self.op.disks:
4118
      instance.FindDisk(disk_idx)
4119

    
4120
  def _ExecD8DiskOnly(self, feedback_fn):
4121
    """Replace a disk on the primary or secondary for dbrd8.
4122

4123
    The algorithm for replace is quite complicated:
4124

4125
      1. for each disk to be replaced:
4126

4127
        1. create new LVs on the target node with unique names
4128
        1. detach old LVs from the drbd device
4129
        1. rename old LVs to name_replaced.<time_t>
4130
        1. rename new LVs to old LVs
4131
        1. attach the new LVs (with the old names now) to the drbd device
4132

4133
      1. wait for sync across all devices
4134

4135
      1. for each modified disk:
4136

4137
        1. remove old LVs (which have the name name_replaced.<time_t>)
4138

4139
    Failures are not very well handled.
4140

4141
    """
4142
    steps_total = 6
4143
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4144
    instance = self.instance
4145
    iv_names = {}
4146
    vgname = self.cfg.GetVGName()
4147
    # start of work
4148
    cfg = self.cfg
4149
    tgt_node = self.tgt_node
4150
    oth_node = self.oth_node
4151

    
4152
    # Step: check device activation
4153
    self.proc.LogStep(1, steps_total, "check device existence")
4154
    info("checking volume groups")
4155
    my_vg = cfg.GetVGName()
4156
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4157
    if not results:
4158
      raise errors.OpExecError("Can't list volume groups on the nodes")
4159
    for node in oth_node, tgt_node:
4160
      res = results.get(node, False)
4161
      if not res or my_vg not in res:
4162
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4163
                                 (my_vg, node))
4164
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find disk/%d on node %s" %
                                   (idx, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

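  # Added note (illustrative, not part of the original code): the
  # detach/rename/attach dance above swaps LVs by name.  For disk 0 the old
  # data LV, e.g. "<unique-prefix>.disk0_data", is renamed with the
  # "_replaced-<timestamp>" suffix produced by ren_fn, and the freshly created
  # LV then takes over the old name, so the DRBD device keeps referring to the
  # same names throughout.
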
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
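    # Added note (assumption, not from the original code): for LD_DRBD8
    # devices the logical_id tuple manipulated in step 4 below appears to be
    # laid out as (node_a, node_b, port, minor_a, minor_b, secret); the code
    # replaces the old secondary with new_node and substitutes new_minor in
    # the slot belonging to that node, keeping port and secret unchanged.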
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find disk/%d on node %s" %
                                 (idx, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      size = dev.size
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[idx] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for idx, dev in enumerate(instance.disks):
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd disk/%d from network, unusual case" %
                idx)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for idx, dev in enumerate(instance.disks):
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd disk/%d to new secondary!" % idx,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

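  # Added example (illustrative, not part of the original code): growing a
  # disk by "amount" requires at least that many MiB free in the volume group
  # on *every* node that holds a copy of the disk; e.g. a grow of 2048 MiB
  # with vg_free == 1500 MiB on one of those nodes is rejected by the check
  # above.
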
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
      val = self.op.beparams.get(item, None)
      if val is not None:
        try:
          val = int(val)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
        self.op.beparams[item] = val
    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == "none":
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
      # we can only check None bridges and assign the default one
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is None:
        nic_dict['bridge'] = self.cfg.GetDefBridge()
      # but we can validate MACs
      nic_mac = nic_dict.get('mac', None)
      if nic_mac is not None:
        if self.cfg.IsMacInUse(nic_mac):
          raise errors.OpPrereqError("MAC address %s already in use"
                                     " in cluster" % nic_mac)
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

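  # Added example (illustrative, not part of the original code): the disks and
  # nics arguments validated above are lists of (operation, parameters) pairs,
  # roughly of the form
  #   disks=[(constants.DDM_ADD, {'size': 1024, 'mode': constants.DISK_RDWR})]
  #   nics=[(0, {'bridge': 'xen-br1'})]
  # where an integer index instead of DDM_ADD/DDM_REMOVE selects an existing
  # device to modify.
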
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # FIXME: readd disk/nic changes
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val is None:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val is None:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode]['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node in instance.secondary_nodes:
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is not None:
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if not type(ins_l) is list:
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          if not self.rpc.call_blockdev_remove(node, disk):
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                 " continuing anyway", device_idx, node)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        new_disk.mode = disk_dict['mode']
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for secondary_node in instance.secondary_nodes:
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
                                            new_disk, False, info):
            self.LogWarning("Failed to create volume %s (%s) on"
                            " secondary node %s!",
                            new_disk.iv_name, new_disk, secondary_node)
        #HARDCODE
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
                                        instance, new_disk, info):
          self.LogWarning("Failed to create volume %s on primary!",
                          new_disk.iv_name)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # add a new nic
        if 'mac' not in nic_dict:
          mac = constants.VALUE_GENERATE
        else:
          mac = nic_dict['mac']
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          mac = self.cfg.GenerateMAC()
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=nic_dict.get('bridge', None))
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    return self.rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

        if not new_dev_name:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name),
                                 physical_id=(vgname, new_dev_name),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                             instance, cluster_name, idx):
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        if not self.rpc.call_blockdev_remove(src_node, dev):
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

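  # Added note (illustrative, not part of the original code): the pattern is
  # an uncompiled regular expression, e.g. "^staging$", and Exec below returns
  # (path, tag) pairs such as ("/instances/inst1.example.com", "staging") for
  # every cluster, node or instance object carrying a matching tag.
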
  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

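  # Added example (illustrative only; the argument values are made up): an
  # allocation request would be built roughly as
  #   ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com", mem_size=512,
  #                    disks=[{'size': 1024}], disk_template=...,
  #                    os=..., tags=[], nics=[...], vcpus=1, hypervisor=...)
  # i.e. every key in _ALLO_KEYS must be supplied as a keyword argument, while
  # relocation requests take only "relocate_from".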
  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # assemble the per-node result
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

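  # For reference only, a sketch of the structure assembled above (keys are
  # taken from the code, example values are made up):
  #
  #   self.in_data = {
  #     "version": 1,
  #     "cluster_name": "cluster.example.com",
  #     "cluster_tags": [],
  #     "enable_hypervisors": ["xen-pvm"],
  #     "nodes": {"node1.example.com": {"total_memory": 4096, ...}},
  #     "instances": {"inst1.example.com": {"memory": 512, ...}},
  #     }
  #
  # _AddNewInstance or _AddRelocateInstance then adds the "request" key
  # before the whole structure is serialized into self.in_text.
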
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process, and if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


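# For reference only, a sketch of the smallest output document that passes
# _ValidateResult above (the info text and node names are made up):
#
#   {
#     "success": true,
#     "info": "allocation successful",
#     "nodes": ["node2.example.com", "node3.example.com"]
#   }
#
# A result missing any of the three keys, or whose "nodes" value is not a
# list, is rejected with OpExecError.

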
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the test direction
    and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

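  # Illustrative sketch only, not part of the original module: parameter
  # values that satisfy the checks above for allocation mode (sizes and the
  # bridge name are made up):
  #
  #   disks=[{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics=[{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  #
  # Exactly two disk dicts are required, each with an integer "size" and a
  # "mode" of 'r' or 'w'; every NIC dict must carry "mac", "ip" and "bridge".
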
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result