Revision 763ad5be lib/cmdlib/instance.py

b/lib/cmdlib/instance.py
55 55
  _LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
56 56
  _IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
57 57
  _GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
58
  _GetUpdatedParams, _ExpandInstanceName, _FindFaultyInstanceDisks, \
59
  _ComputeIPolicySpecViolation, _ComputeIPolicyInstanceViolation, \
58
  _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
60 59
  _CheckInstanceState, _ExpandNodeName
61
from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
62
  _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
63
  _BuildInstanceHookEnv, _NICListToTuple, _NICToTuple, _CheckNodeNotDrained, \
64
  _RemoveDisks, _StartInstanceDisks, _ShutdownInstanceDisks, \
65
  _RemoveInstance, _ExpandCheckDisks
60
from ganeti.cmdlib.instance_storage import _CreateDisks, \
61
  _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
62
  _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
63
  _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
64
  _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
65
  _AssembleInstanceDisks, _ExpandCheckDisks
66
from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
67
  _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
68
  _NICToTuple, _CheckNodeNotDrained, _RemoveInstance, _CopyLockList, \
69
  _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \
70
  _GetInstanceInfoText, _RemoveDisks
66 71

  
67 72
import ganeti.masterd.instance
68 73

  
69 74

  
70
#: Per-template text placed between the generated unique ID and the
#: ".disk<N>" suffix when volume names are generated; disk templates
#: that are not listed here get no generated names at all
_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }
75

  
76

  
77
#: Maps a disk template to the logical device type of the disk objects
#: generated for it (the diskless and DRBD8 templates are not looked up
#: here, they are handled separately by the disk generation code)
_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }
85

  
86

  
87 75
#: Type description for changes as returned by L{ApplyContainerMods}'s
88 76
#: callbacks
89 77
_TApplyContModsCbChanges = \
......
93 81
    ])))
94 82

  
95 83

  
96
def _CopyLockList(names):
  """Return a shallow copy of a list of lock names.

  The L{locking.ALL_SET} marker is handed back unchanged instead of being
  sliced.

  @param names: a sequence of lock names, or L{locking.ALL_SET}
  @return: L{locking.ALL_SET} if that was passed in, otherwise a slice
      copy of C{names}

  """
  if names == locking.ALL_SET:
    return locking.ALL_SET

  return names[:]
106

  
107

  
108
def _ReleaseLocks(lu, level, names=None, keep=None):
  """Release some or all of the locks an LU owns at one level.

  At most one of C{names} and C{keep} may be given. With C{names}, only
  the listed locks are released; with a non-empty C{keep}, everything but
  the listed locks is released; otherwise every lock owned at the level
  is released.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit whose locks are released
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert names is None or keep is None, \
    "Only one of the 'names' and the 'keep' parameters can be given"

  # Build the predicate deciding which owned locks have to go
  if names is not None:
    release_p = names.__contains__
  elif keep:
    release_p = lambda name: name not in keep
  else:
    release_p = None

  owned = lu.owned_locks(level)
  if not owned:
    # Not owning any lock at this level, do nothing
    return

  if not release_p:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"
    return

  # Split the owned locks into the ones to drop and the ones to hold on to
  to_release = []
  to_retain = []
  for lock_name in owned:
    if release_p(lock_name):
      to_release.append(lock_name)
    else:
      to_retain.append(lock_name)

  assert len(lu.owned_locks(level)) == (len(to_retain) + len(to_release))

  # Release just some locks
  lu.glm.release(level, names=to_release)

  assert frozenset(lu.owned_locks(level)) == frozenset(to_retain)
156

  
157

  
158 84
def _CheckHostnameSane(lu, name):
159 85
  """Ensures that a given hostname resolves to a 'sane' name.
160 86

  
......
344 270
  return (None, None)
345 271

  
346 272

  
347
def _CheckRADOSFreeSpace():
  """Check the free space inside the RADOS cluster.

  This is a placeholder: no check is performed and nothing is returned.

  """
  # For the RADOS cluster we assume there is always enough space.
  return None
