#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with instances."""

import OpenSSL
import copy
import itertools
import logging
import operator
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import hypervisor
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import qlang
from ganeti import rpc
from ganeti import utils
from ganeti import query

from ganeti.cmdlib.base import NoHooksLU, LogicalUnit, _QueryBase, \
  ResultWithJobs, Tasklet

from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
  INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, _CheckNodeOnline, \
  _ShareAll, _GetDefaultIAllocator, _CheckInstanceNodeGroups, \
  _LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
  _IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
  _GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
  _GetUpdatedParams, _ExpandInstanceName, _FindFaultyInstanceDisks, \
  _ComputeIPolicySpecViolation, _ComputeIPolicyInstanceViolation, \
  _CheckInstanceState, _ExpandNodeName
from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
  _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
  _BuildInstanceHookEnv, _NICListToTuple, _NICToTuple, _CheckNodeNotDrained, \
  _RemoveDisks, _StartInstanceDisks, _ShutdownInstanceDisks, \
  _RemoveInstance, _ExpandCheckDisks

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


#: Type description for changes as returned by L{ApplyContainerMods}'s
#: callbacks
_TApplyContModsCbChanges = \
  ht.TMaybeListOf(ht.TAnd(ht.TIsLength(2), ht.TItems([
    ht.TNonEmptyString,
    ht.TAny,
    ])))


def _CopyLockList(names):
  """Makes a copy of a list of lock names.

  Handles L{locking.ALL_SET} correctly.

  """
  if names == locking.ALL_SET:
    return locking.ALL_SET
  else:
    return names[:]
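
# Illustrative behaviour (example values are hypothetical, not part of the
# original module): _CopyLockList(locking.ALL_SET) returns the ALL_SET
# sentinel itself, so later comparisons against ALL_SET still work, while
# _CopyLockList(["node1", "node2"]) returns a new, independent list with the
# same names.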


def _ReleaseLocks(lu, level, names=None, keep=None):
  """Releases locks owned by an LU.

  @type lu: L{LogicalUnit}
  @param level: Lock level
  @type names: list or None
  @param names: Names of locks to release
  @type keep: list or None
  @param keep: Names of locks to retain

  """
  assert not (keep is not None and names is not None), \
    "Only one of the 'names' and the 'keep' parameters can be given"

  if names is not None:
    should_release = names.__contains__
  elif keep:
    should_release = lambda name: name not in keep
  else:
    should_release = None

  owned = lu.owned_locks(level)
  if not owned:
    # Not owning any lock at this level, do nothing
    pass

  elif should_release:
    retain = []
    release = []

    # Determine which locks to release
    for name in owned:
      if should_release(name):
        release.append(name)
      else:
        retain.append(name)

    assert len(lu.owned_locks(level)) == (len(retain) + len(release))

    # Release just some locks
    lu.glm.release(level, names=release)

    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
  else:
    # Release everything
    lu.glm.release(level)

    assert not lu.glm.is_owned(level), "No locks should be owned"


def _CheckHostnameSane(lu, name):
  """Ensures that a given hostname resolves to a 'sane' name.

  The given name is required to be a prefix of the resolved hostname,
  to prevent accidental mismatches.

  @param lu: the logical unit on behalf of which we're checking
  @param name: the name we should resolve and check
  @return: the resolved hostname object

  """
  hostname = netutils.GetHostname(name=name)
  if hostname.name != name:
    lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
  if not utils.MatchNameComponent(name, [hostname.name]):
    raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
                                " same as given hostname '%s'") %
                               (hostname.name, name), errors.ECODE_INVAL)
  return hostname


def _CheckOpportunisticLocking(op):
  """Generate error if opportunistic locking is not possible.

  """
  if op.opportunistic_locking and not op.iallocator:
    raise errors.OpPrereqError("Opportunistic locking is only available in"
                               " combination with an instance allocator",
                               errors.ECODE_INVAL)


def _CreateInstanceAllocRequest(op, disks, nics, beparams, node_whitelist):
  """Wrapper around IAReqInstanceAlloc.

  @param op: The instance opcode
  @param disks: The computed disks
  @param nics: The computed nics
  @param beparams: The fully filled beparams
  @param node_whitelist: List of nodes which should appear as online to the
    allocator (unless the node is already marked offline)

  @returns: A filled L{iallocator.IAReqInstanceAlloc}

  """
  spindle_use = beparams[constants.BE_SPINDLE_USE]
  return iallocator.IAReqInstanceAlloc(name=op.instance_name,
                                       disk_template=op.disk_template,
                                       tags=op.tags,
                                       os=op.os_type,
                                       vcpus=beparams[constants.BE_VCPUS],
                                       memory=beparams[constants.BE_MAXMEM],
                                       spindle_use=spindle_use,
                                       disks=disks,
                                       nics=[n.ToDict() for n in nics],
                                       hypervisor=op.hypervisor,
                                       node_whitelist=node_whitelist)


def _ComputeFullBeParams(op, cluster):
  """Computes the full beparams.

  @param op: The instance opcode
  @param cluster: The cluster config object

  @return: The fully filled beparams

  """
  default_beparams = cluster.beparams[constants.PP_DEFAULT]
  for param, value in op.beparams.iteritems():
    if value == constants.VALUE_AUTO:
      op.beparams[param] = default_beparams[param]
  objects.UpgradeBeParams(op.beparams)
  utils.ForceDictType(op.beparams, constants.BES_PARAMETER_TYPES)
  return cluster.SimpleFillBE(op.beparams)


def _ComputeNics(op, cluster, default_ip, cfg, ec_id):
  """Computes the nics.

  @param op: The instance opcode
  @param cluster: Cluster configuration object
  @param default_ip: The default ip to assign
  @param cfg: An instance of the configuration object
  @param ec_id: Execution context ID

  @returns: The built up NICs

  """
  nics = []
  for nic in op.nics:
    nic_mode_req = nic.get(constants.INIC_MODE, None)
    nic_mode = nic_mode_req
    if nic_mode is None or nic_mode == constants.VALUE_AUTO:
      nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]

    net = nic.get(constants.INIC_NETWORK, None)
    link = nic.get(constants.NIC_LINK, None)
    ip = nic.get(constants.INIC_IP, None)

    if net is None or net.lower() == constants.VALUE_NONE:
      net = None
    else:
      if nic_mode_req is not None or link is not None:
        raise errors.OpPrereqError("If network is given, no mode or link"
                                   " is allowed to be passed",
                                   errors.ECODE_INVAL)

    # ip validity checks
    if ip is None or ip.lower() == constants.VALUE_NONE:
      nic_ip = None
    elif ip.lower() == constants.VALUE_AUTO:
      if not op.name_check:
        raise errors.OpPrereqError("IP address set to auto but name checks"
                                   " have been skipped",
                                   errors.ECODE_INVAL)
      nic_ip = default_ip
    else:
      # We defer pool operations until later, so that the iallocator has
      # filled in the instance's node(s)
      if ip.lower() == constants.NIC_IP_POOL:
        if net is None:
          raise errors.OpPrereqError("if ip=pool, parameter network"
                                     " must be passed too",
                                     errors.ECODE_INVAL)

      elif not netutils.IPAddress.IsValid(ip):
        raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
                                   errors.ECODE_INVAL)

      nic_ip = ip

    # TODO: check the ip address for uniqueness
    if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
      raise errors.OpPrereqError("Routed nic mode requires an ip address",
                                 errors.ECODE_INVAL)

    # MAC address verification
    mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
    if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
      mac = utils.NormalizeAndValidateMac(mac)

      try:
        # TODO: We need to factor this out
        cfg.ReserveMAC(mac, ec_id)
      except errors.ReservationError:
        raise errors.OpPrereqError("MAC address %s already in use"
                                   " in cluster" % mac,
                                   errors.ECODE_NOTUNIQUE)

    #  Build nic parameters
    nicparams = {}
    if nic_mode_req:
      nicparams[constants.NIC_MODE] = nic_mode
    if link:
      nicparams[constants.NIC_LINK] = link

    check_params = cluster.SimpleFillNIC(nicparams)
    objects.NIC.CheckParameterSyntax(check_params)
    net_uuid = cfg.LookupNetwork(net)
    name = nic.get(constants.INIC_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    nic_obj = objects.NIC(mac=mac, ip=nic_ip, name=name,
                          network=net_uuid, nicparams=nicparams)
    nic_obj.uuid = cfg.GenerateUniqueID(ec_id)
    nics.append(nic_obj)

  return nics


def _CheckForConflictingIp(lu, ip, node):
  """In case of conflicting IP address raise error.

  @type ip: string
  @param ip: IP address
  @type node: string
  @param node: node name

  """
  (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node)
  if conf_net is not None:
    raise errors.OpPrereqError(("The requested IP address (%s) belongs to"
                                " network %s, but the target NIC does not." %
                                (ip, conf_net)),
                               errors.ECODE_STATE)

  return (None, None)


def _CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = _ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def _ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    if constants.IDISK_METAVG in disk:
      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
    if constants.IDISK_ADOPT in disk:
      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
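
# Illustrative example (hypothetical values, not part of the original module):
# with default_vg="xenvg", an opcode disk specification such as
#   {constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR}
# is expanded by _ComputeDisks into
#   {constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}
# because the volume group falls back to the default and the name to None.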


def _ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
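
# Illustrative example (hypothetical sizes, not part of the original module):
# for two disks of 1024 MiB and 2048 MiB, both in volume group "xenvg",
# _ComputeDiskSizePerVG(constants.DT_PLAIN, disks) yields {"xenvg": 3072},
# while constants.DT_DRBD8 adds constants.DRBD_META_SIZE (128 MiB) of
# metadata per disk, giving {"xenvg": 3328}.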


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)


def _CheckNodeVmCapable(lu, node):
  """Ensure that a given node is vm capable.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is not vm capable

  """
  if not lu.cfg.GetNodeInfo(node).vm_capable:
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
                               errors.ECODE_STATE)


def _ComputeIPolicyInstanceSpecViolation(
  ipolicy, instance_spec, disk_template,
  _compute_fn=_ComputeIPolicySpecViolation):
  """Compute if instance specs meet the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance_spec: dict
  @param instance_spec: The instance spec to verify
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{_ComputeIPolicySpecViolation}

  """
  mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
  cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
  disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
  disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
  nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
  spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)

  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
                     disk_sizes, spindle_use, disk_template)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  variant = objects.OS.GetVariant(name)
  if not os_obj.supported_variants:
    if variant:
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
                                 " passed)" % (os_obj.name, variant),
                                 errors.ECODE_INVAL)
    return
  if not variant:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)


def _CheckNodeHasOS(lu, node, os_name, force_variant):
  """Ensure that a node supports a given OS.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param os_name: the OS to query about
  @param force_variant: whether to ignore variant errors
  @raise errors.OpPrereqError: if the node is not supporting the OS

  """
  result = lu.rpc.call_os_get(node, os_name)
  result.Raise("OS '%s' not in supported OS list for node %s" %
               (os_name, node),
               prereq=True, ecode=errors.ECODE_INVAL)
  if not force_variant:
    _CheckOSVariant(result.payload, os_name)


def _CheckNicsBridgesExist(lu, target_nics, target_node):
  """Check that the bridges needed by a list of nics exist.

  """
  cluster = lu.cfg.GetClusterInfo()
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @rtype: integer
  @return: node current free memory
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  (_, _, (hv_info, )) = nodeinfo[node].payload

  free_mem = hv_info.get("memory_free", None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem),
                               errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)
  return free_mem


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def _GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                          excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      the CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                          excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return _IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)


def _CreateDisks(lu, instance, to_skip=None, target_node=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  # Note: this needs to be kept in sync with adding of disks in
  # LUInstanceSetParams
  for idx, device in enumerate(instance.disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    #HARDCODE
    for node in all_nodes:
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.OpExecError:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        for (node, disk) in disks_created:
          lu.cfg.SetDiskID(disk, node)
          result = lu.rpc.call_blockdev_remove(node, disk)
          if result.fail_msg:
            logging.warning("Failed to remove newly-created disk %s on node %s:"
                            " %s", device, node, result.fail_msg)
        raise errors.OpExecError(e.message)


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
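
# Illustrative example (hypothetical values, not part of the original module):
# if 512 MiB out of 2048 MiB were written in 60 seconds, the average cost is
# 60 / 512.0 seconds per MiB, so _CalcEta(60, 512, 2048) returns
# (2048 - 512) * (60 / 512.0) = 180.0 seconds remaining.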
1080

    
1081

    
1082
def _WipeDisks(lu, instance, disks=None):
1083
  """Wipes instance disks.
1084

1085
  @type lu: L{LogicalUnit}
1086
  @param lu: the logical unit on whose behalf we execute
1087
  @type instance: L{objects.Instance}
1088
  @param instance: the instance whose disks we should create
1089
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1090
  @param disks: Disk details; tuple contains disk index, disk object and the
1091
    start offset
1092

1093
  """
1094
  node = instance.primary_node
1095

    
1096
  if disks is None:
1097
    disks = [(idx, disk, 0)
1098
             for (idx, disk) in enumerate(instance.disks)]
1099

    
1100
  for (_, device, _) in disks:
1101
    lu.cfg.SetDiskID(device, node)
1102

    
1103
  logging.info("Pausing synchronization of disks of instance '%s'",
1104
               instance.name)
1105
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
1106
                                                  (map(compat.snd, disks),
1107
                                                   instance),
1108
                                                  True)
1109
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)
1110

    
1111
  for idx, success in enumerate(result.payload):
1112
    if not success:
1113
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
1114
                   " failed", idx, instance.name)
1115

    
1116
  try:
1117
    for (idx, device, offset) in disks:
1118
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1119
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1120
      wipe_chunk_size = \
1121
        int(min(constants.MAX_WIPE_CHUNK,
1122
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1123

    
1124
      size = device.size
1125
      last_output = 0
1126
      start_time = time.time()
1127

    
1128
      if offset == 0:
1129
        info_text = ""
1130
      else:
1131
        info_text = (" (from %s to %s)" %
1132
                     (utils.FormatUnit(offset, "h"),
1133
                      utils.FormatUnit(size, "h")))
1134

    
1135
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1136

    
1137
      logging.info("Wiping disk %d for instance %s on node %s using"
1138
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
1139

    
1140
      while offset < size:
1141
        wipe_size = min(wipe_chunk_size, size - offset)
1142

    
1143
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1144
                      idx, offset, wipe_size)
1145

    
1146
        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
1147
                                           wipe_size)
1148
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1149
                     (idx, offset, wipe_size))
1150

    
1151
        now = time.time()
1152
        offset += wipe_size
1153
        if now - last_output >= 60:
1154
          eta = _CalcEta(now - start_time, offset, size)
1155
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1156
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1157
          last_output = now
1158
  finally:
1159
    logging.info("Resuming synchronization of disks for instance '%s'",
1160
                 instance.name)
1161

    
1162
    result = lu.rpc.call_blockdev_pause_resume_sync(node,
1163
                                                    (map(compat.snd, disks),
1164
                                                     instance),
1165
                                                    False)
1166

    
1167
    if result.fail_msg:
1168
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1169
                    node, result.fail_msg)
1170
    else:
1171
      for idx, success in enumerate(result.payload):
1172
        if not success:
1173
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1174
                        " failed", idx, instance.name)
1175

    
1176

    
1177
class LUInstanceCreate(LogicalUnit):
1178
  """Create an instance.
1179

1180
  """
1181
  HPATH = "instance-add"
1182
  HTYPE = constants.HTYPE_INSTANCE
1183
  REQ_BGL = False
1184

    
1185
  def CheckArguments(self):
1186
    """Check arguments.
1187

1188
    """
1189
    # do not require name_check to ease forward/backward compatibility
1190
    # for tools
1191
    if self.op.no_install and self.op.start:
1192
      self.LogInfo("No-installation mode selected, disabling startup")
1193
      self.op.start = False
1194
    # validate/normalize the instance name
1195
    self.op.instance_name = \
1196
      netutils.Hostname.GetNormalizedName(self.op.instance_name)
1197

    
1198
    if self.op.ip_check and not self.op.name_check:
1199
      # TODO: make the ip check more flexible and not depend on the name check
1200
      raise errors.OpPrereqError("Cannot do IP address check without a name"
1201
                                 " check", errors.ECODE_INVAL)
1202

    
1203
    # check nics' parameter names
1204
    for nic in self.op.nics:
1205
      utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES)
1206
    # check that NIC's parameters names are unique and valid
1207
    utils.ValidateDeviceNames("NIC", self.op.nics)
1208

    
1209
    # check that disk's names are unique and valid
1210
    utils.ValidateDeviceNames("disk", self.op.disks)
1211

    
1212
    cluster = self.cfg.GetClusterInfo()
1213
    if not self.op.disk_template in cluster.enabled_disk_templates:
1214
      raise errors.OpPrereqError("Cannot create an instance with disk template"
1215
                                 " '%s', because it is not enabled in the"
1216
                                 " cluster. Enabled disk templates are: %s." %
1217
                                 (self.op.disk_template,
1218
                                  ",".join(cluster.enabled_disk_templates)))
1219

    
1220
    # check disks. parameter names and consistent adopt/no-adopt strategy
1221
    has_adopt = has_no_adopt = False
1222
    for disk in self.op.disks:
1223
      if self.op.disk_template != constants.DT_EXT:
1224
        utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
1225
      if constants.IDISK_ADOPT in disk:
1226
        has_adopt = True
1227
      else:
1228
        has_no_adopt = True
1229
    if has_adopt and has_no_adopt:
1230
      raise errors.OpPrereqError("Either all disks are adopted or none is",
1231
                                 errors.ECODE_INVAL)
1232
    if has_adopt:
1233
      if self.op.disk_template not in constants.DTS_MAY_ADOPT:
1234
        raise errors.OpPrereqError("Disk adoption is not supported for the"
1235
                                   " '%s' disk template" %
1236
                                   self.op.disk_template,
1237
                                   errors.ECODE_INVAL)
1238
      if self.op.iallocator is not None:
1239
        raise errors.OpPrereqError("Disk adoption not allowed with an"
1240
                                   " iallocator script", errors.ECODE_INVAL)
1241
      if self.op.mode == constants.INSTANCE_IMPORT:
1242
        raise errors.OpPrereqError("Disk adoption not allowed for"
1243
                                   " instance import", errors.ECODE_INVAL)
1244
    else:
1245
      if self.op.disk_template in constants.DTS_MUST_ADOPT:
1246
        raise errors.OpPrereqError("Disk template %s requires disk adoption,"
1247
                                   " but no 'adopt' parameter given" %
1248
                                   self.op.disk_template,
1249
                                   errors.ECODE_INVAL)
1250

    
1251
    self.adopt_disks = has_adopt
1252

    
1253
    # instance name verification
1254
    if self.op.name_check:
1255
      self.hostname1 = _CheckHostnameSane(self, self.op.instance_name)
1256
      self.op.instance_name = self.hostname1.name
1257
      # used in CheckPrereq for ip ping check
1258
      self.check_ip = self.hostname1.ip
1259
    else:
1260
      self.check_ip = None
1261

    
1262
    # file storage checks
1263
    if (self.op.file_driver and
1264
        not self.op.file_driver in constants.FILE_DRIVER):
1265
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
1266
                                 self.op.file_driver, errors.ECODE_INVAL)
1267

    
1268
    if self.op.disk_template == constants.DT_FILE:
1269
      opcodes.RequireFileStorage()
1270
    elif self.op.disk_template == constants.DT_SHARED_FILE:
1271
      opcodes.RequireSharedFileStorage()
1272

    
1273
    ### Node/iallocator related checks
1274
    _CheckIAllocatorOrNode(self, "iallocator", "pnode")
1275

    
1276
    if self.op.pnode is not None:
1277
      if self.op.disk_template in constants.DTS_INT_MIRROR:
1278
        if self.op.snode is None:
1279
          raise errors.OpPrereqError("The networked disk templates need"
1280
                                     " a mirror node", errors.ECODE_INVAL)
1281
      elif self.op.snode:
1282
        self.LogWarning("Secondary node will be ignored on non-mirrored disk"
1283
                        " template")
1284
        self.op.snode = None
1285

    
1286
    _CheckOpportunisticLocking(self.op)
1287

    
1288
    self._cds = _GetClusterDomainSecret()
1289

    
1290
    if self.op.mode == constants.INSTANCE_IMPORT:
      # On import force_variant must be True, because if we forced it at
      # initial install, our only chance when importing it back is that it
      # works again!
      self.op.force_variant = True

      if self.op.no_install:
        self.LogInfo("No-installation mode has no effect during import")

    elif self.op.mode == constants.INSTANCE_CREATE:
      if self.op.os_type is None:
        raise errors.OpPrereqError("No guest OS specified",
                                   errors.ECODE_INVAL)
      if self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os:
        raise errors.OpPrereqError("Guest OS '%s' is not allowed for"
                                   " installation" % self.op.os_type,
                                   errors.ECODE_STATE)
      if self.op.disk_template is None:
        raise errors.OpPrereqError("No disk template specified",
                                   errors.ECODE_INVAL)

    elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
      # Check handshake to ensure both clusters have the same domain secret
      src_handshake = self.op.source_handshake
      if not src_handshake:
        raise errors.OpPrereqError("Missing source handshake",
                                   errors.ECODE_INVAL)

      errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
                                                           src_handshake)
      if errmsg:
        raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
                                   errors.ECODE_INVAL)

      # Load and check source CA
      self.source_x509_ca_pem = self.op.source_x509_ca
      if not self.source_x509_ca_pem:
        raise errors.OpPrereqError("Missing source X509 CA",
                                   errors.ECODE_INVAL)

      try:
        (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
                                                    self._cds)
      except OpenSSL.crypto.Error, err:
        raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
                                   (err, ), errors.ECODE_INVAL)

      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
      if errcode is not None:
        raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
                                   errors.ECODE_INVAL)

      self.source_x509_ca = cert

      src_instance_name = self.op.source_instance_name
      if not src_instance_name:
        raise errors.OpPrereqError("Missing source instance name",
                                   errors.ECODE_INVAL)

      self.source_instance_name = \
        netutils.GetHostname(name=src_instance_name).name

    else:
      raise errors.OpPrereqError("Invalid instance creation mode %r" %
                                 self.op.mode, errors.ECODE_INVAL)

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    instance_name = self.op.instance_name
    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name, errors.ECODE_EXISTS)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    if self.op.iallocator:
      # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
      # specifying a group on instance creation and then selecting nodes from
      # that group
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

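      # Opportunistic locking: instead of blocking on busy node locks, take
      # whatever node locks are currently free and let the allocator choose
      # only among those nodes (see _RunAllocator below).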
      if self.op.opportunistic_locking:
        self.opportunistic_locks[locking.LEVEL_NODE] = True
        self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
    else:
      self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from a path"
                                     " requires a source node option",
                                     errors.ECODE_INVAL)
      else:
        self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            utils.PathJoin(pathutils.EXPORT_DIR, src_path)

    self.needed_locks[locking.LEVEL_NODE_RES] = \
      _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    if self.op.opportunistic_locking:
      # Only consider nodes for which a lock is held
      node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
    else:
      node_whitelist = None

    #TODO Export network to iallocator so that it chooses a pnode
    #     in a nodegroup that has the desired network connected to
    req = _CreateInstanceAllocRequest(self.op, self.disks,
                                      self.nics, self.be_full,
                                      node_whitelist)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

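    # A failed allocation is a hard error with normal locking, but only a
    # temporary one with opportunistic locking, so the job can be retried
    # once more node locks become available.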
    if not ial.success:
      # When opportunistic locks are used only a temporary failure is generated
      if self.op.opportunistic_locking:
        ecode = errors.ECODE_TEMP_NORES
      else:
        ecode = errors.ECODE_NORES

      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 ecode)

    self.op.pnode = ial.result[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

    assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator"

    if req.RequiredNodes() == 2:
      self.op.snode = ial.result[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      minmem=self.be_full[constants.BE_MINMEM],
      maxmem=self.be_full[constants.BE_MAXMEM],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=_NICListToTuple(self, self.nics),
      disk_template=self.op.disk_template,
      disks=[(d[constants.IDISK_NAME], d[constants.IDISK_SIZE],
              d[constants.IDISK_MODE]) for d in self.disks],
      bep=self.be_full,
      hvp=self.hv_full,
      hypervisor_name=self.op.hypervisor,
      tags=self.op.tags,
      ))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries
    return nl, nl

  def _ReadExportInfo(self):
    """Reads the export information from disk.

    It will override the opcode source node and path with the actual
    information, if these two were not specified before.

    @return: the export information

    """
    assert self.op.mode == constants.INSTANCE_IMPORT

    src_node = self.op.src_node
    src_path = self.op.src_path

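    # If no source node was given, look for the export (by its relative
    # path) on every node we hold a lock for.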
    if src_node is None:
      locked_nodes = self.owned_locks(locking.LEVEL_NODE)
      exp_list = self.rpc.call_export_list(locked_nodes)
      found = False
      for node in exp_list:
        if exp_list[node].fail_msg:
          continue
        if src_path in exp_list[node].payload:
          found = True
          self.op.src_node = src_node = node
          self.op.src_path = src_path = utils.PathJoin(pathutils.EXPORT_DIR,
                                                       src_path)
          break
      if not found:
        raise errors.OpPrereqError("No export found for relative path %s" %
                                   src_path, errors.ECODE_INVAL)

    _CheckNodeOnline(self, src_node)
    result = self.rpc.call_export_info(src_node, src_path)
    result.Raise("No export or invalid export found in dir %s" % src_path)

    export_info = objects.SerializableConfigParser.Loads(str(result.payload))
    if not export_info.has_section(constants.INISECT_EXP):
      raise errors.ProgrammerError("Corrupted export config",
                                   errors.ECODE_ENVIRON)

    ei_version = export_info.get(constants.INISECT_EXP, "version")
    if (int(ei_version) != constants.EXPORT_VERSION):
      raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                 (ei_version, constants.EXPORT_VERSION),
                                 errors.ECODE_ENVIRON)
    return export_info

  def _ReadExportParams(self, einfo):
    """Use export parameters as defaults.

    In case the opcode doesn't specify (as in override) some instance
    parameters, then try to use them from the export information, if
    that declares them.

    """
    self.op.os_type = einfo.get(constants.INISECT_EXP, "os")

    if self.op.disk_template is None:
      if einfo.has_option(constants.INISECT_INS, "disk_template"):
        self.op.disk_template = einfo.get(constants.INISECT_INS,
                                          "disk_template")
        if self.op.disk_template not in constants.DISK_TEMPLATES:
          raise errors.OpPrereqError("Disk template specified in configuration"
                                     " file is not one of the allowed values:"
                                     " %s" %
                                     " ".join(constants.DISK_TEMPLATES),
                                     errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("No disk template specified and the export"
                                   " is missing the disk_template information",
                                   errors.ECODE_INVAL)

    if not self.op.disks:
      disks = []
      # TODO: import the disk iv_name too
      for idx in range(constants.MAX_DISKS):
        if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
          disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
          disks.append({constants.IDISK_SIZE: disk_sz})
      self.op.disks = disks
      if not disks and self.op.disk_template != constants.DT_DISKLESS:
        raise errors.OpPrereqError("No disk info specified and the export"
                                   " is missing the disk information",
                                   errors.ECODE_INVAL)

    if not self.op.nics:
      nics = []
      for idx in range(constants.MAX_NICS):
        if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
          ndict = {}
          for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
            v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
            ndict[name] = v
          nics.append(ndict)
        else:
          break
      self.op.nics = nics

    if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
      self.op.tags = einfo.get(constants.INISECT_INS, "tags").split()

    if (self.op.hypervisor is None and
        einfo.has_option(constants.INISECT_INS, "hypervisor")):
      self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")

    if einfo.has_section(constants.INISECT_HYP):
      # use the export parameters but do not override the ones
      # specified by the user
      for name, value in einfo.items(constants.INISECT_HYP):
        if name not in self.op.hvparams:
          self.op.hvparams[name] = value

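    # Backend parameters: prefer the dedicated section and fall back to the
    # old single-section format; the legacy "memory" value seeds both
    # minmem and maxmem unless they were given explicitly.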
    if einfo.has_section(constants.INISECT_BEP):
      # use the parameters, without overriding
      for name, value in einfo.items(constants.INISECT_BEP):
        if name not in self.op.beparams:
          self.op.beparams[name] = value
        # Compatibility for the old "memory" be param
        if name == constants.BE_MEMORY:
          if constants.BE_MAXMEM not in self.op.beparams:
            self.op.beparams[constants.BE_MAXMEM] = value
          if constants.BE_MINMEM not in self.op.beparams:
            self.op.beparams[constants.BE_MINMEM] = value
    else:
      # try to read the parameters old style, from the main section
      for name in constants.BES_PARAMETERS:
        if (name not in self.op.beparams and
            einfo.has_option(constants.INISECT_INS, name)):
          self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)

    if einfo.has_section(constants.INISECT_OSP):
      # use the parameters, without overriding
      for name, value in einfo.items(constants.INISECT_OSP):
        if name not in self.op.osparams:
          self.op.osparams[name] = value

  def _RevertToDefaults(self, cluster):
    """Revert the instance parameters to the default values.

    """
    # hvparams
    hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {})
    for name in self.op.hvparams.keys():
      if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
        del self.op.hvparams[name]
    # beparams
    be_defs = cluster.SimpleFillBE({})
    for name in self.op.beparams.keys():
      if name in be_defs and be_defs[name] == self.op.beparams[name]:
        del self.op.beparams[name]
    # nic params
    nic_defs = cluster.SimpleFillNIC({})
    for nic in self.op.nics:
      for name in constants.NICS_PARAMETERS:
        if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
          del nic[name]
    # osparams
    os_defs = cluster.SimpleFillOS(self.op.os_type, {})
    for name in self.op.osparams.keys():
      if name in os_defs and os_defs[name] == self.op.osparams[name]:
        del self.op.osparams[name]

  def _CalculateFileStorageDir(self):
    """Calculate final instance file storage dir.

    """
    # file storage dir calculation/check
    self.instance_file_storage_dir = None
    if self.op.disk_template in constants.DTS_FILEBASED:
      # build the full file storage dir path
      joinargs = []

      if self.op.disk_template == constants.DT_SHARED_FILE:
        get_fsd_fn = self.cfg.GetSharedFileStorageDir
      else:
        get_fsd_fn = self.cfg.GetFileStorageDir

      cfg_storagedir = get_fsd_fn()
      if not cfg_storagedir:
        raise errors.OpPrereqError("Cluster file storage dir not defined",
                                   errors.ECODE_STATE)
      joinargs.append(cfg_storagedir)

      if self.op.file_storage_dir is not None:
        joinargs.append(self.op.file_storage_dir)

      joinargs.append(self.op.instance_name)

      # pylint: disable=W0142
      self.instance_file_storage_dir = utils.PathJoin(*joinargs)

  def CheckPrereq(self): # pylint: disable=R0914
    """Check prerequisites.

    """
    self._CalculateFileStorageDir()

    if self.op.mode == constants.INSTANCE_IMPORT:
      export_info = self._ReadExportInfo()
      self._ReadExportParams(export_info)
      self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
    else:
      self._old_instance_name = None

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances", errors.ECODE_STATE)

    if (self.op.hypervisor is None or
        self.op.hypervisor == constants.VALUE_AUTO):
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" %
                                 (self.op.hypervisor, ",".join(enabled_hvs)),
                                 errors.ECODE_STATE)

    # Check tag validity
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

    # check hypervisor parameter syntax (locally)
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type,
                                      self.op.hvparams)
    hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)
    self.hv_full = filled_hvp
    # check that we don't specify global parameters on an instance
    _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor",
                          "instance", "cluster")

    # fill and remember the beparams dict
    self.be_full = _ComputeFullBeParams(self.op, cluster)

    # build os parameters
    self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams)

    # now that hvp/bep are in final format, let's reset to defaults,
    # if told to do so
    if self.op.identify_defaults:
      self._RevertToDefaults(cluster)

    # NIC buildup
    self.nics = _ComputeNics(self.op, cluster, self.check_ip, self.cfg,
                             self.proc.GetECId())

    # disk checks/pre-build
    default_vg = self.cfg.GetVGName()
    self.disks = _ComputeDisks(self.op, default_vg)

    if self.op.mode == constants.INSTANCE_IMPORT:
      disk_images = []
      for idx in range(len(self.disks)):
        option = "disk%d_dump" % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = utils.PathJoin(self.op.src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      if self.op.instance_name == self._old_instance_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO:
            nic_mac_ini = "nic%d_mac" % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.ip_check:
      if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name),
                                   errors.ECODE_NOTUNIQUE)

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId())

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    # Release all unneeded node locks
    keep_locks = filter(None, [self.op.pnode, self.op.snode, self.op.src_node])
    _ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks)
    _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks)
    _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Node locks differ from node resource locks"

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name, errors.ECODE_STATE)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name, errors.ECODE_STATE)
    if not pnode.vm_capable:
      raise errors.OpPrereqError("Cannot use non-vm_capable primary node"
                                 " '%s'" % pnode.name, errors.ECODE_STATE)

    self.secondaries = []

    # Fill in any IPs from IP pools. This must happen here, because we need to
    # know the nic's primary node, as specified by the iallocator
    for idx, nic in enumerate(self.nics):
      net_uuid = nic.network
      if net_uuid is not None:
        nobj = self.cfg.GetNetwork(net_uuid)
        netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.name)
        if netparams is None:
          raise errors.OpPrereqError("No netparams found for network"
                                     " %s. Probably not connected to"
                                     " node %s's nodegroup" %
                                     (nobj.name, self.pnode.name),
                                     errors.ECODE_INVAL)
        self.LogInfo("NIC/%d inherits netparams %s" %
                     (idx, netparams.values()))
        nic.nicparams = dict(netparams)
        if nic.ip is not None:
          if nic.ip.lower() == constants.NIC_IP_POOL:
            try:
              nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId())
            except errors.ReservationError:
              raise errors.OpPrereqError("Unable to get a free IP for NIC %d"
                                         " from the address pool" % idx,
                                         errors.ECODE_STATE)
            self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name)
          else:
            try:
              self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId())
            except errors.ReservationError:
              raise errors.OpPrereqError("IP address %s already in use"
                                         " or does not belong to network %s" %
                                         (nic.ip, nobj.name),
                                         errors.ECODE_NOTUNIQUE)

      # net is None, ip None or given
      elif self.op.conflicts_check:
        _CheckForConflictingIp(self, nic.ip, self.pnode.name)

    # mirror node verification
    if self.op.disk_template in constants.DTS_INT_MIRROR:
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be the"
                                   " primary node", errors.ECODE_INVAL)
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      _CheckNodeVmCapable(self, self.op.snode)
      self.secondaries.append(self.op.snode)

      snode = self.cfg.GetNodeInfo(self.op.snode)
      if pnode.group != snode.group:
        self.LogWarning("The primary and secondary nodes are in two"
                        " different node groups; the disk parameters"
                        " from the first disk's node group will be"
                        " used")

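    # Exclusive storage is only compatible with a subset of disk templates;
    # refuse unsupported templates on nodes that have it enabled.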
    if self.op.disk_template not in constants.DTS_EXCL_STORAGE:
      nodes = [pnode]
      if self.op.disk_template in constants.DTS_INT_MIRROR:
        nodes.append(snode)
      has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
      if compat.any(map(has_es, nodes)):
        raise errors.OpPrereqError("Disk template %s not supported with"
                                   " exclusive storage" % self.op.disk_template,
                                   errors.ECODE_STATE)

    nodenames = [pnode.name] + self.secondaries

    if not self.adopt_disks:
      if self.op.disk_template == constants.DT_RBD:
        # _CheckRADOSFreeSpace() is just a placeholder.
        # Any function that checks prerequisites can be placed here.
        # Check if there is enough space on the RADOS cluster.
        _CheckRADOSFreeSpace()
      elif self.op.disk_template == constants.DT_EXT:
        # FIXME: Function that checks prereqs if needed
        pass
      else:
        # Check lv size requirements, if not adopting
        req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
        _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)

    elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
      all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
                                disk[constants.IDISK_ADOPT])
                     for disk in self.disks])
      if len(all_lvs) != len(self.disks):
        raise errors.OpPrereqError("Duplicate volume names given for adoption",
                                   errors.ECODE_INVAL)
      for lv_name in all_lvs:
        try:
          # FIXME: lv_name here is "vg/lv" need to ensure that other calls
          # to ReserveLV uses the same syntax
          self.cfg.ReserveLV(lv_name, self.proc.GetECId())
        except errors.ReservationError:
          raise errors.OpPrereqError("LV named %s used by another instance" %
                                     lv_name, errors.ECODE_NOTUNIQUE)

      vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
      vg_names.Raise("Cannot get VG information from node %s" % pnode.name)

      node_lvs = self.rpc.call_lv_list([pnode.name],
                                       vg_names.payload.keys())[pnode.name]
      node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
      node_lvs = node_lvs.payload

      delta = all_lvs.difference(node_lvs.keys())
      if delta:
        raise errors.OpPrereqError("Missing logical volume(s): %s" %
                                   utils.CommaJoin(delta),
                                   errors.ECODE_INVAL)
      online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]]
      if online_lvs:
        raise errors.OpPrereqError("Online logical volumes found, cannot"
                                   " adopt: %s" % utils.CommaJoin(online_lvs),
                                   errors.ECODE_STATE)
      # update the size of disk based on what is found
      for dsk in self.disks:
        dsk[constants.IDISK_SIZE] = \
          int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG],
                                        dsk[constants.IDISK_ADOPT])][0]))

    elif self.op.disk_template == constants.DT_BLOCK:
      # Normalize and de-duplicate device paths
      all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT])
                       for disk in self.disks])
      if len(all_disks) != len(self.disks):
        raise errors.OpPrereqError("Duplicate disk names given for adoption",
                                   errors.ECODE_INVAL)
      baddisks = [d for d in all_disks
                  if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)]
      if baddisks:
        raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
                                   " cannot be adopted" %
                                   (utils.CommaJoin(baddisks),
                                    constants.ADOPTABLE_BLOCKDEV_ROOT),
                                   errors.ECODE_INVAL)

      node_disks = self.rpc.call_bdev_sizes([pnode.name],
                                            list(all_disks))[pnode.name]
      node_disks.Raise("Cannot get block device information from node %s" %
                       pnode.name)
      node_disks = node_disks.payload
      delta = all_disks.difference(node_disks.keys())
      if delta:
        raise errors.OpPrereqError("Missing block device(s): %s" %
                                   utils.CommaJoin(delta),
                                   errors.ECODE_INVAL)
      for dsk in self.disks:
        dsk[constants.IDISK_SIZE] = \
          int(float(node_disks[dsk[constants.IDISK_ADOPT]]))

    # Verify instance specs
    spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
    ispec = {
      constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
      constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
      constants.ISPEC_DISK_COUNT: len(self.disks),
      constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE]
                                  for disk in self.disks],
      constants.ISPEC_NIC_COUNT: len(self.nics),
      constants.ISPEC_SPINDLE_USE: spindle_use,
      }

    group_info = self.cfg.GetNodeGroup(pnode.group)
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
    res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec,
                                               self.op.disk_template)
    if not self.op.ignore_ipolicy and res:
      msg = ("Instance allocation to group %s (%s) violates policy: %s" %
             (pnode.group, group_info.name, utils.CommaJoin(res)))
      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
    # check OS parameters (remotely)
    _CheckOSParams(self, True, nodenames, self.op.os_type, self.os_full)

    _CheckNicsBridgesExist(self, self.nics, self.pnode.name)

    #TODO: _CheckExtParams (remotely)
    # Check parameters for extstorage

    # memory check on primary node
    #TODO(dynmem): use MINMEM for checking
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MAXMEM],
                           self.op.hypervisor)

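    # Remember the node names so a dry-run of this opcode can report which
    # nodes would be used.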
    self.dry_run_result = list(nodenames)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
                self.owned_locks(locking.LEVEL_NODE)), \
      "Node locks differ from node resource locks"
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

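    # Hypervisors in HTS_REQ_PORT need a cluster-wide unique TCP port
    # (typically for the remote console); allocate it up front.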
    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    # This is ugly but we got a chicken-and-egg problem here
    # We can only take the group disk parameters, as the instance
    # has no disks yet (we are generating them right here).
    node = self.cfg.GetNodeInfo(pnode_name)
    nodegroup = self.cfg.GetNodeGroup(node.group)
    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  self.instance_file_storage_dir,
                                  self.op.file_driver,
                                  0,
                                  feedback_fn,
                                  self.cfg.GetGroupDiskParams(nodegroup))

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_state=constants.ADMINST_DOWN,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            osparams=self.op.osparams,
                            )

    if self.op.tags:
      for tag in self.op.tags:
        iobj.AddTag(tag)

    if self.adopt_disks:
      if self.op.disk_template == constants.DT_PLAIN:
        # rename LVs to the newly-generated names; we need to construct
        # 'fake' LV disks with the old data, plus the new unique_id
        tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
        rename_to = []
        for t_dsk, a_dsk in zip(tmp_disks, self.disks):
          rename_to.append(t_dsk.logical_id)
          t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
          self.cfg.SetDiskID(t_dsk, pnode_name)
        result = self.rpc.call_blockdev_rename(pnode_name,
                                               zip(tmp_disks, rename_to))
        result.Raise("Failed to rename adopted LVs")
    else:
      feedback_fn("* creating instance disks...")
      try:
        _CreateDisks(self, iobj)
      except errors.OpExecError:
        self.LogWarning("Device creation failed")
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj, self.proc.GetECId())

    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]

    if self.op.mode == constants.INSTANCE_IMPORT:
      # Release unused nodes
      _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
    else:
      # Release all nodes
      _ReleaseLocks(self, locking.LEVEL_NODE)

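    # Optionally wipe the newly created disks; a failure sets disk_abort,
    # which removes the disks and aborts the creation below.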
    disk_abort = False
    if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
      feedback_fn("* wiping instance disks...")
      try:
        _WipeDisks(self, iobj)
      except errors.OpExecError, err:
        logging.exception("Wiping disks failed")
        self.LogWarning("Wiping instance disks failed (%s)", err)
        disk_abort = True

    if disk_abort:
      # Something is already wrong with the disks, don't do anything else
      pass
    elif self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_INT_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    # Release all node resource locks
    _ReleaseLocks(self, locking.LEVEL_NODE_RES)

    if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
      # we need to set the disks ID to the primary node, since the
      # preceding code might or might not have done it, depending on
      # disk template and other options
      for disk in iobj.disks:
        self.cfg.SetDiskID(disk, pnode_name)
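      # For a plain creation, run the OS installation scripts; for imports
      # the disk contents are transferred instead (see the else branch).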
      if self.op.mode == constants.INSTANCE_CREATE:
        if not self.op.no_install:
          pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
                        not self.op.wait_for_sync)
          if pause_sync:
            feedback_fn("* pausing disk sync to install instance OS")
            result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
                                                              (iobj.disks,
                                                               iobj), True)
            for idx, success in enumerate(result.payload):
              if not success:
                logging.warn("pause-sync of instance %s for disk %d failed",
                             instance, idx)

          feedback_fn("* running the instance OS create scripts...")
          # FIXME: pass debug option from opcode to backend
          os_add_result = \
            self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
                                          self.op.debug_level)
          if pause_sync:
            feedback_fn("* resuming disk sync")
            result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
                                                              (iobj.disks,
                                                               iobj), False)
            for idx, success in enumerate(result.payload):
              if not success:
                logging.warn("resume-sync of instance %s for disk %d failed",
                             instance, idx)

          os_add_result.Raise("Could not add os for instance %s"
                              " on node %s" % (instance, pnode_name))

      else:
        if self.op.mode == constants.INSTANCE_IMPORT:
          feedback_fn("* running the instance OS import scripts...")

          transfers = []

          for idx, image in enumerate(self.src_images):
            if not image:
              continue

            # FIXME: pass debug option from opcode to backend
            dt = masterd.instance.DiskTransfer("disk/%s" % idx,
                                               constants.IEIO_FILE, (image, ),
                                               constants.IEIO_SCRIPT,
                                               (iobj.disks[idx], idx),
                                               None)
            transfers.append(dt)

          import_result = \
            masterd.instance.TransferInstanceData(self, feedback_fn,
                                                  self.op.src_node, pnode_name,
                                                  self.pnode.secondary_ip,
                                                  iobj, transfers)
          if not compat.all(import_result):
            self.LogWarning("Some disks for instance %s on node %s were not"
                            " imported successfully" % (instance, pnode_name))

          rename_from = self._old_instance_name

        elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
          feedback_fn("* preparing remote import...")
          # The source cluster will stop the instance before attempting to make
          # a connection. In some cases stopping an instance can take a long
          # time, hence the shutdown timeout is added to the connection
          # timeout.
          connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
                             self.op.source_shutdown_timeout)
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)

          assert iobj.primary_node == self.pnode.name
          disk_results = \
            masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
                                          self.source_x509_ca,
                                          self._cds, timeouts)
          if not compat.all(disk_results):
            # TODO: Should the instance still be started, even if some disks
            # failed to import (valid for local imports, too)?
            self.LogWarning("Some disks for instance %s on node %s were not"
                            " imported successfully" % (instance, pnode_name))

          rename_from = self.source_instance_name

        else:
          # also checked in the prereq part
          raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                       % self.op.mode)

        # Run rename script on newly imported instance
        assert iobj.name == instance
        feedback_fn("Running rename script for %s" % instance)
        result = self.rpc.call_instance_run_rename(pnode_name, iobj,
                                                   rename_from,
                                                   self.op.debug_level)
        if result.fail_msg:
          self.LogWarning("Failed to run rename script for %s on node"
                          " %s: %s" % (instance, pnode_name, result.fail_msg))

    assert not self.owned_locks(locking.LEVEL_NODE_RES)

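    # Finally mark the instance as running in the configuration and start
    # it on its primary node, if requested.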
    if self.op.start:
      iobj.admin_state = constants.ADMINST_UP
      self.cfg.Update(iobj, feedback_fn)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
                                            False, self.op.reason)
      result.Raise("Could not start instance")

    return list(iobj.all_nodes)


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


class LUInstanceRename(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.ip_check and not self.op.name_check:
      # TODO: make the ip check more flexible and not depend on the name check
      raise errors.OpPrereqError("IP address check requires a name check",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None
    _CheckNodeOnline(self, instance.primary_node)
    _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                        msg="cannot rename")
    self.instance = instance

    new_name = self.op.new_name
    if self.op.name_check:
      hostname = _CheckHostnameSane(self, new_name)
      new_name = self.op.new_name = hostname.name
      if (self.op.ip_check and
          netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname.ip, new_name),
                                   errors.ECODE_NOTUNIQUE)

    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list and new_name != instance.name:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name, errors.ECODE_EXISTS)

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    rename_file_storage = False
    if (inst.disk_template in constants.DTS_FILEBASED and
        self.op.new_name != inst.name):
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      rename_file_storage = True

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL.
    # Otherwise the new lock would have to be added in acquired mode.
    assert self.REQ_BGL
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if rename_file_storage:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    # update info on disks
    info = _GetInstanceInfoText(inst)
    for (idx, disk) in enumerate(inst.disks):
      for node in inst.all_nodes:
        self.cfg.SetDiskID(disk, node)
        result = self.rpc.call_blockdev_setinfo(node, disk, info)
        if result.fail_msg:
          self.LogWarning("Error setting info on node %s for disk %s: %s",
                          node, idx, result.fail_msg)
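    # Run the OS rename script while the disks are assembled; a failure is
    # only a warning because the configuration has already been updated.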
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name, self.op.debug_level)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)

    return inst.name


class LUInstanceRemove(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    nl_post = list(self.instance.all_nodes) + nl
    return (nl, nl_post)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
                                             self.op.shutdown_timeout,
                                             self.op.reason)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))
    assert not (set(instance.all_nodes) -
                self.owned_locks(locking.LEVEL_NODE)), \
      "Not owning correct locks"

    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
                                 target_group, cfg,
                                 _compute_fn=_ComputeIPolicyInstanceViolation):
  """Compute if instance meets the specs of the new target group.

  @param ipolicy: The ipolicy to verify
  @param instance: The instance object to verify
  @param current_group: The current group of the instance
  @param target_group: The new group of the instance
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{_ComputeIPolicySpecViolation}

  """
  if current_group == target_group:
    return []
  else:
    return _compute_fn(ipolicy, instance, cfg)


def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
                            _compute_fn=_ComputeIPolicyNodeViolation):
  """Checks that the target node is correct in terms of instance policy.

  @param ipolicy: The ipolicy to verify
  @param instance: The instance object to verify
  @param node: The new node to relocate
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param ignore: Ignore violations of the ipolicy
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{_ComputeIPolicySpecViolation}

  """
  primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
  res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)

  if res:
    msg = ("Instance does not meet target node group's (%s) instance"
           " policy: %s") % (node.group, utils.CommaJoin(res))
    if ignore:
      lu.LogWarning(msg)
    else:
      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


class LUInstanceMove(LogicalUnit):
  """Move an instance by data-copying.

  """
  HPATH = "instance-move"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    target_node = _ExpandNodeName(self.cfg, self.op.target_node)
    self.op.target_node = target_node
    self.needed_locks[locking.LEVEL_NODE] = [target_node]
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes(primary_only=True)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "TARGET_NODE": self.op.target_node,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      self.op.target_node,
      ]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_COPYABLE:
      raise errors.OpPrereqError("Disk template %s not suitable for copying" %
                                 instance.disk_template, errors.ECODE_STATE)

    node = self.cfg.GetNodeInfo(self.op.target_node)
    assert node is not None, \
      "Cannot retrieve locked node %s" % self.op.target_node

    self.target_node = target_node = node.name

    if target_node == instance.primary_node:
      raise errors.OpPrereqError("Instance %s is already on the node %s" %
                                 (instance.name, target_node),
                                 errors.ECODE_STATE)

    bep = self.cfg.GetClusterInfo().FillBE(instance)

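    # Only disks with a simple layout (a single LV or file per disk) can be
    # copied by this LU; composite devices such as DRBD are rejected.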
    for idx, dsk in enumerate(instance.disks):
2602
      if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
2603
        raise errors.OpPrereqError("Instance disk %d has a complex layout,"
2604
                                   " cannot copy" % idx, errors.ECODE_STATE)
2605

    
2606
    _CheckNodeOnline(self, target_node)
2607
    _CheckNodeNotDrained(self, target_node)
2608
    _CheckNodeVmCapable(self, target_node)
2609
    cluster = self.cfg.GetClusterInfo()
2610
    group_info = self.cfg.GetNodeGroup(node.group)
2611
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
2612
    _CheckTargetNodeIPolicy(self, ipolicy, instance, node, self.cfg,
2613
                            ignore=self.op.ignore_ipolicy)
2614

    
2615
    if instance.admin_state == constants.ADMINST_UP:
2616
      # check memory requirements on the target node
2617
      _CheckNodeFreeMemory(self, target_node,
2618
                           "failing over instance %s" %
2619
                           instance.name, bep[constants.BE_MAXMEM],
2620
                           instance.hypervisor)
2621
    else:
2622
      self.LogInfo("Not checking memory on the secondary node as"
2623
                   " instance will not be started")
2624

    
2625
    # check bridge existence
2626
    _CheckInstanceBridgesExist(self, instance, node=target_node)
2627

    
2628
  def Exec(self, feedback_fn):
2629
    """Move an instance.
2630

2631
    The move is done by shutting it down on its present node, copying
2632
    the data over (slow) and starting it on the new node.
2633

2634
    """
2635
    instance = self.instance
2636

    
2637
    source_node = instance.primary_node
2638
    target_node = self.target_node
2639

    
2640
    self.LogInfo("Shutting down instance %s on source node %s",
2641
                 instance.name, source_node)
2642

    
2643
    assert (self.owned_locks(locking.LEVEL_NODE) ==
2644
            self.owned_locks(locking.LEVEL_NODE_RES))
2645

    
2646
    result = self.rpc.call_instance_shutdown(source_node, instance,
2647
                                             self.op.shutdown_timeout,
2648
                                             self.op.reason)
2649
    msg = result.fail_msg
2650
    if msg:
2651
      if self.op.ignore_consistency:
2652
        self.LogWarning("Could not shutdown instance %s on node %s."
2653
                        " Proceeding anyway. Please make sure node"
2654
                        " %s is down. Error details: %s",
2655
                        instance.name, source_node, source_node, msg)
2656
      else:
2657
        raise errors.OpExecError("Could not shutdown instance %s on"
2658
                                 " node %s: %s" %
2659
                                 (instance.name, source_node, msg))
2660

    
2661
    # create the target disks
2662
    try:
2663
      _CreateDisks(self, instance, target_node=target_node)
2664
    except errors.OpExecError:
2665
      self.LogWarning("Device creation failed")
2666
      self.cfg.ReleaseDRBDMinors(instance.name)
2667
      raise
2668

    
2669
    cluster_name = self.cfg.GetClusterInfo().cluster_name
2670

    
2671
    errs = []
2672
    # activate, get path, copy the data over
2673
    for idx, disk in enumerate(instance.disks):
2674
      self.LogInfo("Copying data for disk %d", idx)
2675
      result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
2676
                                               instance.name, True, idx)
2677
      if result.fail_msg:
2678
        self.LogWarning("Can't assemble newly created disk %d: %s",
2679
                        idx, result.fail_msg)
2680
        errs.append(result.fail_msg)
2681
        break
2682
      dev_path = result.payload
2683
      result = self.rpc.call_blockdev_export(source_node, (disk, instance),
2684
                                             target_node, dev_path,
2685
                                             cluster_name)
2686
      if result.fail_msg:
2687
        self.LogWarning("Can't copy data over for disk %d: %s",
2688
                        idx, result.fail_msg)
2689
        errs.append(result.fail_msg)
2690
        break
2691

    
2692
    if errs:
2693
      self.LogWarning("Some disks failed to copy, aborting")
2694
      try:
2695
        _RemoveDisks(self, instance, target_node=target_node)
2696
      finally:
2697
        self.cfg.ReleaseDRBDMinors(instance.name)
2698
        raise errors.OpExecError("Errors during disk copy: %s" %
2699
                                 (",".join(errs),))
2700

    
2701
    instance.primary_node = target_node
2702
    self.cfg.Update(instance, feedback_fn)
2703

    
2704
    self.LogInfo("Removing the disks on the original node")
2705
    _RemoveDisks(self, instance, target_node=source_node)
2706

    
2707
    # Only start the instance if it's marked as up
2708
    if instance.admin_state == constants.ADMINST_UP:
2709
      self.LogInfo("Starting instance %s on node %s",
2710
                   instance.name, target_node)
2711

    
2712
      disks_ok, _ = _AssembleInstanceDisks(self, instance,
2713
                                           ignore_secondaries=True)
2714
      if not disks_ok:
2715
        _ShutdownInstanceDisks(self, instance)
2716
        raise errors.OpExecError("Can't activate the instance's disks")
2717

    
2718
      result = self.rpc.call_instance_start(target_node,
2719
                                            (instance, None, None), False,
2720
                                            self.op.reason)
2721
      msg = result.fail_msg
2722
      if msg:
2723
        _ShutdownInstanceDisks(self, instance)
2724
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
2725
                                 (instance.name, target_node, msg))
2726

    
2727

    
2728
def _GetInstanceConsole(cluster, instance):
  """Returns console information for an instance.

  @type cluster: L{objects.Cluster}
  @type instance: L{objects.Instance}
  @rtype: dict

  """
  hyper = hypervisor.GetHypervisorClass(instance.hypervisor)
  # beparams and hvparams are passed separately, to avoid editing the
  # instance and then saving the defaults in the instance itself.
  hvparams = cluster.FillHV(instance)
  beparams = cluster.FillBE(instance)
  console = hyper.GetInstanceConsole(instance, hvparams, beparams)

  assert console.instance == instance.name
  assert console.Validate()

  return console.ToDict()
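
# Illustrative sketch (not part of the original module): a caller that already
# holds the cluster configuration could fetch and log the console parameters
# roughly as follows; the instance name is a made-up placeholder.
#
#   cluster = cfg.GetClusterInfo()
#   instance = cfg.GetInstanceInfo("instance1.example.com")
#   logging.debug("Console for %s: %s", instance.name,
#                 _GetInstanceConsole(cluster, instance))
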
class _InstanceQuery(_QueryBase):
2750
  FIELDS = query.INSTANCE_FIELDS
2751

    
2752
  def ExpandNames(self, lu):
2753
    lu.needed_locks = {}
2754
    lu.share_locks = _ShareAll()
2755

    
2756
    if self.names:
2757
      self.wanted = _GetWantedInstances(lu, self.names)
2758
    else:
2759
      self.wanted = locking.ALL_SET
2760

    
2761
    self.do_locking = (self.use_locking and
2762
                       query.IQ_LIVE in self.requested_data)
2763
    if self.do_locking:
2764
      lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2765
      lu.needed_locks[locking.LEVEL_NODEGROUP] = []
2766
      lu.needed_locks[locking.LEVEL_NODE] = []
2767
      lu.needed_locks[locking.LEVEL_NETWORK] = []
2768
      lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2769

    
2770
    self.do_grouplocks = (self.do_locking and
2771
                          query.IQ_NODES in self.requested_data)
2772

    
2773
  def DeclareLocks(self, lu, level):
2774
    if self.do_locking:
2775
      if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
2776
        assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
2777

    
2778
        # Lock all groups used by instances optimistically; this requires going
2779
        # via the node before it's locked, requiring verification later on
2780
        lu.needed_locks[locking.LEVEL_NODEGROUP] = \
2781
          set(group_uuid
2782
              for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2783
              for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
2784
      elif level == locking.LEVEL_NODE:
2785
        lu._LockInstancesNodes() # pylint: disable=W0212
2786

    
2787
      elif level == locking.LEVEL_NETWORK:
2788
        lu.needed_locks[locking.LEVEL_NETWORK] = \
2789
          frozenset(net_uuid
2790
                    for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2791
                    for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))
2792

    
2793
  @staticmethod
2794
  def _CheckGroupLocks(lu):
2795
    owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
2796
    owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
2797

    
2798
    # Check if node groups for locked instances are still correct
2799
    for instance_name in owned_instances:
2800
      _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
2801

    
2802
  def _GetQueryData(self, lu):
2803
    """Computes the list of instances and their attributes.
2804

2805
    """
2806
    if self.do_grouplocks:
2807
      self._CheckGroupLocks(lu)
2808

    
2809
    cluster = lu.cfg.GetClusterInfo()
2810
    all_info = lu.cfg.GetAllInstancesInfo()
2811

    
2812
    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
2813

    
2814
    instance_list = [all_info[name] for name in instance_names]
2815
    nodes = frozenset(itertools.chain(*(inst.all_nodes
2816
                                        for inst in instance_list)))
2817
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2818
    bad_nodes = []
2819
    offline_nodes = []
2820
    wrongnode_inst = set()
2821

    
2822
    # Gather data as requested
2823
    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
2824
      live_data = {}
2825
      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
2826
      for name in nodes:
2827
        result = node_data[name]
2828
        if result.offline:
2829
          # offline nodes will be in both lists
2830
          assert result.fail_msg
2831
          offline_nodes.append(name)
2832
        if result.fail_msg:
2833
          bad_nodes.append(name)
2834
        elif result.payload:
2835
          for inst in result.payload:
2836
            if inst in all_info:
2837
              if all_info[inst].primary_node == name:
2838
                live_data.update(result.payload)
2839
              else:
2840
                wrongnode_inst.add(inst)
2841
            else:
2842
              # orphan instance; we don't list it here as we don't
2843
              # handle this case yet in the output of instance listing
2844
              logging.warning("Orphan instance '%s' found on node %s",
2845
                              inst, name)
2846
              # else no instance is alive
2847
    else:
2848
      live_data = {}
2849

    
2850
    if query.IQ_DISKUSAGE in self.requested_data:
2851
      gmi = ganeti.masterd.instance
2852
      disk_usage = dict((inst.name,
2853
                         gmi.ComputeDiskSize(inst.disk_template,
2854
                                             [{constants.IDISK_SIZE: disk.size}
2855
                                              for disk in inst.disks]))
2856
                        for inst in instance_list)
2857
    else:
2858
      disk_usage = None
2859

    
2860
    if query.IQ_CONSOLE in self.requested_data:
2861
      consinfo = {}
2862
      for inst in instance_list:
2863
        if inst.name in live_data:
2864
          # Instance is running
2865
          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
2866
        else:
2867
          consinfo[inst.name] = None
2868
      assert set(consinfo.keys()) == set(instance_names)
2869
    else:
2870
      consinfo = None
2871

    
2872
    if query.IQ_NODES in self.requested_data:
2873
      node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
2874
                                            instance_list)))
2875
      nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
2876
      groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
2877
                    for uuid in set(map(operator.attrgetter("group"),
2878
                                        nodes.values())))
2879
    else:
2880
      nodes = None
2881
      groups = None
2882

    
2883
    if query.IQ_NETWORKS in self.requested_data:
2884
      net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
2885
                                    for i in instance_list))
2886
      networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
2887
    else:
2888
      networks = None
2889

    
2890
    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
2891
                                   disk_usage, offline_nodes, bad_nodes,
2892
                                   live_data, wrongnode_inst, consinfo,
2893
                                   nodes, groups, networks)
2894

    
2895

    
2896
class LUInstanceQuery(NoHooksLU):
2897
  """Logical unit for querying instances.
2898

2899
  """
2900
  # pylint: disable=W0142
2901
  REQ_BGL = False
2902

    
2903
  def CheckArguments(self):
2904
    self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
2905
                             self.op.output_fields, self.op.use_locking)
2906

    
2907
  def ExpandNames(self):
2908
    self.iq.ExpandNames(self)
2909

    
2910
  def DeclareLocks(self, level):
2911
    self.iq.DeclareLocks(self, level)
2912

    
2913
  def Exec(self, feedback_fn):
2914
    return self.iq.OldStyleQuery(self)
2915

    
2916

    
2917
class LUInstanceQueryData(NoHooksLU):
2918
  """Query runtime instance data.
2919

2920
  """
2921
  REQ_BGL = False
2922

    
2923
  def ExpandNames(self):
2924
    self.needed_locks = {}
2925

    
2926
    # Use locking if requested or when non-static information is wanted
2927
    if not (self.op.static or self.op.use_locking):
2928
      self.LogWarning("Non-static data requested, locks need to be acquired")
2929
      self.op.use_locking = True
2930

    
2931
    if self.op.instances or not self.op.use_locking:
2932
      # Expand instance names right here
2933
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
2934
    else:
2935
      # Will use acquired locks
2936
      self.wanted_names = None
2937

    
2938
    if self.op.use_locking:
2939
      self.share_locks = _ShareAll()
2940

    
2941
      if self.wanted_names is None:
2942
        self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2943
      else:
2944
        self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
2945

    
2946
      self.needed_locks[locking.LEVEL_NODEGROUP] = []
2947
      self.needed_locks[locking.LEVEL_NODE] = []
2948
      self.needed_locks[locking.LEVEL_NETWORK] = []
2949
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2950

    
2951
  def DeclareLocks(self, level):
2952
    if self.op.use_locking:
2953
      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
2954
      if level == locking.LEVEL_NODEGROUP:
2955

    
2956
        # Lock all groups used by instances optimistically; this requires going
2957
        # via the node before it's locked, requiring verification later on
2958
        self.needed_locks[locking.LEVEL_NODEGROUP] = \
2959
          frozenset(group_uuid
2960
                    for instance_name in owned_instances
2961
                    for group_uuid in
2962
                    self.cfg.GetInstanceNodeGroups(instance_name))
2963

    
2964
      elif level == locking.LEVEL_NODE:
2965
        self._LockInstancesNodes()
2966

    
2967
      elif level == locking.LEVEL_NETWORK:
2968
        self.needed_locks[locking.LEVEL_NETWORK] = \
2969
          frozenset(net_uuid
2970
                    for instance_name in owned_instances
2971
                    for net_uuid in
2972
                    self.cfg.GetInstanceNetworks(instance_name))
2973

    
2974
  def CheckPrereq(self):
2975
    """Check prerequisites.
2976

2977
    This only checks the optional instance list against the existing names.
2978

2979
    """
2980
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
2981
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
2982
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
2983
    owned_networks = frozenset(self.owned_locks(locking.LEVEL_NETWORK))
2984

    
2985
    if self.wanted_names is None:
2986
      assert self.op.use_locking, "Locking was not used"
2987
      self.wanted_names = owned_instances
2988

    
2989
    instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
2990

    
2991
    if self.op.use_locking:
2992
      _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
2993
                                None)
2994
    else:
2995
      assert not (owned_instances or owned_groups or
2996
                  owned_nodes or owned_networks)
2997

    
2998
    self.wanted_instances = instances.values()
2999

    
3000
  def _ComputeBlockdevStatus(self, node, instance, dev):
3001
    """Returns the status of a block device
3002

3003
    """
3004
    if self.op.static or not node:
3005
      return None
3006

    
3007
    self.cfg.SetDiskID(dev, node)
3008

    
3009
    result = self.rpc.call_blockdev_find(node, dev)
3010
    if result.offline:
3011
      return None
3012

    
3013
    result.Raise("Can't compute disk status for %s" % instance.name)
3014

    
3015
    status = result.payload
3016
    if status is None:
3017
      return None
3018

    
3019
    return (status.dev_path, status.major, status.minor,
3020
            status.sync_percent, status.estimated_time,
3021
            status.is_degraded, status.ldisk_status)
3022

    
3023
  def _ComputeDiskStatus(self, instance, snode, dev):
3024
    """Compute block device status.
3025

3026
    """
3027
    (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
3028

    
3029
    return self._ComputeDiskStatusInner(instance, snode, anno_dev)
3030

    
3031
  def _ComputeDiskStatusInner(self, instance, snode, dev):
3032
    """Compute block device status.
3033

3034
    @attention: The device has to be annotated already.
3035

3036
    """
3037
    if dev.dev_type in constants.LDS_DRBD:
3038
      # we change the snode then (otherwise we use the one passed in)
3039
      if dev.logical_id[0] == instance.primary_node:
3040
        snode = dev.logical_id[1]
3041
      else:
3042
        snode = dev.logical_id[0]
3043

    
3044
    dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
3045
                                              instance, dev)
3046
    dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
3047

    
3048
    if dev.children:
3049
      dev_children = map(compat.partial(self._ComputeDiskStatusInner,
3050
                                        instance, snode),
3051
                         dev.children)
3052
    else:
3053
      dev_children = []
3054

    
3055
    return {
3056
      "iv_name": dev.iv_name,
3057
      "dev_type": dev.dev_type,
3058
      "logical_id": dev.logical_id,
3059
      "physical_id": dev.physical_id,
3060
      "pstatus": dev_pstatus,
3061
      "sstatus": dev_sstatus,
3062
      "children": dev_children,
3063
      "mode": dev.mode,
3064
      "size": dev.size,
3065
      "name": dev.name,
3066
      "uuid": dev.uuid,
3067
      }
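
  # Illustrative example (an assumption, not output from a real cluster): for a
  # plain LVM-backed disk, the mapping returned above might look roughly like
  #
  #   {"iv_name": "disk/0",
  #    "dev_type": constants.LD_LV,
  #    "logical_id": ("xenvg", "<lv-name>"),
  #    "pstatus": (dev_path, major, minor, sync_percent, estimated_time,
  #                is_degraded, ldisk_status),
  #    "sstatus": None,
  #    "children": [],
  #    "mode": "rw",
  #    "size": 10240,
  #    ...}
  #
  # with "sstatus" only populated for DRBD disks that have a secondary node.
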
  def Exec(self, feedback_fn):
3070
    """Gather and return data"""
3071
    result = {}
3072

    
3073
    cluster = self.cfg.GetClusterInfo()
3074

    
3075
    node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
3076
    nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
3077

    
3078
    groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
3079
                                                 for node in nodes.values()))
3080

    
3081
    group2name_fn = lambda uuid: groups[uuid].name
3082
    for instance in self.wanted_instances:
3083
      pnode = nodes[instance.primary_node]
3084

    
3085
      if self.op.static or pnode.offline:
3086
        remote_state = None
3087
        if pnode.offline:
3088
          self.LogWarning("Primary node %s is marked offline, returning static"
3089
                          " information only for instance %s" %
3090
                          (pnode.name, instance.name))
3091
      else:
3092
        remote_info = self.rpc.call_instance_info(instance.primary_node,
3093
                                                  instance.name,
3094
                                                  instance.hypervisor)
3095
        remote_info.Raise("Error checking node %s" % instance.primary_node)
3096
        remote_info = remote_info.payload
3097
        if remote_info and "state" in remote_info:
3098
          remote_state = "up"
3099
        else:
3100
          if instance.admin_state == constants.ADMINST_UP:
3101
            remote_state = "down"
3102
          else:
3103
            remote_state = instance.admin_state
3104

    
3105
      disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
3106
                  instance.disks)
3107

    
3108
      snodes_group_uuids = [nodes[snode_name].group
3109
                            for snode_name in instance.secondary_nodes]
3110

    
3111
      result[instance.name] = {
3112
        "name": instance.name,
3113
        "config_state": instance.admin_state,
3114
        "run_state": remote_state,
3115
        "pnode": instance.primary_node,
3116
        "pnode_group_uuid": pnode.group,
3117
        "pnode_group_name": group2name_fn(pnode.group),
3118
        "snodes": instance.secondary_nodes,
3119
        "snodes_group_uuids": snodes_group_uuids,
3120
        "snodes_group_names": map(group2name_fn, snodes_group_uuids),
3121
        "os": instance.os,
3122
        # this happens to be the same format used for hooks
3123
        "nics": _NICListToTuple(self, instance.nics),
3124
        "disk_template": instance.disk_template,
3125
        "disks": disks,
3126
        "hypervisor": instance.hypervisor,
3127
        "network_port": instance.network_port,
3128
        "hv_instance": instance.hvparams,
3129
        "hv_actual": cluster.FillHV(instance, skip_globals=True),
3130
        "be_instance": instance.beparams,
3131
        "be_actual": cluster.FillBE(instance),
3132
        "os_instance": instance.osparams,
3133
        "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
3134
        "serial_no": instance.serial_no,
3135
        "mtime": instance.mtime,
3136
        "ctime": instance.ctime,
3137
        "uuid": instance.uuid,
3138
        }
3139

    
3140
    return result
3141

    
3142

    
3143
class LUInstanceRecreateDisks(LogicalUnit):
3144
  """Recreate an instance's missing disks.
3145

3146
  """
3147
  HPATH = "instance-recreate-disks"
3148
  HTYPE = constants.HTYPE_INSTANCE
3149
  REQ_BGL = False
3150

    
3151
  _MODIFYABLE = compat.UniqueFrozenset([
3152
    constants.IDISK_SIZE,
3153
    constants.IDISK_MODE,
3154
    ])
3155

    
3156
  # New or changed disk parameters may have different semantics
3157
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
3158
    constants.IDISK_ADOPT,
3159

    
3160
    # TODO: Implement support changing VG while recreating
3161
    constants.IDISK_VG,
3162
    constants.IDISK_METAVG,
3163
    constants.IDISK_PROVIDER,
3164
    constants.IDISK_NAME,
3165
    ]))
3166

    
3167
  def _RunAllocator(self):
3168
    """Run the allocator based on input opcode.
3169

3170
    """
3171
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
3172

    
3173
    # FIXME
3174
    # The allocator should actually run in "relocate" mode, but current
3175
    # allocators don't support relocating all the nodes of an instance at
3176
    # the same time. As a workaround we use "allocate" mode, but this is
3177
    # suboptimal for two reasons:
3178
    # - The instance name passed to the allocator is present in the list of
3179
    #   existing instances, so there could be a conflict within the
3180
    #   internal structures of the allocator. This doesn't happen with the
3181
    #   current allocators, but it's a liability.
3182
    # - The allocator counts the resources used by the instance twice: once
3183
    #   because the instance exists already, and once because it tries to
3184
    #   allocate a new instance.
3185
    # The allocator could choose some of the nodes on which the instance is
3186
    # running, but that's not a problem. If the instance nodes are broken,
3187
    # they should already be marked as drained or offline, and hence
3188
    # skipped by the allocator. If instance disks have been lost for other
3189
    # reasons, then recreating the disks on the same nodes should be fine.
3190
    disk_template = self.instance.disk_template
3191
    spindle_use = be_full[constants.BE_SPINDLE_USE]
3192
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
3193
                                        disk_template=disk_template,
3194
                                        tags=list(self.instance.GetTags()),
3195
                                        os=self.instance.os,
3196
                                        nics=[{}],
3197
                                        vcpus=be_full[constants.BE_VCPUS],
3198
                                        memory=be_full[constants.BE_MAXMEM],
3199
                                        spindle_use=spindle_use,
3200
                                        disks=[{constants.IDISK_SIZE: d.size,
3201
                                                constants.IDISK_MODE: d.mode}
3202
                                               for d in self.instance.disks],
3203
                                        hypervisor=self.instance.hypervisor,
3204
                                        node_whitelist=None)
3205
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
3206

    
3207
    ial.Run(self.op.iallocator)
3208

    
3209
    assert req.RequiredNodes() == len(self.instance.all_nodes)
3210

    
3211
    if not ial.success:
3212
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
3213
                                 " %s" % (self.op.iallocator, ial.info),
3214
                                 errors.ECODE_NORES)
3215

    
3216
    self.op.nodes = ial.result
3217
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3218
                 self.op.instance_name, self.op.iallocator,
3219
                 utils.CommaJoin(ial.result))
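
  # Illustrative note (an assumption, not part of the original module): for a
  # DRBD8-based instance the request above asks for two nodes, so after the
  # call self.op.nodes might look like
  #
  #   ["node1.example.com", "node2.example.com"]
  #
  # while a plain (LVM) instance yields a single-element list.
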
  def CheckArguments(self):
3222
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
3223
      # Normalize and convert deprecated list of disk indices
3224
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
3225

    
3226
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
3227
    if duplicates:
3228
      raise errors.OpPrereqError("Some disks have been specified more than"
3229
                                 " once: %s" % utils.CommaJoin(duplicates),
3230
                                 errors.ECODE_INVAL)
3231

    
3232
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
3233
    # when neither iallocator nor nodes are specified
3234
    if self.op.iallocator or self.op.nodes:
3235
      _CheckIAllocatorOrNode(self, "iallocator", "nodes")
3236

    
3237
    for (idx, params) in self.op.disks:
3238
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
3239
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
3240
      if unsupported:
3241
        raise errors.OpPrereqError("Parameters for disk %s try to change"
3242
                                   " unmodifyable parameter(s): %s" %
3243
                                   (idx, utils.CommaJoin(unsupported)),
3244
                                   errors.ECODE_INVAL)
3245

    
3246
  def ExpandNames(self):
3247
    self._ExpandAndLockInstance()
3248
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3249

    
3250
    if self.op.nodes:
3251
      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
3252
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
3253
    else:
3254
      self.needed_locks[locking.LEVEL_NODE] = []
3255
      if self.op.iallocator:
3256
        # iallocator will select a new node in the same group
3257
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
3258
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
3259

    
3260
    self.needed_locks[locking.LEVEL_NODE_RES] = []
3261

    
3262
  def DeclareLocks(self, level):
3263
    if level == locking.LEVEL_NODEGROUP:
3264
      assert self.op.iallocator is not None
3265
      assert not self.op.nodes
3266
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3267
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
3268
      # Lock the primary group used by the instance optimistically; this
3269
      # requires going via the node before it's locked, requiring
3270
      # verification later on
3271
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3272
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
3273

    
3274
    elif level == locking.LEVEL_NODE:
3275
      # If an allocator is used, then we lock all the nodes in the current
3276
      # instance group, as we don't know yet which ones will be selected;
3277
      # if we replace the nodes without using an allocator, locks are
3278
      # already declared in ExpandNames; otherwise, we need to lock all the
3279
      # instance nodes for disk re-creation
3280
      if self.op.iallocator:
3281
        assert not self.op.nodes
3282
        assert not self.needed_locks[locking.LEVEL_NODE]
3283
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
3284

    
3285
        # Lock member nodes of the group of the primary node
3286
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
3287
          self.needed_locks[locking.LEVEL_NODE].extend(
3288
            self.cfg.GetNodeGroup(group_uuid).members)
3289

    
3290
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
3291
      elif not self.op.nodes:
3292
        self._LockInstancesNodes(primary_only=False)
3293
    elif level == locking.LEVEL_NODE_RES:
3294
      # Copy node locks
3295
      self.needed_locks[locking.LEVEL_NODE_RES] = \
3296
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
3297

    
3298
  def BuildHooksEnv(self):
3299
    """Build hooks env.
3300

3301
    This runs on master, primary and secondary nodes of the instance.
3302

3303
    """
3304
    return _BuildInstanceHookEnvByObject(self, self.instance)
3305

    
3306
  def BuildHooksNodes(self):
3307
    """Build hooks nodes.
3308

3309
    """
3310
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3311
    return (nl, nl)
3312

    
3313
  def CheckPrereq(self):
3314
    """Check prerequisites.
3315

3316
    This checks that the instance is in the cluster and is not running.
3317

3318
    """
3319
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3320
    assert instance is not None, \
3321
      "Cannot retrieve locked instance %s" % self.op.instance_name
3322
    if self.op.nodes:
3323
      if len(self.op.nodes) != len(instance.all_nodes):
3324
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
3325
                                   " %d replacement nodes were specified" %
3326
                                   (instance.name, len(instance.all_nodes),
3327
                                    len(self.op.nodes)),
3328
                                   errors.ECODE_INVAL)
3329
      assert instance.disk_template != constants.DT_DRBD8 or \
3330
             len(self.op.nodes) == 2
3331
      assert instance.disk_template != constants.DT_PLAIN or \
3332
             len(self.op.nodes) == 1
3333
      primary_node = self.op.nodes[0]
3334
    else:
3335
      primary_node = instance.primary_node
3336
    if not self.op.iallocator:
3337
      _CheckNodeOnline(self, primary_node)
3338

    
3339
    if instance.disk_template == constants.DT_DISKLESS:
3340
      raise errors.OpPrereqError("Instance '%s' has no disks" %
3341
                                 self.op.instance_name, errors.ECODE_INVAL)
3342

    
3343
    # Verify if node group locks are still correct
3344
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
3345
    if owned_groups:
3346
      # Node group locks are acquired only for the primary node (and only
3347
      # when the allocator is used)
3348
      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
3349
                               primary_only=True)
3350

    
3351
    # if we replace nodes *and* the old primary is offline, we don't
3352
    # check the instance state
3353
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
3354
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
3355
      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
3356
                          msg="cannot recreate disks")
3357

    
3358
    if self.op.disks:
3359
      self.disks = dict(self.op.disks)
3360
    else:
3361
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
3362

    
3363
    maxidx = max(self.disks.keys())
3364
    if maxidx >= len(instance.disks):
3365
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
3366
                                 errors.ECODE_INVAL)
3367

    
3368
    if ((self.op.nodes or self.op.iallocator) and
3369
         sorted(self.disks.keys()) != range(len(instance.disks))):
3370
      raise errors.OpPrereqError("Can't recreate disks partially and"
3371
                                 " change the nodes at the same time",
3372
                                 errors.ECODE_INVAL)
3373

    
3374
    self.instance = instance
3375

    
3376
    if self.op.iallocator:
3377
      self._RunAllocator()
3378
      # Release unneeded node and node resource locks
3379
      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
3380
      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
3381
      _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
3382

    
3383
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
3384

    
3385
  def Exec(self, feedback_fn):
3386
    """Recreate the disks.
3387

3388
    """
3389
    instance = self.instance
3390

    
3391
    assert (self.owned_locks(locking.LEVEL_NODE) ==
3392
            self.owned_locks(locking.LEVEL_NODE_RES))
3393

    
3394
    to_skip = []
3395
    mods = [] # keeps track of needed changes
3396

    
3397
    for idx, disk in enumerate(instance.disks):
3398
      try:
3399
        changes = self.disks[idx]
3400
      except KeyError:
3401
        # Disk should not be recreated
3402
        to_skip.append(idx)
3403
        continue
3404

    
3405
      # update secondaries for disks, if needed
3406
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
3407
        # need to update the nodes and minors
3408
        assert len(self.op.nodes) == 2
3409
        assert len(disk.logical_id) == 6 # otherwise disk internals
3410
                                         # have changed
3411
        (_, _, old_port, _, _, old_secret) = disk.logical_id
3412
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
3413
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
3414
                  new_minors[0], new_minors[1], old_secret)
3415
        assert len(disk.logical_id) == len(new_id)
3416
      else:
3417
        new_id = None
3418

    
3419
      mods.append((idx, new_id, changes))
3420

    
3421
    # now that we have passed all asserts above, we can apply the mods
3422
    # in a single run (to avoid partial changes)
3423
    for idx, new_id, changes in mods:
3424
      disk = instance.disks[idx]
3425
      if new_id is not None:
3426
        assert disk.dev_type == constants.LD_DRBD8
3427
        disk.logical_id = new_id
3428
      if changes:
3429
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
3430
                    mode=changes.get(constants.IDISK_MODE, None))
3431

    
3432
    # change primary node, if needed
3433
    if self.op.nodes:
3434
      instance.primary_node = self.op.nodes[0]
3435
      self.LogWarning("Changing the instance's nodes, you will have to"
3436
                      " remove any disks left on the older nodes manually")
3437

    
3438
    if self.op.nodes:
3439
      self.cfg.Update(instance, feedback_fn)
3440

    
3441
    # All touched nodes must be locked
3442
    mylocks = self.owned_locks(locking.LEVEL_NODE)
3443
    assert mylocks.issuperset(frozenset(instance.all_nodes))
3444
    _CreateDisks(self, instance, to_skip=to_skip)
3445

    
3446

    
3447
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  _ShutdownInstanceDisks(lu, instance, disks=disks)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
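
# Illustrative examples (assumptions added for documentation, not part of the
# original module) of the rounding behaviour above:
#
#   _DiskSizeInBytesToMebibytes(lu, 512 * 1024 * 1024)         # -> 512
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024 + 512)  # -> 1025, warns
#
# In the second case the size is 512 bytes past an even MiB boundary, so the
# helper warns and rounds up rather than risk overwriting that existing data.
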
class LUInstanceGrowDisk(LogicalUnit):
3476
  """Grow a disk of an instance.
3477

3478
  """
3479
  HPATH = "disk-grow"
3480
  HTYPE = constants.HTYPE_INSTANCE
3481
  REQ_BGL = False
3482

    
3483
  def ExpandNames(self):
3484
    self._ExpandAndLockInstance()
3485
    self.needed_locks[locking.LEVEL_NODE] = []
3486
    self.needed_locks[locking.LEVEL_NODE_RES] = []
3487
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3488
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
3489

    
3490
  def DeclareLocks(self, level):
3491
    if level == locking.LEVEL_NODE:
3492
      self._LockInstancesNodes()
3493
    elif level == locking.LEVEL_NODE_RES:
3494
      # Copy node locks
3495
      self.needed_locks[locking.LEVEL_NODE_RES] = \
3496
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
3497

    
3498
  def BuildHooksEnv(self):
3499
    """Build hooks env.
3500

3501
    This runs on the master, the primary and all the secondaries.
3502

3503
    """
3504
    env = {
3505
      "DISK": self.op.disk,
3506
      "AMOUNT": self.op.amount,
3507
      "ABSOLUTE": self.op.absolute,
3508
      }
3509
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3510
    return env
3511

    
3512
  def BuildHooksNodes(self):
3513
    """Build hooks nodes.
3514

3515
    """
3516
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3517
    return (nl, nl)
3518

    
3519
  def CheckPrereq(self):
3520
    """Check prerequisites.
3521

3522
    This checks that the instance is in the cluster.
3523

3524
    """
3525
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3526
    assert instance is not None, \
3527
      "Cannot retrieve locked instance %s" % self.op.instance_name
3528
    nodenames = list(instance.all_nodes)
3529
    for node in nodenames:
3530
      _CheckNodeOnline(self, node)
3531

    
3532
    self.instance = instance
3533

    
3534
    if instance.disk_template not in constants.DTS_GROWABLE:
3535
      raise errors.OpPrereqError("Instance's disk layout does not support"
3536
                                 " growing", errors.ECODE_INVAL)
3537

    
3538
    self.disk = instance.FindDisk(self.op.disk)
3539

    
3540
    if self.op.absolute:
3541
      self.target = self.op.amount
3542
      self.delta = self.target - self.disk.size
3543
      if self.delta < 0:
3544
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
3545
                                   "current disk size (%s)" %
3546
                                   (utils.FormatUnit(self.target, "h"),
3547
                                    utils.FormatUnit(self.disk.size, "h")),
3548
                                   errors.ECODE_STATE)
3549
    else:
3550
      self.delta = self.op.amount
3551
      self.target = self.disk.size + self.delta
3552
      if self.delta < 0:
3553
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
3554
                                   utils.FormatUnit(self.delta, "h"),
3555
                                   errors.ECODE_INVAL)
3556

    
3557
    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
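
  # Illustrative example (an assumption, not part of the original module): for
  # a disk currently sized 10240 MiB,
  #
  #   op.absolute=True,  op.amount=20480  ->  target=20480, delta=10240
  #   op.absolute=False, op.amount=5120   ->  delta=5120,   target=15360
  #
  # and a negative delta (i.e. shrinking) is rejected in both branches above.
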
  def _CheckDiskSpace(self, nodenames, req_vgspace):
3560
    template = self.instance.disk_template
3561
    if template not in constants.DTS_NO_FREE_SPACE_CHECK:
3562
      # TODO: check the free disk space for file, when that feature will be
3563
      # supported
3564
      nodes = map(self.cfg.GetNodeInfo, nodenames)
3565
      es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
3566
                        nodes)
3567
      if es_nodes:
3568
        # With exclusive storage we need something smarter than just looking
3569
        # at free space; for now, let's simply abort the operation.
3570
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
3571
                                   " is enabled", errors.ECODE_STATE)
3572
      _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
3573

    
3574
  def Exec(self, feedback_fn):
3575
    """Execute disk grow.
3576

3577
    """
3578
    instance = self.instance
3579
    disk = self.disk
3580

    
3581
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
3582
    assert (self.owned_locks(locking.LEVEL_NODE) ==
3583
            self.owned_locks(locking.LEVEL_NODE_RES))
3584

    
3585
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
3586

    
3587
    disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
3588
    if not disks_ok:
3589
      raise errors.OpExecError("Cannot activate block device to grow")
3590

    
3591
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
3592
                (self.op.disk, instance.name,
3593
                 utils.FormatUnit(self.delta, "h"),
3594
                 utils.FormatUnit(self.target, "h")))
3595

    
3596
    # First run all grow ops in dry-run mode
3597
    for node in instance.all_nodes:
3598
      self.cfg.SetDiskID(disk, node)
3599
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
3600
                                           True, True)
3601
      result.Raise("Dry-run grow request failed to node %s" % node)
3602

    
3603
    if wipe_disks:
3604
      # Get disk size from primary node for wiping
3605
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
3606
      result.Raise("Failed to retrieve disk size from node '%s'" %
3607
                   instance.primary_node)
3608

    
3609
      (disk_size_in_bytes, ) = result.payload
3610

    
3611
      if disk_size_in_bytes is None:
3612
        raise errors.OpExecError("Failed to retrieve disk size from primary"
3613
                                 " node '%s'" % instance.primary_node)
3614

    
3615
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
3616

    
3617
      assert old_disk_size >= disk.size, \
3618
        ("Retrieved disk size too small (got %s, should be at least %s)" %
3619
         (old_disk_size, disk.size))
3620
    else:
3621
      old_disk_size = None
3622

    
3623
    # We know that (as far as we can test) operations across different
3624
    # nodes will succeed, time to run it for real on the backing storage
3625
    for node in instance.all_nodes:
3626
      self.cfg.SetDiskID(disk, node)
3627
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
3628
                                           False, True)
3629
      result.Raise("Grow request failed to node %s" % node)
3630

    
3631
    # And now execute it for logical storage, on the primary node
3632
    node = instance.primary_node
3633
    self.cfg.SetDiskID(disk, node)
3634
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
3635
                                         False, False)
3636
    result.Raise("Grow request failed to node %s" % node)
3637

    
3638
    disk.RecordGrow(self.delta)
3639
    self.cfg.Update(instance, feedback_fn)
3640

    
3641
    # Changes have been recorded, release node lock
3642
    _ReleaseLocks(self, locking.LEVEL_NODE)
3643

    
3644
    # Downgrade lock while waiting for sync
3645
    self.glm.downgrade(locking.LEVEL_INSTANCE)
3646

    
3647
    assert wipe_disks ^ (old_disk_size is None)
3648

    
3649
    if wipe_disks:
3650
      assert instance.disks[self.op.disk] == disk
3651

    
3652
      # Wipe newly added disk space
3653
      _WipeDisks(self, instance,
3654
                 disks=[(self.op.disk, disk, old_disk_size)])
3655

    
3656
    if self.op.wait_for_sync:
3657
      disk_abort = not _WaitForSync(self, instance, disks=[disk])
3658
      if disk_abort:
3659
        self.LogWarning("Disk syncing has not returned a good status; check"
3660
                        " the instance")
3661
      if instance.admin_state != constants.ADMINST_UP:
3662
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
3663
    elif instance.admin_state != constants.ADMINST_UP:
3664
      self.LogWarning("Not shutting down the disk even if the instance is"
3665
                      " not supposed to be running because no wait for"
3666
                      " sync mode was requested")
3667

    
3668
    assert self.owned_locks(locking.LEVEL_NODE_RES)
3669
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
3670

    
3671

    
3672
class LUInstanceReplaceDisks(LogicalUnit):
3673
  """Replace the disks of an instance.
3674

3675
  """
3676
  HPATH = "mirrors-replace"
3677
  HTYPE = constants.HTYPE_INSTANCE
3678
  REQ_BGL = False
3679

    
3680
  def CheckArguments(self):
3681
    """Check arguments.
3682

3683
    """
3684
    remote_node = self.op.remote_node
3685
    ialloc = self.op.iallocator
3686
    if self.op.mode == constants.REPLACE_DISK_CHG:
3687
      if remote_node is None and ialloc is None:
3688
        raise errors.OpPrereqError("When changing the secondary either an"
3689
                                   " iallocator script must be used or the"
3690
                                   " new node given", errors.ECODE_INVAL)
3691
      else:
3692
        _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
3693

    
3694
    elif remote_node is not None or ialloc is not None:
3695
      # Not replacing the secondary
3696
      raise errors.OpPrereqError("The iallocator and new node options can"
3697
                                 " only be used when changing the"
3698
                                 " secondary node", errors.ECODE_INVAL)
3699

    
3700
  def ExpandNames(self):
3701
    self._ExpandAndLockInstance()
3702

    
3703
    assert locking.LEVEL_NODE not in self.needed_locks
3704
    assert locking.LEVEL_NODE_RES not in self.needed_locks
3705
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
3706

    
3707
    assert self.op.iallocator is None or self.op.remote_node is None, \
3708
      "Conflicting options"
3709

    
3710
    if self.op.remote_node is not None:
3711
      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
3712

    
3713
      # Warning: do not remove the locking of the new secondary here
3714
      # unless DRBD8.AddChildren is changed to work in parallel;
3715
      # currently it doesn't since parallel invocations of
3716
      # FindUnusedMinor will conflict
3717
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
3718
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3719
    else:
3720
      self.needed_locks[locking.LEVEL_NODE] = []
3721
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3722

    
3723
      if self.op.iallocator is not None:
3724
        # iallocator will select a new node in the same group
3725
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
3726
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
3727

    
3728
    self.needed_locks[locking.LEVEL_NODE_RES] = []
3729

    
3730
    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
3731
                                   self.op.iallocator, self.op.remote_node,
3732
                                   self.op.disks, self.op.early_release,
3733
                                   self.op.ignore_ipolicy)
3734

    
3735
    self.tasklets = [self.replacer]
3736

    
3737
  def DeclareLocks(self, level):
3738
    if level == locking.LEVEL_NODEGROUP:
3739
      assert self.op.remote_node is None
3740
      assert self.op.iallocator is not None
3741
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3742

    
3743
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
3744
      # Lock all groups used by instance optimistically; this requires going
3745
      # via the node before it's locked, requiring verification later on
3746
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3747
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
3748

    
3749
    elif level == locking.LEVEL_NODE:
3750
      if self.op.iallocator is not None:
3751
        assert self.op.remote_node is None
3752
        assert not self.needed_locks[locking.LEVEL_NODE]
3753
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
3754

    
3755
        # Lock member nodes of all locked groups
3756
        self.needed_locks[locking.LEVEL_NODE] = \
3757
          [node_name
3758
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3759
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
3760
      else:
3761
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
3762

    
3763
        self._LockInstancesNodes()
3764

    
3765
    elif level == locking.LEVEL_NODE_RES:
3766
      # Reuse node locks
3767
      self.needed_locks[locking.LEVEL_NODE_RES] = \
3768
        self.needed_locks[locking.LEVEL_NODE]
3769

    
3770
  def BuildHooksEnv(self):
3771
    """Build hooks env.
3772

3773
    This runs on the master, the primary and all the secondaries.
3774

3775
    """
3776
    instance = self.replacer.instance
3777
    env = {
3778
      "MODE": self.op.mode,
3779
      "NEW_SECONDARY": self.op.remote_node,
3780
      "OLD_SECONDARY": instance.secondary_nodes[0],
3781
      }
3782
    env.update(_BuildInstanceHookEnvByObject(self, instance))
3783
    return env
3784

    
3785
  def BuildHooksNodes(self):
3786
    """Build hooks nodes.
3787

3788
    """
3789
    instance = self.replacer.instance
3790
    nl = [
3791
      self.cfg.GetMasterNode(),
3792
      instance.primary_node,
3793
      ]
3794
    if self.op.remote_node is not None:
3795
      nl.append(self.op.remote_node)
3796
    return nl, nl
3797

    
3798
  def CheckPrereq(self):
3799
    """Check prerequisites.
3800

3801
    """
3802
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
3803
            self.op.iallocator is None)
3804

    
3805
    # Verify if node group locks are still correct
3806
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
3807
    if owned_groups:
3808
      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
3809

    
3810
    return LogicalUnit.CheckPrereq(self)
3811

    
3812

    
3813
class LUInstanceActivateDisks(NoHooksLU):
3814
  """Bring up an instance's disks.
3815

3816
  """
3817
  REQ_BGL = False
3818

    
3819
  def ExpandNames(self):
3820
    self._ExpandAndLockInstance()
3821
    self.needed_locks[locking.LEVEL_NODE] = []
3822
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3823

    
3824
  def DeclareLocks(self, level):
3825
    if level == locking.LEVEL_NODE:
3826
      self._LockInstancesNodes()
3827

    
3828
  def CheckPrereq(self):
3829
    """Check prerequisites.
3830

3831
    This checks that the instance is in the cluster.
3832

3833
    """
3834
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3835
    assert self.instance is not None, \
3836
      "Cannot retrieve locked instance %s" % self.op.instance_name
3837
    _CheckNodeOnline(self, self.instance.primary_node)
3838

    
3839
  def Exec(self, feedback_fn):
3840
    """Activate the disks.
3841

3842
    """
3843
    disks_ok, disks_info = \
3844
              _AssembleInstanceDisks(self, self.instance,
3845
                                     ignore_size=self.op.ignore_size)
3846
    if not disks_ok:
3847
      raise errors.OpExecError("Cannot activate block devices")
3848

    
3849
    if self.op.wait_for_sync:
3850
      if not _WaitForSync(self, self.instance):
3851
        raise errors.OpExecError("Some disks of the instance are degraded!")
3852

    
3853
    return disks_info
3854

    
3855

    
3856
class LUInstanceDeactivateDisks(NoHooksLU):
3857
  """Shutdown an instance's disks.
3858

3859
  """
3860
  REQ_BGL = False
3861

    
3862
  def ExpandNames(self):
3863
    self._ExpandAndLockInstance()
3864
    self.needed_locks[locking.LEVEL_NODE] = []
3865
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3866

    
3867
  def DeclareLocks(self, level):
3868
    if level == locking.LEVEL_NODE:
3869
      self._LockInstancesNodes()
3870

    
3871
  def CheckPrereq(self):
3872
    """Check prerequisites.
3873

3874
    This checks that the instance is in the cluster.
3875

3876
    """
3877
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3878
    assert self.instance is not None, \
3879
      "Cannot retrieve locked instance %s" % self.op.instance_name
3880

    
3881
  def Exec(self, feedback_fn):
3882
    """Deactivate the disks
3883

3884
    """
3885
    instance = self.instance
3886
    if self.op.force:
      _ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


class LUInstanceStartup(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    # extra beparams
    if self.op.beparams:
      # fill the beparams dict
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES:
      self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }

    env.update(_BuildInstanceHookEnvByObject(self, self.instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra hvparams
    if self.op.hvparams:
      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = cluster.FillHV(instance)
      filled_hvp.update(self.op.hvparams)
      hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)

    _CheckInstanceState(self, instance, INSTANCE_ONLINE)

    self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline

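    # If the primary node is offline and ignore_offline_nodes was given,
    # every check below that would need to contact the node (bridges, free
    # memory, current run state) is skipped, and any hvparams/beparams
    # overrides are discarded with a warning.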
    if self.primary_offline and self.op.ignore_offline_nodes:
      self.LogWarning("Ignoring offline primary node")

      if self.op.hvparams or self.op.beparams:
        self.LogWarning("Overridden parameters are ignored")
    else:
      _CheckNodeOnline(self, instance.primary_node)

      bep = self.cfg.GetClusterInfo().FillBE(instance)
      bep.update(self.op.beparams)

      # check bridges existence
      _CheckInstanceBridgesExist(self, instance)

      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking node %s" % instance.primary_node,
                        prereq=True, ecode=errors.ECODE_ENVIRON)
      if not remote_info.payload: # not running already
        _CheckNodeFreeMemory(self, instance.primary_node,
                             "starting instance %s" % instance.name,
                             bep[constants.BE_MINMEM], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    reason = self.op.reason

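    # Unless no_remember was requested, the instance is recorded as "up" in
    # the cluster configuration before the actual startup is attempted.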
    if not self.op.no_remember:
      self.cfg.MarkInstanceUp(instance.name)

    if self.primary_offline:
      assert self.op.ignore_offline_nodes
      self.LogInfo("Primary node offline, marked instance as started")
    else:
      node_current = instance.primary_node

      _StartInstanceDisks(self, instance, force)

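      # The hvparams/beparams from the opcode are sent along with the
      # instance object, so any overrides apply to this start only and are
      # not written back to the configuration here.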
      result = \
        self.rpc.call_instance_start(node_current,
                                     (instance, self.op.hvparams,
                                      self.op.beparams),
                                     self.op.startup_paused, reason)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance: %s" % msg)


class LUInstanceShutdown(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["TIMEOUT"] = self.op.timeout
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

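    # With force=True the usual "instance must be marked online" check is
    # skipped, so even an instance whose recorded state is offline can be
    # shut down; only a warning is logged.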
    if not self.op.force:
      _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
    else:
      self.LogWarning("Ignoring offline instance check")

    self.primary_offline = \
      self.cfg.GetNodeInfo(self.instance.primary_node).offline

    if self.primary_offline and self.op.ignore_offline_nodes:
      self.LogWarning("Ignoring offline primary node")
    else:
      _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    timeout = self.op.timeout
    reason = self.op.reason

    # If the instance is offline we shouldn't mark it as down, as that
    # resets the offline flag.
    if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
      self.cfg.MarkInstanceDown(instance.name)

    if self.primary_offline:
      assert self.op.ignore_offline_nodes
      self.LogInfo("Primary node offline, marked instance as stopped")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
                                               reason)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not shutdown instance: %s", msg)

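      # The block devices are deactivated even if the hypervisor-level
      # shutdown above reported an error; that failure only results in a
      # warning.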
      _ShutdownInstanceDisks(self, instance)


class LUInstanceReinstall(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return _BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
                     " offline, cannot reinstall")

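    # Reinstalling presumably only makes sense when there is a disk to
    # deploy the operating system onto, so diskless instances are rejected.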
    if instance.disk_template == constants.DT_DISKLESS:
      raise