root / lib / cmdlib / group.py @ 96431562

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with node groups."""

import itertools
import logging

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import objects
from ganeti import query
from ganeti import utils
from ganeti.masterd import iallocator
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import MergeAndVerifyHvState, \
  MergeAndVerifyDiskState, GetWantedNodes, GetUpdatedParams, \
  CheckNodeGroupInstances, GetUpdatedIPolicy, \
  ComputeNewInstanceViolations, GetDefaultIAllocator, ShareAll, \
  CheckInstancesNodeGroups, LoadNodeEvacResult, MapInstanceLvsToNodes, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency

import ganeti.masterd.instance


class LUGroupAdd(LogicalUnit):
  """Logical unit for creating node groups.

  """
  HPATH = "group-add"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # We need the new group's UUID here so that we can create and acquire the
    # corresponding lock. Later, in Exec(), we'll indicate to cfg.AddNodeGroup
    # that it should not check whether the UUID exists in the configuration.
    self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
    self.needed_locks = {}
    self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid

  def _CheckIpolicy(self):
    """Checks the group's ipolicy for consistency and validity.

    """
    if self.op.ipolicy:
      cluster = self.cfg.GetClusterInfo()
      full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
      try:
        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
      except errors.ConfigurationError, err:
        raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                   errors.ECODE_INVAL)
      CheckIpolicyVsDiskTemplates(full_ipolicy,
                                  cluster.enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name is not an existing node group
    already.

    """
    try:
      existing_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (self.op.group_name, existing_uuid),
                                 errors.ECODE_EXISTS)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
    else:
      self.new_hv_state = None

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
    else:
      self.new_disk_state = None

    if self.op.diskparams:
      for templ in constants.DISK_TEMPLATES:
        if templ in self.op.diskparams:
          utils.ForceDictType(self.op.diskparams[templ],
                              constants.DISK_DT_TYPES)
      self.new_diskparams = self.op.diskparams
      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
                                   errors.ECODE_INVAL)
    else:
      self.new_diskparams = {}

    self._CheckIpolicy()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Add the node group to the cluster.

    """
    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
                                  uuid=self.group_uuid,
                                  alloc_policy=self.op.alloc_policy,
                                  ndparams=self.op.ndparams,
                                  diskparams=self.new_diskparams,
                                  ipolicy=self.op.ipolicy,
                                  hv_state_static=self.new_hv_state,
                                  disk_state_static=self.new_disk_state)

    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
    del self.remove_locks[locking.LEVEL_NODEGROUP]
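
# Illustrative sketch, not part of the original module: a node group is
# normally created by submitting the corresponding opcode, which this LU then
# executes, roughly along the lines of
#   op = opcodes.OpGroupAdd(group_name="newgroup",
#                           alloc_policy=constants.ALLOC_POLICY_PREFERRED)
# The field names above mirror the self.op attributes used by LUGroupAdd; the
# exact opcode signature is an assumption and should be checked against
# ganeti.opcodes.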


class LUGroupAssignNodes(NoHooksLU):
  """Logical unit for assigning nodes to groups.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # These raise errors.OpPrereqError on their own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)

    # We want to lock all the affected nodes and groups. We have readily
    # available the list of nodes, and the *destination* group. To gather the
    # list of "source" groups, we need to fetch node information later on.
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
      locking.LEVEL_NODE: self.op.node_uuids,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1

      # Try to get all affected nodes' groups without having the group or node
      # lock yet. Needs verification later in the code flow.
      groups = self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids)

      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert self.needed_locks[locking.LEVEL_NODEGROUP]
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset(self.op.node_uuids))

    expected_locks = (set([self.group_uuid]) |
                      self.cfg.GetNodeGroupsFromNodes(self.op.node_uuids))
    actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP)
    if actual_locks != expected_locks:
      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
                               " current groups are '%s', used to be '%s'" %
                               (utils.CommaJoin(expected_locks),
                                utils.CommaJoin(actual_locks)))

    self.node_data = self.cfg.GetAllNodesInfo()
    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    instance_data = self.cfg.GetAllInstancesInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    (new_splits, previous_splits) = \
      self.CheckAssignmentForSplitInstances([(uuid, self.group_uuid)
                                             for uuid in self.op.node_uuids],
                                            self.node_data, instance_data)

    if new_splits:
      fmt_new_splits = utils.CommaJoin(utils.NiceSort(
                         self.cfg.GetInstanceNames(new_splits)))

      if not self.op.force:
        raise errors.OpExecError("The following instances get split by this"
                                 " change and --force was not given: %s" %
                                 fmt_new_splits)
      else:
        self.LogWarning("This operation will split the following instances: %s",
                        fmt_new_splits)

        if previous_splits:
          self.LogWarning("In addition, these already-split instances continue"
                          " to be split across groups: %s",
                          utils.CommaJoin(utils.NiceSort(
                            self.cfg.GetInstanceNames(previous_splits))))

  def Exec(self, feedback_fn):
    """Assign nodes to a new group.

    """
    mods = [(node_uuid, self.group_uuid) for node_uuid in self.op.node_uuids]

    self.cfg.AssignGroupNodes(mods)

  @staticmethod
  def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
    """Check for split instances after a node assignment.

    This method considers a series of node assignments as an atomic operation,
    and returns information about split instances after applying the set of
    changes.

    In particular, it returns information about newly split instances, and
    instances that were already split, and remain so after the change.

    Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
    considered.

    @type changes: list of (node_uuid, new_group_uuid) pairs.
    @param changes: list of node assignments to consider.
    @param node_data: a dict with data for all nodes
    @param instance_data: a dict with all instances to consider
    @rtype: a two-tuple
    @return: a list of instances that were previously okay and end up split as a
      consequence of this change, and a list of instances that were previously
      split and this change does not fix.

    """
    changed_nodes = dict((uuid, group) for uuid, group in changes
                         if node_data[uuid].group != group)

    all_split_instances = set()
    previously_split_instances = set()

    for inst in instance_data.values():
      if inst.disk_template not in constants.DTS_INT_MIRROR:
        continue

      if len(set(node_data[node_uuid].group
                 for node_uuid in inst.all_nodes)) > 1:
        previously_split_instances.add(inst.uuid)

      if len(set(changed_nodes.get(node_uuid, node_data[node_uuid].group)
                 for node_uuid in inst.all_nodes)) > 1:
        all_split_instances.add(inst.uuid)

    return (list(all_split_instances - previously_split_instances),
            list(previously_split_instances & all_split_instances))
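
  # Worked example (hypothetical UUIDs) of the classification above: a DRBD
  # instance whose nodes are all in group "g1" and whose primary node alone is
  # moved to "g2" spans {"g1", "g2"} after the change, so it lands in
  # all_split_instances but not in previously_split_instances and is reported
  # as newly split.  An instance that already spanned two groups before the
  # change and still does afterwards ends up only in the second returned list.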


class GroupQuery(QueryBase):
  FIELDS = query.GROUP_FIELDS

  def ExpandNames(self, lu):
    raise NotImplementedError

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    raise NotImplementedError


class LUGroupQuery(NoHooksLU):
  """Logical unit for querying node groups.

  """
  REQ_BGL = False

  def CheckArguments(self):
    raise NotImplementedError

  def ExpandNames(self):
    raise NotImplementedError

  def DeclareLocks(self, level):
    raise NotImplementedError

  def Exec(self, feedback_fn):
    raise NotImplementedError


class LUGroupSetParams(LogicalUnit):
  """Modifies the parameters of a node group.

  """
  HPATH = "group-modify"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def CheckArguments(self):
    all_changes = [
      self.op.ndparams,
      self.op.diskparams,
      self.op.alloc_policy,
      self.op.hv_state,
      self.op.disk_state,
      self.op.ipolicy,
      ]

    if all_changes.count(None) == len(all_changes):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)

    if self.op.diskparams:
      CheckDiskAccessModeValidity(self.op.diskparams)

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

    self.share_locks[locking.LEVEL_INSTANCE] = 1

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once group lock has
      # been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

  @staticmethod
  def _UpdateAndVerifyDiskParams(old, new):
    """Updates and verifies disk parameters.

    """
    new_params = GetUpdatedParams(old, new)
    utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
    return new_params

  def _CheckIpolicy(self, cluster, owned_instance_names):
    """Sanity checks for the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type owned_instance_names: list of string
    @param owned_instance_names: list of instances

    """
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(self.group.ipolicy,
                                           self.op.ipolicy,
                                           group_policy=True)

      new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
      CheckIpolicyVsDiskTemplates(new_ipolicy,
                                  cluster.enabled_disk_templates)
      instances = \
        dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))
      gmi = ganeti.masterd.instance
      violations = \
          ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster,
                                                                 self.group),
                                       new_ipolicy, instances.values(),
                                       self.cfg)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(violations))

  def CheckPrereq(self):
    """Check prerequisites.

    """
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    self.group = self.cfg.GetNodeGroup(self.group_uuid)
    cluster = self.cfg.GetClusterInfo()

    if self.group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(self.group.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = new_ndparams

    if self.op.diskparams:
      diskparams = self.group.diskparams
      uavdp = self._UpdateAndVerifyDiskParams
      # For each disk template subdict, update and verify the values
      new_diskparams = dict((dt,
                             uavdp(diskparams.get(dt, {}),
                                   self.op.diskparams[dt]))
                            for dt in constants.DISK_TEMPLATES
                            if dt in self.op.diskparams)
      # Now that all subdicts of diskparams are ready, let's merge the actual
      # dict with all updated subdicts
      self.new_diskparams = objects.FillDict(diskparams, new_diskparams)

      try:
        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeConsistency(self.new_diskparams, self.cfg,
                                       group=self.group)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
                                   errors.ECODE_INVAL)
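
    # Illustrative merge, with hypothetical values: if the group currently has
    # diskparams = {"drbd": {"resync-rate": 1024}} and the opcode passes
    # {"drbd": {"resync-rate": 4096}}, only the "drbd" subdict is updated and
    # re-verified above, and objects.FillDict layers it over the existing
    # dict, leaving the subdicts of all other disk templates untouched.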

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                self.group.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state,
                                self.group.disk_state_static)

    self._CheckIpolicy(cluster, owned_instance_names)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "NEW_ALLOC_POLICY": self.op.alloc_policy,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Modifies the node group.

    """
    result = []

    if self.op.ndparams:
      self.group.ndparams = self.new_ndparams
      result.append(("ndparams", str(self.group.ndparams)))

    if self.op.diskparams:
      self.group.diskparams = self.new_diskparams
      result.append(("diskparams", str(self.group.diskparams)))

    if self.op.alloc_policy:
      self.group.alloc_policy = self.op.alloc_policy

    if self.op.hv_state:
      self.group.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.group.disk_state_static = self.new_disk_state

    if self.op.ipolicy:
      self.group.ipolicy = self.new_ipolicy

    self.cfg.Update(self.group, feedback_fn)
    return result
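
    # The list returned above pairs each modified parameter name with its new
    # value, e.g. [("ndparams", "{...}"), ("diskparams", "{...}")] (values
    # shown are placeholders); the configuration change itself is applied by
    # the cfg.Update() call.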


class LUGroupRemove(LogicalUnit):
  HPATH = "group-remove"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the given group name exists as a node group, that it is
    empty (i.e., contains no nodes), and that it is not the last group of the
    cluster.

    """
    # Verify that the group is empty.
    group_nodes = [node.uuid
                   for node in self.cfg.GetAllNodesInfo().values()
                   if node.group == self.group_uuid]

    if group_nodes:
      raise errors.OpPrereqError("Group '%s' not empty, has the following"
                                 " nodes: %s" %
                                 (self.op.group_name,
                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
                                 errors.ECODE_STATE)

    # Verify the cluster would not be left group-less.
    if len(self.cfg.GetNodeGroupList()) == 1:
      raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
                                 " removed" % self.op.group_name,
                                 errors.ECODE_STATE)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def Exec(self, feedback_fn):
    """Remove the node group.

    """
    try:
      self.cfg.RemoveNodeGroup(self.group_uuid)
    except errors.ConfigurationError:
      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
                               (self.op.group_name, self.group_uuid))

    self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid


class LUGroupRename(LogicalUnit):
  HPATH = "group-rename"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.needed_locks = {
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      }

  def CheckPrereq(self):
    """Check prerequisites.

    Ensures requested new name is not yet used.

    """
    try:
      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
                                 " node group (UUID: %s)" %
                                 (self.op.new_name, new_name_uuid),
                                 errors.ECODE_EXISTS)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OLD_NAME": self.op.group_name,
      "NEW_NAME": self.op.new_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    all_nodes = self.cfg.GetAllNodesInfo()
    all_nodes.pop(mn, None)

    run_nodes = [mn]
    run_nodes.extend(node.uuid for node in all_nodes.values()
                     if node.group == self.group_uuid)

    return (run_nodes, run_nodes)

  def Exec(self, feedback_fn):
    """Rename the node group.

    """
    group = self.cfg.GetNodeGroup(self.group_uuid)

    if group is None:
      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
                               (self.op.group_name, self.group_uuid))

    group.name = self.op.new_name
    self.cfg.Update(group, feedback_fn)

    return self.op.new_name


class LUGroupEvacuate(LogicalUnit):
  HPATH = "group-evacuate"
  HTYPE = constants.HTYPE_GROUP
  REQ_BGL = False

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    if self.op.target_groups:
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
                                  self.op.target_groups)
    else:
      self.req_target_uuids = []

    if self.group_uuid in self.req_target_uuids:
      raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
                                 " as a target group (targets are %s)" %
                                 (self.group_uuid,
                                  utils.CommaJoin(self.req_target_uuids)),
                                 errors.ECODE_INVAL)

    self.op.iallocator = GetDefaultIAllocator(self.cfg, self.op.iallocator)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      if self.req_target_uuids:
        lock_groups = set([self.group_uuid] + self.req_target_uuids)

        # Lock all groups used by instances optimistically; this requires going
        # via the node before it's locked, requiring verification later on
        lock_groups.update(group_uuid
                           for instance_name in
                             self.owned_locks(locking.LEVEL_INSTANCE)
                           for group_uuid in
                             self.cfg.GetInstanceNodeGroups(
                               self.cfg.GetInstanceInfoByName(instance_name)
                                 .uuid))
      else:
        # No target groups, need to lock all of them
        lock_groups = locking.ALL_SET

      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be evacuated which
      # contain actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be evacuated and target groups
      owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
      assert self.group_uuid in owned_groups
      member_node_uuids = [node_uuid
                           for group in owned_groups
                           for node_uuid in
                             self.cfg.GetNodeGroup(group).members]
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_instance_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert owned_groups.issuperset(self.req_target_uuids)
    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instance_names)

    # Get instance information
    self.instances = \
      dict(self.cfg.GetMultiInstanceInfoByName(owned_instance_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

    if self.req_target_uuids:
      # User requested specific target groups
      self.target_uuids = self.req_target_uuids
    else:
      # All groups except the one to be evacuated are potential targets
      self.target_uuids = [group_uuid for group_uuid in owned_groups
                           if group_uuid != self.group_uuid]

      if not self.target_uuids:
        raise errors.OpPrereqError("There are no possible target groups",
                                   errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "GROUP_NAME": self.op.group_name,
      "TARGET_GROUPS": " ".join(self.target_uuids),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()

    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)

    run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members

    return (run_nodes, run_nodes)

  def Exec(self, feedback_fn):
    inst_names = list(self.owned_locks(locking.LEVEL_INSTANCE))

    assert self.group_uuid not in self.target_uuids

    req = iallocator.IAReqGroupChange(instances=inst_names,
                                      target_groups=self.target_uuids)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute group evacuation using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, False)

    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
                 len(jobs), self.op.group_name)

    return ResultWithJobs(jobs)
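
    # Note (assumption based on the helpers used above): each entry in `jobs`,
    # as built by LoadNodeEvacResult, is one job definition (a list of
    # opcodes), and ResultWithJobs hands these to the master to be submitted
    # as follow-up jobs once this LU finishes.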


class LUGroupVerifyDisks(NoHooksLU):
  """Verifies the status of all disks in a node group.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # Raises errors.OpPrereqError on its own if group can't be found
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],

      # This opcode acquires all node locks in a group. LUClusterVerifyDisks
      # starts one instance of this opcode for every group, which means all
      # nodes will be locked for a short amount of time, so it's better to
      # acquire the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      assert not self.needed_locks[locking.LEVEL_INSTANCE]

      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetNodeGroupInstances(self.group_uuid))

    elif level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        set([self.group_uuid] +
            # Lock all groups used by instances optimistically; this requires
            # going via the node before it's locked, requiring verification
            # later on
            [group_uuid
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
             for group_uuid in
               self.cfg.GetInstanceNodeGroups(
                 self.cfg.GetInstanceInfoByName(instance_name).uuid)])

    elif level == locking.LEVEL_NODE:
      # This will only lock the nodes in the group to be verified which contain
      # actual instances
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self._LockInstancesNodes()

      # Lock all nodes in group to be verified
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
      member_node_uuids = self.cfg.GetNodeGroup(self.group_uuid).members
      self.needed_locks[locking.LEVEL_NODE].extend(member_node_uuids)

  def CheckPrereq(self):
    owned_inst_names = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
    owned_node_uuids = frozenset(self.owned_locks(locking.LEVEL_NODE))

    assert self.group_uuid in owned_groups

    # Check if locked instances are still correct
    CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_inst_names)

    # Get instance information
    self.instances = dict(self.cfg.GetMultiInstanceInfoByName(owned_inst_names))

    # Check if node groups for locked instances are still correct
    CheckInstancesNodeGroups(self.cfg, self.instances,
                             owned_groups, owned_node_uuids, self.group_uuid)

  def _VerifyInstanceLvs(self, node_errors, offline_disk_instance_names,
                         missing_disks):
    node_lv_to_inst = MapInstanceLvsToNodes(
      [inst for inst in self.instances.values() if inst.disks_active])
    if node_lv_to_inst:
      node_uuids = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
                                  set(self.cfg.GetVmCapableNodeList()))

      node_lvs = self.rpc.call_lv_list(node_uuids, [])

      for (node_uuid, node_res) in node_lvs.items():
        if node_res.offline:
          continue

        msg = node_res.fail_msg
        if msg:
          logging.warning("Error enumerating LVs on node %s: %s",
                          self.cfg.GetNodeName(node_uuid), msg)
          node_errors[node_uuid] = msg
          continue

        for lv_name, (_, _, lv_online) in node_res.payload.items():
          inst = node_lv_to_inst.pop((node_uuid, lv_name), None)
          if not lv_online and inst is not None:
            offline_disk_instance_names.add(inst.name)

      # Any leftover items in node_lv_to_inst are missing LVs; let's arrange
      # the data better
      for key, inst in node_lv_to_inst.iteritems():
        missing_disks.setdefault(inst.name, []).append(list(key))

  def _VerifyDrbdStates(self, node_errors, offline_disk_instance_names):
    node_to_inst = {}
    for inst in self.instances.values():
      if not inst.disks_active or inst.disk_template != constants.DT_DRBD8:
        continue

      for node_uuid in itertools.chain([inst.primary_node],
                                       inst.secondary_nodes):
        node_to_inst.setdefault(node_uuid, []).append(inst)

    for (node_uuid, insts) in node_to_inst.items():
      node_disks = [(inst.disks, inst) for inst in insts]
      node_res = self.rpc.call_drbd_needs_activation(node_uuid, node_disks)
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error getting DRBD status on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        node_errors[node_uuid] = msg
        continue

      faulty_disk_uuids = set(node_res.payload)
      for inst in self.instances.values():
        inst_disk_uuids = set([disk.uuid for disk in inst.disks])
        if inst_disk_uuids.intersection(faulty_disk_uuids):
          offline_disk_instance_names.add(inst.name)

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    node_errors = {}
    offline_disk_instance_names = set()
    missing_disks = {}

    self._VerifyInstanceLvs(node_errors, offline_disk_instance_names,
                            missing_disks)
    self._VerifyDrbdStates(node_errors, offline_disk_instance_names)

    return (node_errors, list(offline_disk_instance_names), missing_disks)
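
    # Illustrative result shape (hypothetical names and UUIDs):
    #   ({"node-uuid-1": "rpc error text"},
    #    ["inst1.example.com"],
    #    {"inst2.example.com": [["node-uuid-2", "xenvg/some-lv"]]})
    # i.e. per-node errors, instances whose disks need reactivation, and, per
    # instance, the (node, LV name) pairs that could not be found.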