353

  
354

  
355
def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll until an instance's disks are in sync.

  The mirror status is queried on the instance's primary node. While any
  disk reports a sync percentage the function sleeps (at most 60 seconds
  per round) and polls again, unless C{oneshot} is set.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit used for RPC, configuration access and
      logging
  @param instance: the instance whose disks are checked
  @param disks: the disks to check, handed to L{_ExpandCheckDisks}; an
      explicitly empty list makes the function return immediately
  @type oneshot: boolean
  @param oneshot: if true, do not wait for the sync to finish (degraded
      disks are still re-checked a few times)
  @rtype: boolean
  @return: False if a disk was last seen degraded without reporting a
      sync percentage, True otherwise
  @raise errors.RemoteError: if the node fails to answer ten times in a row

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    # A successful answer resets the consecutive-failure counter
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      # Only count a disk as degraded if it is not currently resyncing
      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          # Track the longest estimate over all disks; simply overwriting
          # the value would let the last disk dictate the sleep time
          max_time = max(max_time, mstat.estimated_time)
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded
429

  
430

  
431
def _ComputeDisks(op, default_vg):
  """Validate and normalise the disk definitions of an instance opcode.

  @param op: The instance opcode; its C{disks} and C{disk_template}
      attributes are read
  @param default_vg: The volume group used for disks not naming one
  @rtype: list of dicts
  @return: One normalised disk definition per entry of C{op.disks}
  @raise errors.OpPrereqError: on an invalid access mode, a missing or
      non-integer size, a provider given for a non-ext template, or a
      missing provider for the ext template

  """
  computed = []

  for spec in op.disks:
    access = spec.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if access not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 access, errors.ECODE_INVAL)

    raw_size = spec.get(constants.IDISK_SIZE, None)
    if raw_size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(raw_size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % raw_size,
                                 errors.ECODE_INVAL)

    provider = spec.get(constants.IDISK_PROVIDER, None)
    if provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    vg = spec.get(constants.IDISK_VG, default_vg)

    # A name spelled like the "none" marker means "no name"
    disk_name = spec.get(constants.IDISK_NAME, None)
    if disk_name is not None and disk_name.lower() == constants.VALUE_NONE:
      disk_name = None

    entry = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: access,
      constants.IDISK_VG: vg,
      constants.IDISK_NAME: disk_name,
      }

    # Optional keys are copied over only when present
    for optional_key in (constants.IDISK_METAVG, constants.IDISK_ADOPT):
      if optional_key in spec:
        entry[optional_key] = spec[optional_key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if not provider:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)
      entry[constants.IDISK_PROVIDER] = provider
      for key in spec:
        if key not in constants.IDISK_PARAMS:
          entry[key] = spec[key]

    computed.append(entry)

  return computed
493

  
494

  
495
def _ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements per volume group.

  @param disk_template: the disk template of the instance
  @type disks: list of dicts
  @param disks: disk definitions; each must carry the
      C{constants.IDISK_VG} and C{constants.IDISK_SIZE} keys
  @rtype: dict
  @return: mapping of volume group name to the total size needed in it;
      empty for the diskless and file-based templates
  @raise errors.ProgrammerError: if the size requirements of the disk
      template are not known

  """
  def _compute(disks, payload):
    """Sum up disk sizes (plus a per-disk payload) for each volume group.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      # Accumulate under the disk's own VG name; looking up the constant
      # key instead would always find nothing and drop earlier disks
      vgs[vg_name] = \
        vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
525

  
526

  
527
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Verify that every node has enough free space in one volume group.

  The node information is fetched for all nodes at once and each answer
  is then checked in turn; the first node that cannot be queried, gives a
  non-integer answer or has too little space aborts the check.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  excl_storage = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  answers = lu.rpc.call_node_info(nodenames, [vg], None, excl_storage)

  for node in nodenames:
    answer = answers[node]
    answer.Raise("Cannot get current information from node %s" % node,
                 prereq=True, ecode=errors.ECODE_ENVIRON)

    # Exactly one VG was asked for, so exactly one entry is expected back
    (_, (vg_data, ), _) = answer.payload
    free_space = vg_data.get("vg_free", None)

    if not isinstance(free_space, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, free_space), errors.ECODE_ENVIRON)

    if free_space < requested:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, free_space),
                                 errors.ECODE_NORES)
564

  
565

  
566
def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Verify that every node has enough free space in each volume group.

  Runs L{_CheckNodesFreeDiskOnVG} once per volume group listed in
  C{req_sizes}.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for (vg_name, needed) in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg_name, needed)
