#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with node groups."""

import itertools
import logging

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import objects
from ganeti import utils
from ganeti.masterd import iallocator
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import MergeAndVerifyHvState, \
  MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
  CheckNodeGroupInstances, GetUpdatedIPolicy, \
  ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
  CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceLvsToNodes, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


class LUGroupAdd(LogicalUnit):
  """Logical unit for creating node groups.

  """
  HPATH = "group-add"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # We need the new group's UUID here so that we can create and acquire the
    # corresponding lock. Later, in Exec(), we'll indicate to cfg.AddNodeGroup
    # that it should not check whether the UUID exists in the configuration.
    self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
    self.needed_locks = {}
    self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid

  def _CheckIpolicy(self):
    """Checks the group's ipolicy for consistency and validity.

    """
    if self.op.ipolicy:
      cluster = self.cfg.GetClusterInfo()
      full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
      try:
        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
      except errors.ConfigurationError, err:
        raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                   errors.ECODE_INVAL)
      CheckIpolicyVsDiskTemplates(full_ipolicy,
                                  cluster.enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name is not already in use as a node
    group.

    """
    try:
      existing_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (self.op.group_name, existing_uuid),
                                 errors.ECODE_EXISTS)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
    else:
      self.new_hv_state = None

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
    else:
      self.new_disk_state = None

    if self.op.diskparams:
      for templ in constants.DISK_TEMPLATES:
        if templ in self.op.diskparams:
          utils.ForceDictType(self.op.diskparams[templ],
                              constants.DISK_DT_TYPES)
      self.new_diskparams = self.op.diskparams
      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)
    else:
      self.new_diskparams = {}

    self._CheckIpolicy()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Add the node group to the cluster.

    """
    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
                                  uuid=self.group_uuid,
                                  alloc_policy=self.op.alloc_policy,
                                  ndparams=self.op.ndparams,
                                  diskparams=self.new_diskparams,
                                  ipolicy=self.op.ipolicy,
                                  hv_state_static=self.new_hv_state,
                                  disk_state_static=self.new_disk_state)

    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
    del self.remove_locks[locking.LEVEL_NODEGROUP]
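    # The lock added via add_locks in ExpandNames appears to be scheduled for
    # removal automatically (so a failed creation leaves no stale lock);
    # dropping the remove_locks entry here is what keeps the lock alive now
    # that the group really exists. This is the apparent intent of the line
    # above, inferred from the locking helpers rather than stated by them.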


class LUGroupAssignNodes(NoHooksLU):
  """Logical unit for assigning nodes to groups.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # These raise errors.OpPrereqError on their own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)

    # We want to lock all the affected nodes and groups. We have readily
    # available the list of nodes, and the *destination* group. To gather the
    # list of "source" groups, we need to fetch node information later on.
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
      locking.LEVEL_NODE: self.op.node_uuids,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1

      # Try to get all affected nodes' groups without having the group or node
      # lock yet. Needs verification later in the code flow.
      groups = self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids)

      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert self.needed_locks[locking.LEVEL_NODEGROUP]
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset(self.op.node_uuids))

    expected_locks = (set([self.group_uuid]) |
                      self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids))
    actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
    if actual_locks != expected_locks:
      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
                               " current groups are '%s', used to be '%s'" %
                               (utils.CommaJoin(expected_locks),
                                utils.CommaJoin(actual_locks)))

    self.node_data = self.cfg.GetAllNodesInfo()
    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    instance_data = self.cfg.GetAllInstancesInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    (new_splits, previous_splits) = \
      self.CheckAssignmentForSplitInstances([(uuid, self.group_uuid)
                                             for uuid in self.op.node_uuids],
                                            self.node_data, instance_data)

    if new_splits:
      fmt_new_splits = utils.CommaJoin(utils.NiceSort(
                         self.cfg.GetInstanceNames(new_splits)))

      if not self.op.force:
        raise errors.OpExecError("The following instances get split by this"
                                 " change and --force was not given: %s" %
                                 fmt_new_splits)
      else:
        self.LogWarning("This operation will split the following instances: %s",
                        fmt_new_splits)

        if previous_splits:
          self.LogWarning("In addition, these already-split instances continue"
                          " to be split across groups: %s",
                          utils.CommaJoin(utils.NiceSort(
                            self.cfg.GetInstanceNames(previous_splits))))

  def Exec(self, feedback_fn):
    """Assign nodes to a new group.

    """
    mods = [(node_uuid, self.group_uuid) for node_uuid in self.op.node_uuids]

    self.cfg.AssignGroupNodes(mods)

  @staticmethod
  def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
    """Check for split instances after a node assignment.

    This method considers a series of node assignments as an atomic operation,
    and returns information about split instances after applying the set of
    changes.

    In particular, it returns information about newly split instances, and
    instances that were already split and remain so after the change.

    Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
    considered.

    @type changes: list of (node_uuid, new_group_uuid) pairs.
    @param changes: list of node assignments to consider.
    @param node_data: a dict with data for all nodes
    @param instance_data: a dict with all instances to consider
    @rtype: a two-tuple
    @return: a list of instances that were previously whole and become split
      as a consequence of this change, and a list of instances that were
      already split and that this change does not fix.

    """
    changed_nodes = dict((uuid, group) for uuid, group in changes
                         if node_data[uuid].group != group)

    all_split_instances = set()
    previously_split_instances = set()

    for inst in instance_data.values():
      if inst.disk_template not in constants.DTS_INT_MIRROR:
        continue

      if len(set(node_data[node_uuid].group
                 for node_uuid in inst.all_nodes)) > 1:
        previously_split_instances.add(inst.uuid)

      if len(set(changed_nodes.get(node_uuid, node_data[node_uuid].group)
                 for node_uuid in inst.all_nodes)) > 1:
        all_split_instances.add(inst.uuid)

    return (list(all_split_instances - previously_split_instances),
            list(previously_split_instances & all_split_instances))
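  # A worked example of the check above (names are illustrative only): take a
  # DRBD instance whose primary and secondary nodes both sit in group "A".
  # Passing changes=[(primary_uuid, "B")] moves only the primary, so the set
  # of groups spanned by the instance grows to {"A", "B"} and its UUID lands
  # in the first returned list (newly split). An instance that already
  # spanned two groups before the change and still does afterwards lands in
  # the second list (previously split, not fixed by this change).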


class LUGroupSetParams(LogicalUnit):
  """Modifies the parameters of a node group.

  """
  HPATH = "group-modify"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def CheckArguments(self):
    all_changes = [
      self.op.ndparams,
      self.op.diskparams,
      self.op.alloc_policy,
      self.op.hv_state,
      self.op.disk_state,
      self.op.ipolicy,
      ]

    if all_changes.count(None) == len(all_changes):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)

    if self.op.diskparams:
      CheckDiskAccessModeValidity(self.op.diskparams)

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

    self.share_locks[locking.LEVEL_INSTANCE] = 1

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once group lock has
      # been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

  @staticmethod
  def _UpdateAndVerifyDiskParams(old, new):
    """Updates and verifies disk parameters.

    """
    new_params = GetUpdatedParams(old, new)
    utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
    return new_params
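  # Sketch of the merge done by the helper above (the keys are illustrative,
  # not necessarily valid disk parameters): with old = {"resync-rate": 1024,
  # "metavg": "xenvg"} and new = {"resync-rate": 2048}, GetUpdatedParams is
  # expected to produce {"resync-rate": 2048, "metavg": "xenvg"}, which
  # ForceDictType then type-checks against constants.DISK_DT_TYPES.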

  def _CheckIpolicy(self, cluster, owned_instance_names):
    """Sanity checks for the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type owned_instance_names: list of string
    @param owned_instance_names: list of instance names

    """
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(self.group.ipolicy,
                                           self.op.ipolicy,
                                           group_policy=True)

      new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
      CheckIpolicyVsDiskTemplates(new_ipolicy,
                                  cluster.enabled_disk_templates)
      instances = \
        dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))
      gmi = ganeti.masterd.instance
      violations = \
          ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster,
                                                                 self.group),
                                       new_ipolicy, instances.values(),
                                       self.cfg)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate it: %s",
                        utils.CommaJoin(violations))
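      # Only a warning is logged here: judging by its name and arguments (the
      # old group policy first, the new one second), the helper reports
      # instances that start violating the policy because of this change; the
      # modification itself is not blocked on their account.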

  def CheckPrereq(self):
    """Check prerequisites.

    """
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    cluster = self.cfg.GetClusterInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = new_ndparams

    if self.op.diskparams:
      diskparams = self.group.diskparams
      uavdp = self._UpdateAndVerifyDiskParams
      # For each disk template subdict, update and verify the values
      new_diskparams = dict((dt,
                             uavdp(diskparams.get(dt, {}),
                                   self.op.diskparams[dt]))
                            for dt in constants.DISK_TEMPLATES
                            if dt in self.op.diskparams)
      # As we have all subdicts of diskparams ready, let's merge the actual
      # dict with all updated subdicts
      self.new_diskparams = objects.FillDict(diskparams, new_diskparams)

      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeConsistency(self.new_diskparams, self.cfg,
                                       group=self.group)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                self.group.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state,
                                self.group.disk_state_static)

    self._CheckIpolicy(cluster, owned_instance_names)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "NEW_ALLOC_POLICY": self.op.alloc_policy,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Modifies the node group.

    """
    result = []

    if self.op.ndparams:
      self.group.ndparams = self.new_ndparams
      result.append(("ndparams", str(self.group.ndparams)))

    if self.op.diskparams:
      self.group.diskparams = self.new_diskparams
      result.append(("diskparams", str(self.group.diskparams)))

    if self.op.alloc_policy:
      self.group.alloc_policy = self.op.alloc_policy

    if self.op.hv_state:
      self.group.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.group.disk_state_static = self.new_disk_state

    if self.op.ipolicy:
      self.group.ipolicy = self.new_ipolicy

    self.cfg.Update(self.group, feedback_fn)
    return result
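    # The value returned above is a list of (parameter, new value) pairs, one
    # per field that was actually modified; presumably the caller uses it to
    # report the applied changes back to the user.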


class LUGroupRemove(LogicalUnit):
  HPATH = "group-remove"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name exists as a node group, that it is
    empty (i.e., contains no nodes), and that it is not the last group of the
    cluster.

    """
    # Verify that the group is empty.
    group_nodes = [node.uuid
                   for node in self.cfg.GetAllNodesInfo().values()
                   if node.group == self.group_uuid]

    if group_nodes:
      raise errors.OpPrereqError("Group '%s' not empty, has the following"
                                 " nodes: %s" %
                                 (self.op.group_name,
                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
                                 errors.ECODE_STATE)

    # Verify the cluster would not be left group-less.
    if len(self.cfg.GetNodeGroupList()) == 1:
      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
                                 " removed" % self.op.group_name,
                                 errors.ECODE_STATE)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Remove the node group.

    """
    try:
      self.cfg.RemoveNodeGroup(self.group_uuid)
    except errors.ConfigurationError:
      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
                               (self.op.group_name, self.group_uuid))

    self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid


class LUGroupRename(LogicalUnit):
  HPATH = "group-rename"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    Ensures the requested new name is not yet in use.

    """
    try:
      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
                                 " node group (UUID: %s)" %
                                 (self.op.new_name, new_name_uuid),
                                 errors.ECODE_EXISTS)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OLD_NAME": self.op.group_name,
      "NEW_NAME": self.op.new_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    all_nodes = self.cfg.GetAllNodesInfo()
    all_nodes.pop(mn, None)

    run_nodes = [mn]
    run_nodes.extend(node.uuid for node in all_nodes.values()
                     if node.group == self.group_uuid)

    return (run_nodes, run_nodes)

  def Exec(self, feedback_fn):
    """Rename the node group.

    """
    group = self.cfg.GetNodeGroup(self.group_uuid)

    if group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    group.name = self.op.new_name
    self.cfg.Update(group, feedback_fn)

    return self.op.new_name


class LUGroupEvacuate(LogicalUnit):
  HPATH = "group-evacuate"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    if self.op.target_groups:
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
                                  self.op.target_groups)
    else:
      self.req_target_uuids = []

    if self.group_uuid in self.req_target_uuids:
      raise errors.OpPrereqError("Group to be evacuated (%s) cannot be used"
                                 " as a target group (targets are %s)" %
                                 (self.group_uuid,
                                  utils.CommaJoin(self.req_target_uuids)),
                                 errors.ECODE_INVAL)

    self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      if self.req_target_uuids:
        lock_groups = set([self.group_uuid] + self.req_target_uuids)

        # Lock all groups used by instances optimistically; this requires going
        # via the node before it's locked, requiring verification later on
        lock_groups.update(group_uuid
                           for instance_name in
                             self.owned_locks(locking.LEVEL_INSTANCE)
                           for group_uuid in
                             self.cfg.GetInstanceNodeGroups(
                               self.cfg.GetInstanceInfoByName(instance_name)
                                 .uuid))
      else:
        # No target groups, need to lock all of them
        lock_groups = locking.ALL_SET

      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be evacuated which
      # contain actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be evacuated and target groups
      owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
      assert self.group_uuid in owned_groups
      member_node_uuids = [node_uuid
                           for group in owned_groups
                           for node_uuid in
                             self.cfg.GetNodeGroup(group).members]
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert owned_groups.issuperset(self.req_target_uuids)
    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    # Get instance information
    self.instances = \
      dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

    if self.req_target_uuids:
      # User requested specific target groups
      self.target_uuids = self.req_target_uuids
    else:
      # All groups except the one to be evacuated are potential targets
      self.target_uuids = [group_uuid for group_uuid in owned_groups
                           if group_uuid != self.group_uuid]

      if not self.target_uuids:
        raise errors.OpPrereqError("There are no possible target groups",
                                   errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "TARGET_GROUPS": " ".join(self.target_uuids),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)

    run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members

    return (run_nodes, run_nodes)

  def Exec(self, feedback_fn):
    inst_names = list(self.owned_locks(locking.LEVEL_INSTANCE))

    assert self.group_uuid not in self.target_uuids

    req = iallocator.IAReqGroupChange(instances=inst_names,
                                      target_groups=self.target_uuids)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute group evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)

    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
                 len(jobs), self.op.group_name)

    return ResultWithJobs(jobs)
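    # A sketch of the flow above (inferred from the calls, not verified
    # against the job queue internals): the iallocator computes a per-instance
    # evacuation plan, LoadNodeEvacResult converts it into lists of opcodes,
    # and ResultWithJobs hands those to the master so the actual instance
    # moves run as separate jobs outside this LU.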


class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],

      # This opcode acquires all node locks in a group. LUClusterVerifyDisks
      # starts one instance of this opcode for every group, which means all
      # nodes will be locked for a short amount of time, so it's better to
      # acquire the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in
               self.cfg.GetInstanceNodeGroups(
                 self.cfg.GetInstanceInfoByName(instance_name).uuid)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_node_uuids = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_inst_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_inst_names)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfoByName(owned_inst_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

  def _VerifyInstanceLvs(self, node_errors, offline_disk_instance_names,
                         missing_disks):
    node_lv_to_inst = MapInstanceLvsToNodes(
      [inst for inst in self.instances.values() if inst.disks_active])
    if node_lv_to_inst:
      node_uuids = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                                  set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(node_uuids, [])

      for (node_uuid, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s",
                          self.cfg.GetNodeName(node_uuid), msg)
          node_errors[node_uuid] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = node_lv_to_inst.pop((node_uuid, lv_name), None)
          if not lv_online and inst is not None:
            offline_disk_instance_names.add(inst.name)

      # Any leftover items in node_lv_to_inst are missing LVs, let's arrange
      # the data better
      for key, inst in node_lv_to_inst.iteritems():
        missing_disks.setdefault(inst.name, []).append(list(key))
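  # Reading of the helper above, inferred from the code rather than the RPC
  # documentation: call_lv_list is assumed to return, per node, a mapping of
  # LV name to a tuple whose third element is the LV's "online" flag. Offline
  # LVs mark their instance as needing activate-disks, and (node, LV) pairs
  # that never show up in any node's answer end up in missing_disks.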

  def _VerifyDrbdStates(self, node_errors, offline_disk_instance_names):
    node_to_inst = {}
    for inst in self.instances.values():
      if not inst.disks_active or inst.disk_template != constants.DT_DRBD8:
        continue

      for node_uuid in itertools.chain([inst.primary_node],
                                       inst.secondary_nodes):
        node_to_inst.setdefault(node_uuid, []).append(inst)

    for (node_uuid, insts) in node_to_inst.items():
      node_disks = [(inst.disks, inst) for inst in insts]
      node_res = self.rpc.call_drbd_needs_activation(node_uuid, node_disks)
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error getting DRBD status on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        node_errors[node_uuid] = msg
        continue

      faulty_disk_uuids = set(node_res.payload)
      for inst in self.instances.values():
        inst_disk_uuids = set([disk.uuid for disk in inst.disks])
        if inst_disk_uuids.intersection(faulty_disk_uuids):
          offline_disk_instance_names.add(inst.name)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    node_errors = {}
    offline_disk_instance_names = set()
    missing_disks = {}

    self._VerifyInstanceLvs(node_errors, offline_disk_instance_names,
                            missing_disks)
    self._VerifyDrbdStates(node_errors, offline_disk_instance_names)

    return (node_errors, list(offline_disk_instance_names), missing_disks)
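  # Illustrative shape of the returned triple (all names made up): with one
  # unreachable node, one instance with an offline LV and one with a missing
  # LV, the result would look roughly like
  #   ({"node-uuid-3": "<rpc error message>"},
  #    ["inst1.example.com"],
  #    {"inst2.example.com": [["node-uuid-7", "xenvg/disk0"]]})
  # matching (node_errors, offline_disk_instance_names, missing_disks) above.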