587

  
588

  
589
def _CheckNodeVmCapable(lu, node):
  """Refuse nodes that are not flagged as vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  node_info = lu.cfg.GetNodeInfo(node)
  if node_info.vm_capable:
    return

  raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                             errors.ECODE_STATE)
600

  
601

  
602 273
def _ComputeIPolicyInstanceSpecViolation(
603 274
  ipolicy, instance_spec, disk_template,
604 275
  _compute_fn=_ComputeIPolicySpecViolation):
......
723 394
  return free_mem
724 395

  
725 396

  
726
def _GenerateUniqueNames(lu, exts):
  """Generate one unique name per requested suffix.

  A fresh unique ID is requested from the configuration for every entry
  of C{exts} and the entry is appended to it.

  @param lu: the logical unit providing the configuration and the
      execution context ID
  @param exts: the suffixes to append to the generated IDs
  @rtype: list
  @return: the generated names, in the order of C{exts}

  """
  return ["%s%s" % (lu.cfg.GenerateUniqueID(lu.proc.GetECId()), suffix)
          for suffix in exts]
737

  
738

  
739
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Build a DRBD8 disk object together with its data and meta volumes.

  @param lu: the logical unit providing configuration access
  @param primary: the primary node of the device
  @param secondary: the secondary node of the device
  @param size: the size of the DRBD device and of its data volume
  @param vgnames: two volume group names, for the data and the meta volume
  @param names: two volume names, for the data and the meta volume
  @param iv_name: the instance-visible name of the DRBD device
  @param p_minor: the DRBD minor on the primary node
  @param s_minor: the DRBD minor on the secondary node
  @return: the DRBD8 disk object, with the two volumes as its children

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  # First the data volume, then the (fixed-size) metadata volume
  child_sizes = (size, constants.DRBD_META_SIZE)
  children = []
  for (vg, lv_name, lv_size) in zip(vgnames, names, child_sizes):
    child = objects.Disk(dev_type=constants.LD_LV, size=lv_size,
                         logical_id=(vg, lv_name),
                         params={})
    child.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    children.append(child)

  drbd_logical_id = (primary, secondary, port, p_minor, s_minor,
                     shared_secret)
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=drbd_logical_id,
                          children=children,
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
765

  
766

  
767
def _GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  @param lu: the logical unit providing configuration access
  @param template_name: the disk template to generate disks for
  @param instance_name: the instance the DRBD minors are allocated for
  @param primary_node: the primary node of the instance
  @param secondary_nodes: the secondary nodes; exactly one for DRBD8,
      none for every other template
  @type disk_info: list of dicts
  @param disk_info: the disk definitions to turn into disk objects
  @param file_storage_dir: directory of the disk files (file-based
      templates)
  @param file_driver: the file driver (file-based templates)
  @param base_index: the index of the first generated disk
  @param feedback_fn: function receiving progress messages
  @param full_disk_params: disk parameters used to compute the DRBD
      defaults
  @param _req_file_storage: check run for the file template (overridable)
  @param _req_shr_file_storage: check run for the shared file template
      (overridable)
  @rtype: list
  @return: the generated disk objects, empty for the diskless template
  @raise errors.ProgrammerError: on a wrong number of secondary nodes, an
      unknown template or an ext disk without provider

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # Two minors per disk, alternating primary/secondary node
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    # Two volume names per disk: data first, then meta
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    # Only templates with a registered name prefix get generated names
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    # Pick the function computing the logical ID of each disk
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          # Interpolate the message; passing the values as extra
          # exception arguments would leave the placeholders unfilled
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
879

  
880

  
881
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                          excl_stor):
  """Create one block device on a node, without touching its children.

  Children of the device are not created here, so they must exist
  already. If the device has no physical ID yet, the payload of the
  creation call is stored as its physical ID.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)

  owner = instance.name
  creation = lu.rpc.call_blockdev_create(node, device, device.size, owner,
                                         force_open, info, excl_stor)
  failure_text = ("Can't create block device %s on"
                  " node %s for instance %s" % (device, node, instance.name))
  creation.Raise(failure_text)

  if device.physical_id is None:
    device.physical_id = creation.payload
913

  
914

  
915
def _CreateBlockDevInner(lu, node, instance, device, force_create,
916
                         info, force_open, excl_stor):
917
  """Create a tree of block devices on a given node.
918

  
919
  If this device type has to be created on secondaries, create it and
920
  all its children.
921

  
922
  If not, just recurse to children keeping the same 'force' value.
923

  
924
  @attention: The device has to be annotated already.
925

  
926
  @param lu: the lu on whose behalf we execute
927
  @param node: the node on which to create the device
928
  @type instance: L{objects.Instance}
929
  @param instance: the instance which owns the device
930
  @type device: L{objects.Disk}
931
  @param device: the device to create
932
  @type force_create: boolean
933
  @param force_create: whether to force creation of this device; this
934
      will be change to True whenever we find a device which has
935
      CreateOnSecondary() attribute
936
  @param info: the extra 'metadata' we should attach to the device
937
      (this will be represented as a LVM tag)
938
  @type force_open: boolean
939
  @param force_open: this parameter will be passes to the
940
      L{backend.BlockdevCreate} function where it specifies
941
      whether we run on primary or not, and it affects both
942
      the child assembly and the device own Open() execution
943
  @type excl_stor: boolean
944
  @param excl_stor: Whether exclusive_storage is active for the node
945

  
946
  @return: list of created devices
947
  """
948
  created_devices = []
949
  try:
950
    if device.CreateOnSecondary():
951
      force_create = True
952

  
953
    if device.children:
954
      for child in device.children:
955
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
956
                                    info, force_open, excl_stor)
957
        created_devices.extend(devs)
958

  
959
    if not force_create:
960
      return created_devices
961

  
962
    _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
963
                          excl_stor)
964
    # The device has been completely created, so there is no point in keeping
965
    # its subdevices in the list. We just add the device itself instead.
966
    created_devices = [(node, device)]
967
    return created_devices
968

  
969
  except errors.DeviceCreationError, e:
970
    e.created_devices.extend(created_devices)
971
    raise e
972
  except errors.OpExecError, e:
973
    raise errors.DeviceCreationError(str(e), created_devices)
974

  
975

  
976
def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Tell whether exclusive_storage is in effect for a node given by name.

  The node is looked up in the configuration and the decision is left to
  L{_IsExclusiveStorageEnabledNode}.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  node_info = cfg.GetNodeInfo(nodename)
  if node_info is not None:
    return _IsExclusiveStorageEnabledNode(cfg, node_info)

  raise errors.OpPrereqError("Invalid node name %s" % nodename,
                             errors.ECODE_NOENT)
993

  
994

  
995
def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Annotate a root device and create its device tree on a node.

  This is a wrapper around L{_CreateBlockDevInner}: the device is
  annotated first, the node's exclusive_storage setting is looked up, and
  both are handed on together with the remaining arguments.

  @return: whatever L{_CreateBlockDevInner} returns

  """
  annotated = _AnnotateDiskParams(instance, [device], lu.cfg)
  # Exactly one device went in, so exactly one must come back
  (root_disk,) = annotated
  exclusive = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, root_disk, force_create,
                              info, force_open, exclusive)
1006

  
1007

  
1008
def _CreateDisks(lu, instance, to_skip=None, target_node=None):
1009
  """Create all disks for an instance.
1010

  
1011
  This abstracts away some work from AddInstance.
1012

  
1013
  @type lu: L{LogicalUnit}
1014
  @param lu: the logical unit on whose behalf we execute
1015
  @type instance: L{objects.Instance}
1016
  @param instance: the instance whose disks we should create
1017
  @type to_skip: list
1018
  @param to_skip: list of indices to skip
1019
  @type target_node: string
1020
  @param target_node: if passed, overrides the target node for creation
1021
  @rtype: boolean
1022
  @return: the success of the creation
1023

  
1024
  """
1025
  info = _GetInstanceInfoText(instance)
1026
  if target_node is None:
1027
    pnode = instance.primary_node
1028
    all_nodes = instance.all_nodes
1029
  else:
1030
    pnode = target_node
1031
    all_nodes = [pnode]
1032

  
1033
  if instance.disk_template in constants.DTS_FILEBASED:
1034
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
1035
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
1036

  
1037
    result.Raise("Failed to create directory '%s' on"
1038
                 " node %s" % (file_storage_dir, pnode))
1039

  
1040
  disks_created = []
1041
  # Note: this needs to be kept in sync with adding of disks in
1042
  # LUInstanceSetParams
1043
  for idx, device in enumerate(instance.disks):
1044
    if to_skip and idx in to_skip:
1045
      continue
1046
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
1047
    #HARDCODE
1048
    for node in all_nodes:
1049
      f_create = node == pnode
1050
      try:
1051
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
1052
        disks_created.append((node, device))
1053
      except errors.OpExecError:
1054
        logging.warning("Creating disk %s for instance '%s' failed",
1055
                        idx, instance.name)
1056
      except errors.DeviceCreationError, e:
1057
        logging.warning("Creating disk %s for instance '%s' failed",
1058
                        idx, instance.name)
1059
        disks_created.extend(e.created_devices)
1060
        for (node, disk) in disks_created:
1061
          lu.cfg.SetDiskID(disk, node)
1062
          result = lu.rpc.call_blockdev_remove(node, disk)
1063
          if result.fail_msg:
1064
            logging.warning("Failed to remove newly-created disk %s on node %s:"
1065
                            " %s", device, node, result.fail_msg)
1066
        raise errors.OpExecError(e.message)
1067

  
1068

  
1069
def _CalcEta(time_taken, written, total_size):
  """Estimate the remaining time from the progress made so far.

  The average time per written unit is extrapolated over the amount that
  is still left to write.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  time_per_unit = time_taken / float(written)
  remaining = total_size - written
  return remaining * time_per_unit
1080

  
1081

  
1082
def _WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  Disk synchronization is paused on the primary node while the disks are
  wiped chunk by chunk and is resumed afterwards, even if wiping fails.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset; if None, all disks of the instance are wiped from offset 0

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      # At least one unit is wiped per round: for small disks the
      # truncation would otherwise yield a chunk size of 0, so the offset
      # would never advance and the ETA computation would divide by zero.
      wipe_chunk_size = \
        max(1, int(min(constants.MAX_WIPE_CHUNK,
                       device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT)))

      size = device.size
      # Zero makes the very first chunk report progress right away
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        # The last chunk may be shorter than the regular chunk size
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        # Report progress at most once per minute
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    # Failures to resume are only reported, so that an error raised while
    # wiping is not replaced by a new one
    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
1175

  
1176

  
1177 397
class LUInstanceCreate(LogicalUnit):
1178 398
  """Create an instance.
1179 399

  
......
2259 1479
    return list(iobj.all_nodes)
2260 1480

  
2261 1481

  
2262
def _GetInstanceInfoText(instance):
2263
  """Compute that text that should be added to the disk's metadata.
2264

  
2265
  """
2266
  return "originstname+%s" % instance.name
2267

  
2268

  
2269 1482
class LUInstanceRename(LogicalUnit):
2270 1483
  """Rename an instance.
2271 1484

  
......
2476 1689
  _CheckNicsBridgesExist(lu, instance.nics, node)
2477 1690

  
2478 1691

  
2479
def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
                                 target_group, cfg,
                                 _compute_fn=_ComputeIPolicyInstanceViolation):
  """Compute whether the instance meets the specs of a new target group.

  The policy is only evaluated when the instance would actually change
  group; for identical groups an empty list is returned without calling
  C{_compute_fn}.

  @param ipolicy: The ipolicy to verify
  @param instance: The instance object to verify
  @param current_group: The current group of the instance
  @param target_group: The new group of the instance
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @return: an empty list if both groups are equal, otherwise whatever
      C{_compute_fn(ipolicy, instance, cfg)} returns
  @see: L{_ComputeIPolicySpecViolation}

  """
  if current_group == target_group:
    # No group change, hence nothing to verify
    return []

  return _compute_fn(ipolicy, instance, cfg)
2498

  
2499

  
2500
def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
                            _compute_fn=_ComputeIPolicyNodeViolation):
  """Check that the target node is acceptable for the instance policy.

  The group of the instance's current primary node (looked up through
  C{lu.cfg}) and the group of C{node} are handed to C{_compute_fn}; any
  violation it reports is either logged as a warning or raised as an
  error, depending on C{ignore}.

  @param lu: the logical unit on whose behalf the check is done; used for
      the node lookup and for logging
  @param ipolicy: The ipolicy to verify
  @param instance: The instance object to verify
  @param node: The new node to relocate
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param ignore: Ignore violations of the ipolicy
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @raise errors.OpPrereqError: if violations are found and C{ignore} is
      false
  @see: L{_ComputeIPolicySpecViolation}

  """
  current_group = lu.cfg.GetNodeInfo(instance.primary_node).group
  violations = _compute_fn(ipolicy, instance, current_group, node.group, cfg)

  if not violations:
    return

  text = ("Instance does not meet target node group's (%s) instance"
          " policy: %s") % (node.group, utils.CommaJoin(violations))

  if not ignore:
    raise errors.OpPrereqError(text, errors.ECODE_INVAL)

  # Violations were explicitly allowed by the caller, only report them
  lu.LogWarning(text)
2524

  
2525

  
2526 1692
class LUInstanceMove(LogicalUnit):
2527 1693
  """Move an instance by data-copying.
2528 1694

  
......
3072 2238

  
3073 2239
    cluster = self.cfg.GetClusterInfo()
3074 2240

  
3075
    node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
3076
    nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
3077

  
3078
    groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
3079
                                                 for node in nodes.values()))
3080

  
3081
    group2name_fn = lambda uuid: groups[uuid].name
3082
    for instance in self.wanted_instances:
3083
      pnode = nodes[instance.primary_node]
3084

  
3085
      if self.op.static or pnode.offline:
3086
        remote_state = None
3087
        if pnode.offline:
3088
          self.LogWarning("Primary node %s is marked offline, returning static"
3089
                          " information only for instance %s" %
3090
                          (pnode.name, instance.name))
3091
      else:
3092
        remote_info = self.rpc.call_instance_info(instance.primary_node,
3093
                                                  instance.name,
3094
                                                  instance.hypervisor)
3095
        remote_info.Raise("Error checking node %s" % instance.primary_node)
3096
        remote_info = remote_info.payload
3097
        if remote_info and "state" in remote_info:
3098
          remote_state = "up"
3099
        else:
3100
          if instance.admin_state == constants.ADMINST_UP:
3101
            remote_state = "down"
3102
          else:
3103
            remote_state = instance.admin_state
3104

  
3105
      disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
3106
                  instance.disks)
3107

  
3108
      snodes_group_uuids = [nodes[snode_name].group
3109
                            for snode_name in instance.secondary_nodes]
3110

  
3111
      result[instance.name] = {
3112
        "name": instance.name,
3113
        "config_state": instance.admin_state,
3114
        "run_state": remote_state,
3115
        "pnode": instance.primary_node,
3116
        "pnode_group_uuid": pnode.group,
3117
        "pnode_group_name": group2name_fn(pnode.group),
3118
        "snodes": instance.secondary_nodes,
3119
        "snodes_group_uuids": snodes_group_uuids,
3120
        "snodes_group_names": map(group2name_fn, snodes_group_uuids),
3121
        "os": instance.os,
3122
        # this happens to be the same format used for hooks
3123
        "nics": _NICListToTuple(self, instance.nics),
3124
        "disk_template": instance.disk_template,
3125
        "disks": disks,
3126
        "hypervisor": instance.hypervisor,
3127
        "network_port": instance.network_port,
3128
        "hv_instance": instance.hvparams,
3129
        "hv_actual": cluster.FillHV(instance, skip_globals=True),
3130
        "be_instance": instance.beparams,
3131
        "be_actual": cluster.FillBE(instance),
3132
        "os_instance": instance.osparams,
3133
        "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
3134
        "serial_no": instance.serial_no,
3135
        "mtime": instance.mtime,
3136
        "ctime": instance.ctime,
3137
        "uuid": instance.uuid,
3138
        }
3139

  
3140
    return result
3141

  
3142

  
3143
class LUInstanceRecreateDisks(LogicalUnit):
  """Logical unit recreating the disks of an existing instance.

  The disks are recreated either on the nodes the instance currently uses,
  on an explicit list of replacement nodes (C{self.op.nodes}) or on nodes
  selected by an iallocator (C{self.op.iallocator}).

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  # Disk parameters that may be overridden while recreating a disk
  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    ])

  # New or changed disk parameters may have different semantics, hence every
  # known parameter must be listed either above or below
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the iallocator named in the opcode and store its node choice.

    On success the result of the allocator is written to C{self.op.nodes}.

    @raise errors.OpPrereqError: if the allocator run was not successful

    """
    filled_be = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # Ideally the allocator would run in "relocate" mode, but the current
    # allocators cannot relocate all nodes of an instance at once. The
    # "allocate" mode is used as a workaround, which is suboptimal for two
    # reasons:
    # - The instance name given to the allocator is already part of the list
    #   of existing instances, so the allocator's internal structures could
    #   run into a conflict. Current allocators don't, but it's a liability.
    # - The resources of the instance are counted twice by the allocator:
    #   once for the existing instance and once for the new allocation.
    # The allocator may pick some of the nodes the instance is running on,
    # which is not a problem. Broken instance nodes should already be marked
    # as drained or offline and are therefore skipped by the allocator. If
    # the disks were lost for other reasons, recreating them on the same
    # nodes should be fine.
    template = self.instance.disk_template
    spindles = filled_be[constants.BE_SPINDLE_USE]
    request = iallocator.IAReqInstanceAlloc(
      name=self.op.instance_name,
      disk_template=template,
      tags=list(self.instance.GetTags()),
      os=self.instance.os,
      nics=[{}],
      vcpus=filled_be[constants.BE_VCPUS],
      memory=filled_be[constants.BE_MAXMEM],
      spindle_use=spindles,
      disks=[{constants.IDISK_SIZE: dsk.size,
              constants.IDISK_MODE: dsk.mode}
             for dsk in self.instance.disks],
      hypervisor=self.instance.hypervisor,
      node_whitelist=None)
    allocator = iallocator.IAllocator(self.cfg, self.rpc, request)
    allocator.Run(self.op.iallocator)

    assert request.RequiredNodes() == len(self.instance.all_nodes)

    if not allocator.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, allocator.info),
                                 errors.ECODE_NORES)

    chosen_nodes = allocator.result
    self.op.nodes = chosen_nodes
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(chosen_nodes))

  def CheckArguments(self):
    """Normalize and validate the disk list given in the opcode.

    A deprecated plain list of disk indices is converted to a list of
    C{(index, parameters)} pairs with empty parameters.

    @raise errors.OpPrereqError: if a disk is specified more than once or
        if a parameter outside of C{_MODIFYABLE} is given for a disk

    """
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Deprecated format: bare disk indices; sort and deduplicate them
      self.op.disks = [(disk_idx, {})
                       for disk_idx in sorted(frozenset(self.op.disks))]

    repeated = utils.FindDuplicates([compat.fst(entry)
                                     for entry in self.op.disks])
    if repeated:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(repeated),
                                 errors.ECODE_INVAL)

    # _CheckIAllocatorOrNode must not select the default iallocator when
    # neither an iallocator nor nodes were given, hence the explicit guard
    if self.op.iallocator or self.op.nodes:
      _CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (disk_idx, disk_params) in self.op.disks:
      utils.ForceDictType(disk_params, constants.IDISK_PARAMS_TYPES)
      rejected = frozenset(disk_params.keys()) - self._MODIFYABLE
      if rejected:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (disk_idx, utils.CommaJoin(rejected)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    """Expand the names in the opcode and declare the initial locks.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      # Explicit replacement nodes: lock exactly those
      self.op.nodes = [_ExpandNodeName(self.cfg, name)
                       for name in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []

    if not self.op.nodes and self.op.iallocator:
      # iallocator will select a new node in the same group
      self.needed_locks[locking.LEVEL_NODEGROUP] = []
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    """Fill in the locks needed at the given locking level.

    @param level: the locking level for which locks are being declared

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # The primary group of the instance is locked optimistically; it is
      # looked up via the node before that node is locked, so it has to be
      # verified again later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # Three cases:
      # - with an allocator all nodes of the instance's current group are
      #   locked, as it is not known yet which ones will be selected
      # - with explicit replacement nodes the locks were already declared
      #   in ExpandNames
      # - otherwise all nodes of the instance are locked for re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        node_locks = self.needed_locks[locking.LEVEL_NODE]
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          node_locks.extend(self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)

    elif level == locking.LEVEL_NODE_RES:
      # Node resource locks mirror the node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    @return: the result of L{_BuildInstanceHookEnvByObject} for the instance

    """
    return _BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: tuple with the same list (master node followed by all nodes of
        the instance) as both elements

    """
    hook_nodes = [self.cfg.GetMasterNode()]
    hook_nodes = hook_nodes + list(self.instance.all_nodes)
    return (hook_nodes, hook_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies the number of replacement nodes, that the instance has disks,
    that the owned node group locks still match, the instance state and the
    requested disk indices. Sets C{self.instance} and C{self.disks} and, if
    an iallocator was requested, runs it and releases the locks which are
    no longer needed.

    @raise errors.OpPrereqError: if the number of replacement nodes doesn't
        match, the instance is diskless, a disk index is invalid or a
        partial recreation is combined with a change of nodes

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if not self.op.nodes:
      primary_node = instance.primary_node
    else:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]

    if not self.op.iallocator:
      _CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                               primary_only=True)

    # The instance state is not checked if the nodes are being replaced
    # *and* the old primary node is offline
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    changing_nodes = bool(self.op.iallocator or self.op.nodes)
    if not changing_nodes or not old_pnode.offline:
      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                          msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      # No disks given: recreate all of them, without parameter changes
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    highest_idx = max(self.disks.keys())
    if highest_idx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % highest_idx,
                                 errors.ECODE_INVAL)

    if (changing_nodes and
        sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      for lock_level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
        _ReleaseLocks(self, lock_level, keep=self.op.nodes)
      _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    Disks not selected in C{self.disks} are skipped. When replacement nodes
    are set, DRBD8 disks get a new logical ID, the primary node of the
    instance is changed and the configuration is updated before the disks
    are created through L{_CreateDisks}.

    @param feedback_fn: function passed on to the configuration update

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    skipped = []
    pending = [] # (index, new logical ID or None, parameter changes)

    for idx, disk in enumerate(instance.disks):
      if idx not in self.disks:
        # Disk should not be recreated
        skipped.append(idx)
        continue

      changes = self.disks[idx]
      new_id = None

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        # otherwise disk internals have changed
        assert len(disk.logical_id) == 6
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)

      pending.append((idx, new_id, changes))

    # All asserts above have passed, so the modifications can now be
    # applied in a single run (to avoid partial changes)
    for idx, new_id, changes in pending:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE),
                    mode=changes.get(constants.IDISK_MODE))

    if self.op.nodes:
      # change primary node and store the modified instance
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    held_node_locks = self.owned_locks(locking.LEVEL_NODE)
    assert held_node_locks.issuperset(frozenset(instance.all_nodes))
    _CreateDisks(self, instance, to_skip=skipped)
3445

  
3446

  
3447
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shut down block devices of an instance after a state check.

  L{_CheckInstanceState} is invoked with C{INSTANCE_DOWN} first; only
  afterwards L{_ShutdownInstanceDisks} is called.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks should be shut down
  @param disks: passed on unchanged to L{_ShutdownInstanceDisks}

  """
  failure_msg = "cannot shutdown disks"
  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg=failure_msg)
  _ShutdownInstanceDisks(lu, instance, disks=disks)
3456

  
3457

  
3458
def _DiskSizeInBytesToMebibytes(lu, size):
3459
  """Converts a disk size in bytes to mebibytes.
3460

  
3461
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
3462

  
3463
  """
3464
  (mib, remainder) = divmod(size, 1024 * 1024)
3465

  
3466
  if remainder != 0:
3467
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
3468
                  " to not overwrite existing data (%s bytes will not be"
3469
                  " wiped)", (1024 * 1024) - remainder)
3470
    mib += 1
3471

  
3472
  return mib
3473

  
3474

  
3475
class LUInstanceGrowDisk(LogicalUnit):
3476
  """Grow a disk of an instance.
3477

  
3478
  """
3479
  HPATH = "disk-grow"
3480
  HTYPE = constants.HTYPE_INSTANCE
3481
  REQ_BGL = False
3482

  
3483
  def ExpandNames(self):
3484
    self._ExpandAndLockInstance()
3485
    self.needed_locks[locking.LEVEL_NODE] = []
3486
    self.needed_locks[locking.LEVEL_NODE_RES] = []
3487
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3488
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
3489

  
3490
  def DeclareLocks(self, level):
3491
    if level == locking.LEVEL_NODE:
3492
      self._LockInstancesNodes()
3493
    elif level == locking.LEVEL_NODE_RES:
3494
      # Copy node locks
3495
      self.needed_locks[locking.LEVEL_NODE_RES] = \
3496
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
3497

  
3498
  def BuildHooksEnv(self):
3499
    """Build hooks env.
3500

  
3501
    This runs on the master, the primary and all the secondaries.
3502

  
3503
    """
3504
    env = {
3505
      "DISK": self.op.disk,
3506
      "AMOUNT": self.op.amount,
3507
      "ABSOLUTE": self.op.absolute,
3508
      }
3509
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3510
    return env
3511

  
3512
  def BuildHooksNodes(self):
3513
    """Build hooks nodes.
3514

  
3515
    """
3516
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3517
    return (nl, nl)
3518

  
3519
  def CheckPrereq(self):
3520
    """Check prerequisites.
3521

  
3522
    This checks that the instance is in the cluster.
3523

  
3524
    """
3525
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3526
    assert instance is not None, \
3527
      "Cannot retrieve locked instance %s" % self.op.instance_name
3528
    nodenames = list(instance.all_nodes)
3529
    for node in nodenames:
3530
      _CheckNodeOnline(self, node)
3531

  
3532
    self.instance = instance
3533

  
3534
    if instance.disk_template not in constants.DTS_GROWABLE:
3535
      raise errors.OpPrereqError("Instance's disk layout does not support"
3536
                                 " growing", errors.ECODE_INVAL)
3537

  
3538
    self.disk = instance.FindDisk(self.op.disk)
3539

  
3540
    if self.op.absolute:
3541
      self.target = self.op.amount
3542
      self.delta = self.target - self.disk.size
3543
      if self.delta < 0:
3544
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
3545
                                   "current disk size (%s)" %
3546
                                   (utils.FormatUnit(self.target, "h"),
3547
                                    utils.FormatUnit(self.disk.size, "h")),
3548
                                   errors.ECODE_STATE)
3549
    else:
3550
      self.delta = self.op.amount
3551
      self.target = self.disk.size + self.delta
3552
      if self.delta < 0:
3553
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
3554
                                   utils.FormatUnit(self.delta, "h"),
3555
                                   errors.ECODE_INVAL)
3556

  
3557
    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
3558

  
3559
  def _CheckDiskSpace(self, nodenames, req_vgspace):
3560
    template = self.instance.disk_template
3561
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
3562
      # TODO: check the free disk space for file, when that feature will be
3563
      # supported
3564
      nodes = map(self.cfg.GetNodeInfo, nodenames)
3565
      es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
3566
                        nodes)
3567
      if es_nodes:
3568
        # With exclusive storage we need to something smarter than just looking
3569
        # at free space; for now, let's simply abort the operation.
3570
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
3571
                                   " is enabled", errors.ECODE_STATE)
3572
      _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
3573

  
3574
  def Exec(self, feedback_fn):
3575
    """Execute disk grow.
3576

  
3577
    """
3578
    instance = self.instance
3579
    disk = self.disk
3580

  
3581
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
3582
    assert (self.owned_locks(locking.LEVEL_NODE) ==
3583
            self.owned_locks(locking.LEVEL_NODE_RES))
3584

  
3585
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
3586

  
3587
    disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
3588
    if not disks_ok:
3589
      raise errors.OpExecError("Cannot activate block device to grow")
3590

  
3591
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
3592
                (self.op.disk, instance.name,
3593
                 utils.FormatUnit(self.delta, "h"),
3594
                 utils.FormatUnit(self.target, "h")))
3595

  
3596
    # First run all grow ops in dry-run mode
3597
    for node in instance.all_nodes:
3598
      self.cfg.SetDiskID(disk, node)
3599
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
3600
                                           True, True)
3601
      result.Raise("Dry-run grow request failed to node %s" % node)
3602

  
3603
    if wipe_disks:
3604
      # Get disk size from primary node for wiping
3605
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
3606
      result.Raise("Failed to retrieve disk size from node '%s'" %
3607
                   instance.primary_node)
3608

  
3609
      (disk_size_in_bytes, ) = result.payload
3610

  
3611
      if disk_size_in_bytes is None:
3612
        raise errors.OpExecError("Failed to retrieve disk size from primary"
3613
                                 " node '%s'" % instance.primary_node)
3614

  
3615
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
3616

  
3617
      assert old_disk_size >= disk.size, \
3618
        ("Retrieved disk size too small (got %s, should be at least %s)" %
3619
         (old_disk_size, disk.size))
3620
    else:
3621
      old_disk_size = None
3622

  
3623
    # We know that (as far as we can test) operations across different
3624
    # nodes will succeed, time to run it for real on the backing storage
3625
    for node in instance.all_nodes:
3626
      self.cfg.SetDiskID(disk, node)
3627
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
3628
                                           False, True)
3629
      result.Raise("Grow request failed to node %s" % node)
3630

  
3631
    # And now execute it for logical storage, on the primary node
3632
    node = instance.primary_node
3633
    self.cfg.SetDiskID(disk, node)
3634
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
3635
                                         False, False)
3636
    result.Raise("Grow request failed to node %s" % node)
3637

  
3638
    disk.RecordGrow(self.delta)
3639
    self.cfg.Update(instance, feedback_fn)
3640

  
3641
    # Changes have been recorded, release node lock
3642
    _ReleaseLocks(self, locking.LEVEL_NODE)
3643

  
3644
    # Downgrade lock while waiting for sync
3645
    self.glm.downgrade(locking.LEVEL_INSTANCE)
3646

  
3647
    assert wipe_disks ^ (old_disk_size is None)
3648

  
3649
    if wipe_disks:
3650
      assert instance.disks[self.op.disk] == disk
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff