root / lib / cmdlib / instance.py @ 763ad5be

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with instances."""
23

    
24
import OpenSSL
25
import copy
26
import itertools
27
import logging
28
import operator
29
import os
30
import time
31

    
32
from ganeti import compat
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import ht
36
from ganeti import hypervisor
37
from ganeti import locking
38
from ganeti.masterd import iallocator
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import qlang
45
from ganeti import rpc
46
from ganeti import utils
47
from ganeti import query
48

    
49
from ganeti.cmdlib.base import NoHooksLU, LogicalUnit, _QueryBase, \
50
  ResultWithJobs, Tasklet
51

    
52
from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
53
  INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, _CheckNodeOnline, \
54
  _ShareAll, _GetDefaultIAllocator, _CheckInstanceNodeGroups, \
55
  _LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
56
  _IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
57
  _GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
58
  _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
59
  _CheckInstanceState, _ExpandNodeName
60
from ganeti.cmdlib.instance_storage import _CreateDisks, \
61
  _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
62
  _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
63
  _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
64
  _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
65
  _AssembleInstanceDisks, _ExpandCheckDisks
66
from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
67
  _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
68
  _NICToTuple, _CheckNodeNotDrained, _RemoveInstance, _CopyLockList, \
69
  _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \
70
  _GetInstanceInfoText, _RemoveDisks
71

    
72
import ganeti.masterd.instance
73

    
74

    
75
#: Type description for changes as returned by L{ApplyContainerMods}'s
76
#: callbacks
77
_TApplyContModsCbChanges = \
78
  ht.TMaybeListOf(ht.TAnd(ht.TIsLength(2), ht.TItems([
79
    ht.TNonEmptyString,
80
    ht.TAny,
81
    ])))
82

    
83

    
84
def _CheckHostnameSane(lu, name):
85
  """Ensures that a given hostname resolves to a 'sane' name.
86

87
  The given name is required to be a prefix of the resolved hostname,
88
  to prevent accidental mismatches.
89

90
  @param lu: the logical unit on behalf of which we're checking
91
  @param name: the name we should resolve and check
92
  @return: the resolved hostname object
93

94
  """
95
  hostname = netutils.GetHostname(name=name)
96
  if hostname.name != name:
97
    lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
98
  if not utils.MatchNameComponent(name, [hostname.name]):
99
    raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
100
                                " same as given hostname '%s'") %
101
                               (hostname.name, name), errors.ECODE_INVAL)
102
  return hostname
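

# A standalone, illustrative sketch (not used by this module) of the prefix
# rule enforced above: the user-supplied name must be a leading component of
# the resolved FQDN. The helper below is a hypothetical stand-in for
# utils.MatchNameComponent, not its actual implementation.
def _ExampleHostnamePrefixMatches(given_name, resolved_name):
  """Return True if C{given_name} is a leading component of C{resolved_name}.

  """
  if given_name == resolved_name:
    return True
  return resolved_name.startswith(given_name + ".")
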
103

    
104

    
105
def _CheckOpportunisticLocking(op):
106
  """Generate error if opportunistic locking is not possible.
107

108
  """
109
  if op.opportunistic_locking and not op.iallocator:
110
    raise errors.OpPrereqError("Opportunistic locking is only available in"
111
                               " combination with an instance allocator",
112
                               errors.ECODE_INVAL)
113

    
114

    
115
def _CreateInstanceAllocRequest(op, disks, nics, beparams, node_whitelist):
116
  """Wrapper around IAReqInstanceAlloc.
117

118
  @param op: The instance opcode
119
  @param disks: The computed disks
120
  @param nics: The computed nics
121
  @param beparams: The fully filled beparams
122
  @param node_whitelist: List of nodes which should appear as online to the
123
    allocator (unless the node is already marked offline)
124

125
  @returns: A filled L{iallocator.IAReqInstanceAlloc}
126

127
  """
128
  spindle_use = beparams[constants.BE_SPINDLE_USE]
129
  return iallocator.IAReqInstanceAlloc(name=op.instance_name,
130
                                       disk_template=op.disk_template,
131
                                       tags=op.tags,
132
                                       os=op.os_type,
133
                                       vcpus=beparams[constants.BE_VCPUS],
134
                                       memory=beparams[constants.BE_MAXMEM],
135
                                       spindle_use=spindle_use,
136
                                       disks=disks,
137
                                       nics=[n.ToDict() for n in nics],
138
                                       hypervisor=op.hypervisor,
139
                                       node_whitelist=node_whitelist)
140

    
141

    
142
def _ComputeFullBeParams(op, cluster):
143
  """Computes the full beparams.
144

145
  @param op: The instance opcode
146
  @param cluster: The cluster config object
147

148
  @return: The fully filled beparams
149

150
  """
151
  default_beparams = cluster.beparams[constants.PP_DEFAULT]
152
  for param, value in op.beparams.iteritems():
153
    if value == constants.VALUE_AUTO:
154
      op.beparams[param] = default_beparams[param]
155
  objects.UpgradeBeParams(op.beparams)
156
  utils.ForceDictType(op.beparams, constants.BES_PARAMETER_TYPES)
157
  return cluster.SimpleFillBE(op.beparams)
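

# Illustrative, hypothetical helper (not used by this module) showing the
# "auto" substitution performed above with plain dicts in place of the opcode
# and cluster objects; the layering done by cluster.SimpleFillBE is
# approximated here by a simple copy-and-update.
def _ExampleFillBeParams(requested, cluster_defaults):
  """Resolve 'auto' values and layer the request over the defaults.

  """
  filled = cluster_defaults.copy()
  for (key, value) in requested.items():
    if value == constants.VALUE_AUTO:
      # "auto" means: fall back to the cluster-wide default for this key
      filled[key] = cluster_defaults[key]
    else:
      filled[key] = value
  return filled
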
158

    
159

    
160
def _ComputeNics(op, cluster, default_ip, cfg, ec_id):
161
  """Computes the nics.
162

163
  @param op: The instance opcode
164
  @param cluster: Cluster configuration object
165
  @param default_ip: The default ip to assign
166
  @param cfg: An instance of the configuration object
167
  @param ec_id: Execution context ID
168

169
  @returns: The built-up NICs
170

171
  """
172
  nics = []
173
  for nic in op.nics:
174
    nic_mode_req = nic.get(constants.INIC_MODE, None)
175
    nic_mode = nic_mode_req
176
    if nic_mode is None or nic_mode == constants.VALUE_AUTO:
177
      nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
178

    
179
    net = nic.get(constants.INIC_NETWORK, None)
180
    link = nic.get(constants.NIC_LINK, None)
181
    ip = nic.get(constants.INIC_IP, None)
182

    
183
    if net is None or net.lower() == constants.VALUE_NONE:
184
      net = None
185
    else:
186
      if nic_mode_req is not None or link is not None:
187
        raise errors.OpPrereqError("If network is given, no mode or link"
188
                                   " is allowed to be passed",
189
                                   errors.ECODE_INVAL)
190

    
191
    # ip validity checks
192
    if ip is None or ip.lower() == constants.VALUE_NONE:
193
      nic_ip = None
194
    elif ip.lower() == constants.VALUE_AUTO:
195
      if not op.name_check:
196
        raise errors.OpPrereqError("IP address set to auto but name checks"
197
                                   " have been skipped",
198
                                   errors.ECODE_INVAL)
199
      nic_ip = default_ip
200
    else:
201
      # We defer pool operations until later, so that the iallocator has
202
      # filled in the instance's node(s)
203
      if ip.lower() == constants.NIC_IP_POOL:
204
        if net is None:
205
          raise errors.OpPrereqError("if ip=pool, parameter network"
206
                                     " must be passed too",
207
                                     errors.ECODE_INVAL)
208

    
209
      elif not netutils.IPAddress.IsValid(ip):
210
        raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
211
                                   errors.ECODE_INVAL)
212

    
213
      nic_ip = ip
214

    
215
    # TODO: check the ip address for uniqueness
216
    if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
217
      raise errors.OpPrereqError("Routed nic mode requires an ip address",
218
                                 errors.ECODE_INVAL)
219

    
220
    # MAC address verification
221
    mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
222
    if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
223
      mac = utils.NormalizeAndValidateMac(mac)
224

    
225
      try:
226
        # TODO: We need to factor this out
227
        cfg.ReserveMAC(mac, ec_id)
228
      except errors.ReservationError:
229
        raise errors.OpPrereqError("MAC address %s already in use"
230
                                   " in cluster" % mac,
231
                                   errors.ECODE_NOTUNIQUE)
232

    
233
    #  Build nic parameters
234
    nicparams = {}
235
    if nic_mode_req:
236
      nicparams[constants.NIC_MODE] = nic_mode
237
    if link:
238
      nicparams[constants.NIC_LINK] = link
239

    
240
    check_params = cluster.SimpleFillNIC(nicparams)
241
    objects.NIC.CheckParameterSyntax(check_params)
242
    net_uuid = cfg.LookupNetwork(net)
243
    name = nic.get(constants.INIC_NAME, None)
244
    if name is not None and name.lower() == constants.VALUE_NONE:
245
      name = None
246
    nic_obj = objects.NIC(mac=mac, ip=nic_ip, name=name,
247
                          network=net_uuid, nicparams=nicparams)
248
    nic_obj.uuid = cfg.GenerateUniqueID(ec_id)
249
    nics.append(nic_obj)
250

    
251
  return nics
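

# Standalone sketch (hypothetical helper, not called by this module) of the
# per-NIC IP handling above, reduced to the accepted spellings; pool
# reservation, the name-check requirement for "auto" and MAC handling are
# left out because they need the opcode and configuration objects.
def _ExampleResolveNicIp(ip_spec, default_ip, network):
  """Map an IP specification to the value stored on the NIC object.

  @param ip_spec: one of None, "none", "auto", "pool" or a literal address
  @param default_ip: IP derived from the name check (used for "auto")
  @param network: network name, required when ip_spec is "pool"

  """
  if ip_spec is None or ip_spec.lower() == constants.VALUE_NONE:
    return None
  if ip_spec.lower() == constants.VALUE_AUTO:
    return default_ip
  if ip_spec.lower() == constants.NIC_IP_POOL:
    if network is None:
      raise errors.OpPrereqError("if ip=pool, parameter network"
                                 " must be passed too", errors.ECODE_INVAL)
    return ip_spec  # resolved later, once node and network are known
  if not netutils.IPAddress.IsValid(ip_spec):
    raise errors.OpPrereqError("Invalid IP address '%s'" % ip_spec,
                               errors.ECODE_INVAL)
  return ip_spec
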
252

    
253

    
254
def _CheckForConflictingIp(lu, ip, node):
255
  """In case of conflicting IP address raise error.
256

257
  @type ip: string
258
  @param ip: IP address
259
  @type node: string
260
  @param node: node name
261

262
  """
263
  (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node)
264
  if conf_net is not None:
265
    raise errors.OpPrereqError(("The requested IP address (%s) belongs to"
266
                                " network %s, but the target NIC does not." %
267
                                (ip, conf_net)),
268
                               errors.ECODE_STATE)
269

    
270
  return (None, None)
271

    
272

    
273
def _ComputeIPolicyInstanceSpecViolation(
274
  ipolicy, instance_spec, disk_template,
275
  _compute_fn=_ComputeIPolicySpecViolation):
276
  """Compute if instance specs meets the specs of ipolicy.
277

278
  @type ipolicy: dict
279
  @param ipolicy: The ipolicy to verify against
280
  @type instance_spec: dict
281
  @param instance_spec: The instance spec to verify
282
  @type disk_template: string
283
  @param disk_template: the disk template of the instance
284
  @param _compute_fn: The function to verify ipolicy (unittest only)
285
  @see: L{_ComputeIPolicySpecViolation}
286

287
  """
288
  mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
289
  cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
290
  disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
291
  disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
292
  nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
293
  spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
294

    
295
  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
296
                     disk_sizes, spindle_use, disk_template)
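

# Hypothetical, unused helper sketching how the function above can be
# exercised with a stub via its unittest-only _compute_fn keyword, and what
# shape the instance_spec dict is expected to have; the numbers are made up.
def _ExampleIPolicySpecCheckUsage():
  """Illustrative usage of L{_ComputeIPolicyInstanceSpecViolation}.

  """
  def _StubComputeFn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
                     disk_sizes, spindle_use, disk_template):
    # stand-in for _ComputeIPolicySpecViolation that flags everything
    return ["stub violation"]

  spec = {
    constants.ISPEC_MEM_SIZE: 512,
    constants.ISPEC_CPU_COUNT: 2,
    constants.ISPEC_DISK_COUNT: 1,
    constants.ISPEC_DISK_SIZE: [10240],
    constants.ISPEC_NIC_COUNT: 1,
    constants.ISPEC_SPINDLE_USE: 1,
    }
  return _ComputeIPolicyInstanceSpecViolation({}, spec, constants.DT_PLAIN,
                                              _compute_fn=_StubComputeFn)
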
297

    
298

    
299
def _CheckOSVariant(os_obj, name):
300
  """Check whether an OS name conforms to the os variants specification.
301

302
  @type os_obj: L{objects.OS}
303
  @param os_obj: OS object to check
304
  @type name: string
305
  @param name: OS name passed by the user, to check for validity
306

307
  """
308
  variant = objects.OS.GetVariant(name)
309
  if not os_obj.supported_variants:
310
    if variant:
311
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
312
                                 " passed)" % (os_obj.name, variant),
313
                                 errors.ECODE_INVAL)
314
    return
315
  if not variant:
316
    raise errors.OpPrereqError("OS name must include a variant",
317
                               errors.ECODE_INVAL)
318

    
319
  if variant not in os_obj.supported_variants:
320
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
321

    
322

    
323
def _CheckNodeHasOS(lu, node, os_name, force_variant):
324
  """Ensure that a node supports a given OS.
325

326
  @param lu: the LU on behalf of which we make the check
327
  @param node: the node to check
328
  @param os_name: the OS to query about
329
  @param force_variant: whether to ignore variant errors
330
  @raise errors.OpPrereqError: if the node does not support the OS
331

332
  """
333
  result = lu.rpc.call_os_get(node, os_name)
334
  result.Raise("OS '%s' not in supported OS list for node %s" %
335
               (os_name, node),
336
               prereq=True, ecode=errors.ECODE_INVAL)
337
  if not force_variant:
338
    _CheckOSVariant(result.payload, os_name)
339

    
340

    
341
def _CheckNicsBridgesExist(lu, target_nics, target_node):
342
  """Check that the brigdes needed by a list of nics exist.
343

344
  """
345
  cluster = lu.cfg.GetClusterInfo()
346
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
347
  brlist = [params[constants.NIC_LINK] for params in paramslist
348
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
349
  if brlist:
350
    result = lu.rpc.call_bridges_exist(target_node, brlist)
351
    result.Raise("Error checking bridges on destination node '%s'" %
352
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
353

    
354

    
355
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
356
  """Checks if a node has enough free memory.
357

358
  This function checks if a given node has the needed amount of free
359
  memory. In case the node has less memory or we cannot get the
360
  information from the node, this function raises an OpPrereqError
361
  exception.
362

363
  @type lu: C{LogicalUnit}
364
  @param lu: a logical unit from which we get configuration data
365
  @type node: C{str}
366
  @param node: the node to check
367
  @type reason: C{str}
368
  @param reason: string to use in the error message
369
  @type requested: C{int}
370
  @param requested: the amount of memory in MiB to check for
371
  @type hypervisor_name: C{str}
372
  @param hypervisor_name: the hypervisor to ask for memory stats
373
  @rtype: integer
374
  @return: node current free memory
375
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
376
      we cannot check the node
377

378
  """
379
  nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
380
  nodeinfo[node].Raise("Can't get data from node %s" % node,
381
                       prereq=True, ecode=errors.ECODE_ENVIRON)
382
  (_, _, (hv_info, )) = nodeinfo[node].payload
383

    
384
  free_mem = hv_info.get("memory_free", None)
385
  if not isinstance(free_mem, int):
386
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
387
                               " was '%s'" % (node, free_mem),
388
                               errors.ECODE_ENVIRON)
389
  if requested > free_mem:
390
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
391
                               " needed %s MiB, available %s MiB" %
392
                               (node, reason, requested, free_mem),
393
                               errors.ECODE_NORES)
394
  return free_mem
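

# A reduced, RPC-free sketch (hypothetical helper, not used by this module)
# of the comparison above; "free_mem" stands for the "memory_free" value
# reported by the hypervisor for the node, in MiB.
def _ExampleHasEnoughMemory(free_mem, requested):
  """Return True if C{requested} MiB fit into the reported free memory.

  """
  return isinstance(free_mem, int) and requested <= free_mem
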
395

    
396

    
397
class LUInstanceCreate(LogicalUnit):
398
  """Create an instance.
399

400
  """
401
  HPATH = "instance-add"
402
  HTYPE = constants.HTYPE_INSTANCE
403
  REQ_BGL = False
404

    
405
  def CheckArguments(self):
406
    """Check arguments.
407

408
    """
409
    # do not require name_check to ease forward/backward compatibility
410
    # for tools
411
    if self.op.no_install and self.op.start:
412
      self.LogInfo("No-installation mode selected, disabling startup")
413
      self.op.start = False
414
    # validate/normalize the instance name
415
    self.op.instance_name = \
416
      netutils.Hostname.GetNormalizedName(self.op.instance_name)
417

    
418
    if self.op.ip_check and not self.op.name_check:
419
      # TODO: make the ip check more flexible and not depend on the name check
420
      raise errors.OpPrereqError("Cannot do IP address check without a name"
421
                                 " check", errors.ECODE_INVAL)
422

    
423
    # check nics' parameter names
424
    for nic in self.op.nics:
425
      utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES)
426
    # check that NIC parameter names are unique and valid
427
    utils.ValidateDeviceNames("NIC", self.op.nics)
428

    
429
    # check that disk names are unique and valid
430
    utils.ValidateDeviceNames("disk", self.op.disks)
431

    
432
    cluster = self.cfg.GetClusterInfo()
433
    if not self.op.disk_template in cluster.enabled_disk_templates:
434
      raise errors.OpPrereqError("Cannot create an instance with disk template"
435
                                 " '%s', because it is not enabled in the"
436
                                 " cluster. Enabled disk templates are: %s." %
437
                                 (self.op.disk_template,
438
                                  ",".join(cluster.enabled_disk_templates)))
439

    
440
    # check disks: parameter names and consistent adopt/no-adopt strategy
441
    has_adopt = has_no_adopt = False
442
    for disk in self.op.disks:
443
      if self.op.disk_template != constants.DT_EXT:
444
        utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
445
      if constants.IDISK_ADOPT in disk:
446
        has_adopt = True
447
      else:
448
        has_no_adopt = True
449
    if has_adopt and has_no_adopt:
450
      raise errors.OpPrereqError("Either all disks are adopted or none is",
451
                                 errors.ECODE_INVAL)
452
    if has_adopt:
453
      if self.op.disk_template not in constants.DTS_MAY_ADOPT:
454
        raise errors.OpPrereqError("Disk adoption is not supported for the"
455
                                   " '%s' disk template" %
456
                                   self.op.disk_template,
457
                                   errors.ECODE_INVAL)
458
      if self.op.iallocator is not None:
459
        raise errors.OpPrereqError("Disk adoption not allowed with an"
460
                                   " iallocator script", errors.ECODE_INVAL)
461
      if self.op.mode == constants.INSTANCE_IMPORT:
462
        raise errors.OpPrereqError("Disk adoption not allowed for"
463
                                   " instance import", errors.ECODE_INVAL)
464
    else:
465
      if self.op.disk_template in constants.DTS_MUST_ADOPT:
466
        raise errors.OpPrereqError("Disk template %s requires disk adoption,"
467
                                   " but no 'adopt' parameter given" %
468
                                   self.op.disk_template,
469
                                   errors.ECODE_INVAL)
470

    
471
    self.adopt_disks = has_adopt
472

    
473
    # instance name verification
474
    if self.op.name_check:
475
      self.hostname1 = _CheckHostnameSane(self, self.op.instance_name)
476
      self.op.instance_name = self.hostname1.name
477
      # used in CheckPrereq for ip ping check
478
      self.check_ip = self.hostname1.ip
479
    else:
480
      self.check_ip = None
481

    
482
    # file storage checks
483
    if (self.op.file_driver and
484
        not self.op.file_driver in constants.FILE_DRIVER):
485
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
486
                                 self.op.file_driver, errors.ECODE_INVAL)
487

    
488
    if self.op.disk_template == constants.DT_FILE:
489
      opcodes.RequireFileStorage()
490
    elif self.op.disk_template == constants.DT_SHARED_FILE:
491
      opcodes.RequireSharedFileStorage()
492

    
493
    ### Node/iallocator related checks
494
    _CheckIAllocatorOrNode(self, "iallocator", "pnode")
495

    
496
    if self.op.pnode is not None:
497
      if self.op.disk_template in constants.DTS_INT_MIRROR:
498
        if self.op.snode is None:
499
          raise errors.OpPrereqError("The networked disk templates need"
500
                                     " a mirror node", errors.ECODE_INVAL)
501
      elif self.op.snode:
502
        self.LogWarning("Secondary node will be ignored on non-mirrored disk"
503
                        " template")
504
        self.op.snode = None
505

    
506
    _CheckOpportunisticLocking(self.op)
507

    
508
    self._cds = _GetClusterDomainSecret()
509

    
510
    if self.op.mode == constants.INSTANCE_IMPORT:
511
      # On import force_variant must be True, because if we forced it at
512
      # initial install, our only chance when importing it back is that it
513
      # works again!
514
      self.op.force_variant = True
515

    
516
      if self.op.no_install:
517
        self.LogInfo("No-installation mode has no effect during import")
518

    
519
    elif self.op.mode == constants.INSTANCE_CREATE:
520
      if self.op.os_type is None:
521
        raise errors.OpPrereqError("No guest OS specified",
522
                                   errors.ECODE_INVAL)
523
      if self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os:
524
        raise errors.OpPrereqError("Guest OS '%s' is not allowed for"
525
                                   " installation" % self.op.os_type,
526
                                   errors.ECODE_STATE)
527
      if self.op.disk_template is None:
528
        raise errors.OpPrereqError("No disk template specified",
529
                                   errors.ECODE_INVAL)
530

    
531
    elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
532
      # Check handshake to ensure both clusters have the same domain secret
533
      src_handshake = self.op.source_handshake
534
      if not src_handshake:
535
        raise errors.OpPrereqError("Missing source handshake",
536
                                   errors.ECODE_INVAL)
537

    
538
      errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
539
                                                           src_handshake)
540
      if errmsg:
541
        raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
542
                                   errors.ECODE_INVAL)
543

    
544
      # Load and check source CA
545
      self.source_x509_ca_pem = self.op.source_x509_ca
546
      if not self.source_x509_ca_pem:
547
        raise errors.OpPrereqError("Missing source X509 CA",
548
                                   errors.ECODE_INVAL)
549

    
550
      try:
551
        (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
552
                                                    self._cds)
553
      except OpenSSL.crypto.Error, err:
554
        raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
555
                                   (err, ), errors.ECODE_INVAL)
556

    
557
      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
558
      if errcode is not None:
559
        raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
560
                                   errors.ECODE_INVAL)
561

    
562
      self.source_x509_ca = cert
563

    
564
      src_instance_name = self.op.source_instance_name
565
      if not src_instance_name:
566
        raise errors.OpPrereqError("Missing source instance name",
567
                                   errors.ECODE_INVAL)
568

    
569
      self.source_instance_name = \
570
        netutils.GetHostname(name=src_instance_name).name
571

    
572
    else:
573
      raise errors.OpPrereqError("Invalid instance creation mode %r" %
574
                                 self.op.mode, errors.ECODE_INVAL)
575

    
576
  def ExpandNames(self):
577
    """ExpandNames for CreateInstance.
578

579
    Figure out the right locks for instance creation.
580

581
    """
582
    self.needed_locks = {}
583

    
584
    instance_name = self.op.instance_name
585
    # this is just a preventive check, but someone might still add this
586
    # instance in the meantime, and creation will fail at lock-add time
587
    if instance_name in self.cfg.GetInstanceList():
588
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
589
                                 instance_name, errors.ECODE_EXISTS)
590

    
591
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
592

    
593
    if self.op.iallocator:
594
      # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
595
      # specifying a group on instance creation and then selecting nodes from
596
      # that group
597
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
598
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
599

    
600
      if self.op.opportunistic_locking:
601
        self.opportunistic_locks[locking.LEVEL_NODE] = True
602
        self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
603
    else:
604
      self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
605
      nodelist = [self.op.pnode]
606
      if self.op.snode is not None:
607
        self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
608
        nodelist.append(self.op.snode)
609
      self.needed_locks[locking.LEVEL_NODE] = nodelist
610

    
611
    # in case of import lock the source node too
612
    if self.op.mode == constants.INSTANCE_IMPORT:
613
      src_node = self.op.src_node
614
      src_path = self.op.src_path
615

    
616
      if src_path is None:
617
        self.op.src_path = src_path = self.op.instance_name
618

    
619
      if src_node is None:
620
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
621
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
622
        self.op.src_node = None
623
        if os.path.isabs(src_path):
624
          raise errors.OpPrereqError("Importing an instance from a path"
625
                                     " requires a source node option",
626
                                     errors.ECODE_INVAL)
627
      else:
628
        self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
629
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
630
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
631
        if not os.path.isabs(src_path):
632
          self.op.src_path = src_path = \
633
            utils.PathJoin(pathutils.EXPORT_DIR, src_path)
634

    
635
    self.needed_locks[locking.LEVEL_NODE_RES] = \
636
      _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
637

    
638
  def _RunAllocator(self):
639
    """Run the allocator based on input opcode.
640

641
    """
642
    if self.op.opportunistic_locking:
643
      # Only consider nodes for which a lock is held
644
      node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
645
    else:
646
      node_whitelist = None
647

    
648
    #TODO Export network to iallocator so that it chooses a pnode
649
    #     in a nodegroup that has the desired network connected to
650
    req = _CreateInstanceAllocRequest(self.op, self.disks,
651
                                      self.nics, self.be_full,
652
                                      node_whitelist)
653
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
654

    
655
    ial.Run(self.op.iallocator)
656

    
657
    if not ial.success:
658
      # When opportunistic locks are used, only a temporary failure is generated
659
      if self.op.opportunistic_locking:
660
        ecode = errors.ECODE_TEMP_NORES
661
      else:
662
        ecode = errors.ECODE_NORES
663

    
664
      raise errors.OpPrereqError("Can't compute nodes using"
665
                                 " iallocator '%s': %s" %
666
                                 (self.op.iallocator, ial.info),
667
                                 ecode)
668

    
669
    self.op.pnode = ial.result[0]
670
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
671
                 self.op.instance_name, self.op.iallocator,
672
                 utils.CommaJoin(ial.result))
673

    
674
    assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator"
675

    
676
    if req.RequiredNodes() == 2:
677
      self.op.snode = ial.result[1]
678

    
679
  def BuildHooksEnv(self):
680
    """Build hooks env.
681

682
    This runs on master, primary and secondary nodes of the instance.
683

684
    """
685
    env = {
686
      "ADD_MODE": self.op.mode,
687
      }
688
    if self.op.mode == constants.INSTANCE_IMPORT:
689
      env["SRC_NODE"] = self.op.src_node
690
      env["SRC_PATH"] = self.op.src_path
691
      env["SRC_IMAGES"] = self.src_images
692

    
693
    env.update(_BuildInstanceHookEnv(
694
      name=self.op.instance_name,
695
      primary_node=self.op.pnode,
696
      secondary_nodes=self.secondaries,
697
      status=self.op.start,
698
      os_type=self.op.os_type,
699
      minmem=self.be_full[constants.BE_MINMEM],
700
      maxmem=self.be_full[constants.BE_MAXMEM],
701
      vcpus=self.be_full[constants.BE_VCPUS],
702
      nics=_NICListToTuple(self, self.nics),
703
      disk_template=self.op.disk_template,
704
      disks=[(d[constants.IDISK_NAME], d[constants.IDISK_SIZE],
705
              d[constants.IDISK_MODE]) for d in self.disks],
706
      bep=self.be_full,
707
      hvp=self.hv_full,
708
      hypervisor_name=self.op.hypervisor,
709
      tags=self.op.tags,
710
      ))
711

    
712
    return env
713

    
714
  def BuildHooksNodes(self):
715
    """Build hooks nodes.
716

717
    """
718
    nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries
719
    return nl, nl
720

    
721
  def _ReadExportInfo(self):
722
    """Reads the export information from disk.
723

724
    It will override the opcode source node and path with the actual
725
    information, if these two were not specified before.
726

727
    @return: the export information
728

729
    """
730
    assert self.op.mode == constants.INSTANCE_IMPORT
731

    
732
    src_node = self.op.src_node
733
    src_path = self.op.src_path
734

    
735
    if src_node is None:
736
      locked_nodes = self.owned_locks(locking.LEVEL_NODE)
737
      exp_list = self.rpc.call_export_list(locked_nodes)
738
      found = False
739
      for node in exp_list:
740
        if exp_list[node].fail_msg:
741
          continue
742
        if src_path in exp_list[node].payload:
743
          found = True
744
          self.op.src_node = src_node = node
745
          self.op.src_path = src_path = utils.PathJoin(pathutils.EXPORT_DIR,
746
                                                       src_path)
747
          break
748
      if not found:
749
        raise errors.OpPrereqError("No export found for relative path %s" %
750
                                   src_path, errors.ECODE_INVAL)
751

    
752
    _CheckNodeOnline(self, src_node)
753
    result = self.rpc.call_export_info(src_node, src_path)
754
    result.Raise("No export or invalid export found in dir %s" % src_path)
755

    
756
    export_info = objects.SerializableConfigParser.Loads(str(result.payload))
757
    if not export_info.has_section(constants.INISECT_EXP):
758
      raise errors.ProgrammerError("Corrupted export config",
759
                                   errors.ECODE_ENVIRON)
760

    
761
    ei_version = export_info.get(constants.INISECT_EXP, "version")
762
    if (int(ei_version) != constants.EXPORT_VERSION):
763
      raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
764
                                 (ei_version, constants.EXPORT_VERSION),
765
                                 errors.ECODE_ENVIRON)
766
    return export_info
767

    
768
  def _ReadExportParams(self, einfo):
769
    """Use export parameters as defaults.
770

771
    If the opcode doesn't override some instance
772
    parameters, then try to use them from the export information, if
773
    that declares them.
774

775
    """
776
    self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
777

    
778
    if self.op.disk_template is None:
779
      if einfo.has_option(constants.INISECT_INS, "disk_template"):
780
        self.op.disk_template = einfo.get(constants.INISECT_INS,
781
                                          "disk_template")
782
        if self.op.disk_template not in constants.DISK_TEMPLATES:
783
          raise errors.OpPrereqError("Disk template specified in configuration"
784
                                     " file is not one of the allowed values:"
785
                                     " %s" %
786
                                     " ".join(constants.DISK_TEMPLATES),
787
                                     errors.ECODE_INVAL)
788
      else:
789
        raise errors.OpPrereqError("No disk template specified and the export"
790
                                   " is missing the disk_template information",
791
                                   errors.ECODE_INVAL)
792

    
793
    if not self.op.disks:
794
      disks = []
795
      # TODO: import the disk iv_name too
796
      for idx in range(constants.MAX_DISKS):
797
        if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
798
          disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
799
          disks.append({constants.IDISK_SIZE: disk_sz})
800
      self.op.disks = disks
801
      if not disks and self.op.disk_template != constants.DT_DISKLESS:
802
        raise errors.OpPrereqError("No disk info specified and the export"
803
                                   " is missing the disk information",
804
                                   errors.ECODE_INVAL)
805

    
806
    if not self.op.nics:
807
      nics = []
808
      for idx in range(constants.MAX_NICS):
809
        if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
810
          ndict = {}
811
          for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
812
            v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
813
            ndict[name] = v
814
          nics.append(ndict)
815
        else:
816
          break
817
      self.op.nics = nics
818

    
819
    if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
820
      self.op.tags = einfo.get(constants.INISECT_INS, "tags").split()
821

    
822
    if (self.op.hypervisor is None and
823
        einfo.has_option(constants.INISECT_INS, "hypervisor")):
824
      self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
825

    
826
    if einfo.has_section(constants.INISECT_HYP):
827
      # use the export parameters but do not override the ones
828
      # specified by the user
829
      for name, value in einfo.items(constants.INISECT_HYP):
830
        if name not in self.op.hvparams:
831
          self.op.hvparams[name] = value
832

    
833
    if einfo.has_section(constants.INISECT_BEP):
834
      # use the parameters, without overriding
835
      for name, value in einfo.items(constants.INISECT_BEP):
836
        if name not in self.op.beparams:
837
          self.op.beparams[name] = value
838
        # Compatibility for the old "memory" be param
839
        if name == constants.BE_MEMORY:
840
          if constants.BE_MAXMEM not in self.op.beparams:
841
            self.op.beparams[constants.BE_MAXMEM] = value
842
          if constants.BE_MINMEM not in self.op.beparams:
843
            self.op.beparams[constants.BE_MINMEM] = value
844
    else:
845
      # try to read the parameters old style, from the main section
846
      for name in constants.BES_PARAMETERS:
847
        if (name not in self.op.beparams and
848
            einfo.has_option(constants.INISECT_INS, name)):
849
          self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)
850

    
851
    if einfo.has_section(constants.INISECT_OSP):
852
      # use the parameters, without overriding
853
      for name, value in einfo.items(constants.INISECT_OSP):
854
        if name not in self.op.osparams:
855
          self.op.osparams[name] = value
856

    
857
  def _RevertToDefaults(self, cluster):
858
    """Revert the instance parameters to the default values.
859

860
    """
861
    # hvparams
862
    hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {})
863
    for name in self.op.hvparams.keys():
864
      if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
865
        del self.op.hvparams[name]
866
    # beparams
867
    be_defs = cluster.SimpleFillBE({})
868
    for name in self.op.beparams.keys():
869
      if name in be_defs and be_defs[name] == self.op.beparams[name]:
870
        del self.op.beparams[name]
871
    # nic params
872
    nic_defs = cluster.SimpleFillNIC({})
873
    for nic in self.op.nics:
874
      for name in constants.NICS_PARAMETERS:
875
        if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
876
          del nic[name]
877
    # osparams
878
    os_defs = cluster.SimpleFillOS(self.op.os_type, {})
879
    for name in self.op.osparams.keys():
880
      if name in os_defs and os_defs[name] == self.op.osparams[name]:
881
        del self.op.osparams[name]
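
  # The "identify defaults" idea above in isolation: entries whose requested
  # value equals the cluster default are dropped, so the instance keeps
  # tracking the cluster default instead of pinning it. A hypothetical,
  # dict-only sketch:
  #
  #   def _DropDefaultValues(params, defaults):
  #     return dict((name, value) for (name, value) in params.items()
  #                 if name not in defaults or defaults[name] != value)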
882

    
883
  def _CalculateFileStorageDir(self):
884
    """Calculate final instance file storage dir.
885

886
    """
887
    # file storage dir calculation/check
888
    self.instance_file_storage_dir = None
889
    if self.op.disk_template in constants.DTS_FILEBASED:
890
      # build the full file storage dir path
891
      joinargs = []
892

    
893
      if self.op.disk_template == constants.DT_SHARED_FILE:
894
        get_fsd_fn = self.cfg.GetSharedFileStorageDir
895
      else:
896
        get_fsd_fn = self.cfg.GetFileStorageDir
897

    
898
      cfg_storagedir = get_fsd_fn()
899
      if not cfg_storagedir:
900
        raise errors.OpPrereqError("Cluster file storage dir not defined",
901
                                   errors.ECODE_STATE)
902
      joinargs.append(cfg_storagedir)
903

    
904
      if self.op.file_storage_dir is not None:
905
        joinargs.append(self.op.file_storage_dir)
906

    
907
      joinargs.append(self.op.instance_name)
908

    
909
      # pylint: disable=W0142
910
      self.instance_file_storage_dir = utils.PathJoin(*joinargs)
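
      # Illustrative composition of the path built above (the directory below
      # is an assumed example, not a value taken from any real cluster): with
      # a cluster file storage dir of "/srv/ganeti/file-storage", an optional
      # user-supplied subdirectory "web" and an instance "inst1.example.com",
      #
      #   utils.PathJoin("/srv/ganeti/file-storage", "web",
      #                  "inst1.example.com")
      #     -> "/srv/ganeti/file-storage/web/inst1.example.com"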
911

    
912
  def CheckPrereq(self): # pylint: disable=R0914
913
    """Check prerequisites.
914

915
    """
916
    self._CalculateFileStorageDir()
917

    
918
    if self.op.mode == constants.INSTANCE_IMPORT:
919
      export_info = self._ReadExportInfo()
920
      self._ReadExportParams(export_info)
921
      self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
922
    else:
923
      self._old_instance_name = None
924

    
925
    if (not self.cfg.GetVGName() and
926
        self.op.disk_template not in constants.DTS_NOT_LVM):
927
      raise errors.OpPrereqError("Cluster does not support lvm-based"
928
                                 " instances", errors.ECODE_STATE)
929

    
930
    if (self.op.hypervisor is None or
931
        self.op.hypervisor == constants.VALUE_AUTO):
932
      self.op.hypervisor = self.cfg.GetHypervisorType()
933

    
934
    cluster = self.cfg.GetClusterInfo()
935
    enabled_hvs = cluster.enabled_hypervisors
936
    if self.op.hypervisor not in enabled_hvs:
937
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
938
                                 " cluster (%s)" %
939
                                 (self.op.hypervisor, ",".join(enabled_hvs)),
940
                                 errors.ECODE_STATE)
941

    
942
    # Check tag validity
943
    for tag in self.op.tags:
944
      objects.TaggableObject.ValidateTag(tag)
945

    
946
    # check hypervisor parameter syntax (locally)
947
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
948
    filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type,
949
                                      self.op.hvparams)
950
    hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor)
951
    hv_type.CheckParameterSyntax(filled_hvp)
952
    self.hv_full = filled_hvp
953
    # check that we don't specify global parameters on an instance
954
    _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor",
955
                          "instance", "cluster")
956

    
957
    # fill and remember the beparams dict
958
    self.be_full = _ComputeFullBeParams(self.op, cluster)
959

    
960
    # build os parameters
961
    self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams)
962

    
963
    # now that hvp/bep are in final format, let's reset to defaults,
964
    # if told to do so
965
    if self.op.identify_defaults:
966
      self._RevertToDefaults(cluster)
967

    
968
    # NIC buildup
969
    self.nics = _ComputeNics(self.op, cluster, self.check_ip, self.cfg,
970
                             self.proc.GetECId())
971

    
972
    # disk checks/pre-build
973
    default_vg = self.cfg.GetVGName()
974
    self.disks = _ComputeDisks(self.op, default_vg)
975

    
976
    if self.op.mode == constants.INSTANCE_IMPORT:
977
      disk_images = []
978
      for idx in range(len(self.disks)):
979
        option = "disk%d_dump" % idx
980
        if export_info.has_option(constants.INISECT_INS, option):
981
          # FIXME: are the old os-es, disk sizes, etc. useful?
982
          export_name = export_info.get(constants.INISECT_INS, option)
983
          image = utils.PathJoin(self.op.src_path, export_name)
984
          disk_images.append(image)
985
        else:
986
          disk_images.append(False)
987

    
988
      self.src_images = disk_images
989

    
990
      if self.op.instance_name == self._old_instance_name:
991
        for idx, nic in enumerate(self.nics):
992
          if nic.mac == constants.VALUE_AUTO:
993
            nic_mac_ini = "nic%d_mac" % idx
994
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
995

    
996
    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
997

    
998
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
999
    if self.op.ip_check:
1000
      if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
1001
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
1002
                                   (self.check_ip, self.op.instance_name),
1003
                                   errors.ECODE_NOTUNIQUE)
1004

    
1005
    #### mac address generation
1006
    # By generating here the mac address both the allocator and the hooks get
1007
    # the real final mac address rather than the 'auto' or 'generate' value.
1008
    # There is a race condition between the generation and the instance object
1009
    # creation, which means that we know the mac is valid now, but we're not
1010
    # sure it will be when we actually add the instance. If things go bad
1011
    # adding the instance will abort because of a duplicate mac, and the
1012
    # creation job will fail.
1013
    for nic in self.nics:
1014
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
1015
        nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId())
1016

    
1017
    #### allocator run
1018

    
1019
    if self.op.iallocator is not None:
1020
      self._RunAllocator()
1021

    
1022
    # Release all unneeded node locks
1023
    keep_locks = filter(None, [self.op.pnode, self.op.snode, self.op.src_node])
1024
    _ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks)
1025
    _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks)
1026
    _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
1027

    
1028
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1029
            self.owned_locks(locking.LEVEL_NODE_RES)), \
1030
      "Node locks differ from node resource locks"
1031

    
1032
    #### node related checks
1033

    
1034
    # check primary node
1035
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
1036
    assert self.pnode is not None, \
1037
      "Cannot retrieve locked node %s" % self.op.pnode
1038
    if pnode.offline:
1039
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
1040
                                 pnode.name, errors.ECODE_STATE)
1041
    if pnode.drained:
1042
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
1043
                                 pnode.name, errors.ECODE_STATE)
1044
    if not pnode.vm_capable:
1045
      raise errors.OpPrereqError("Cannot use non-vm_capable primary node"
1046
                                 " '%s'" % pnode.name, errors.ECODE_STATE)
1047

    
1048
    self.secondaries = []
1049

    
1050
    # Fill in any IPs from IP pools. This must happen here, because we need to
1051
    # know the nic's primary node, as specified by the iallocator
1052
    for idx, nic in enumerate(self.nics):
1053
      net_uuid = nic.network
1054
      if net_uuid is not None:
1055
        nobj = self.cfg.GetNetwork(net_uuid)
1056
        netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.name)
1057
        if netparams is None:
1058
          raise errors.OpPrereqError("No netparams found for network"
1059
                                     " %s. Propably not connected to"
1060
                                     " node's %s nodegroup" %
1061
                                     (nobj.name, self.pnode.name),
1062
                                     errors.ECODE_INVAL)
1063
        self.LogInfo("NIC/%d inherits netparams %s" %
1064
                     (idx, netparams.values()))
1065
        nic.nicparams = dict(netparams)
1066
        if nic.ip is not None:
1067
          if nic.ip.lower() == constants.NIC_IP_POOL:
1068
            try:
1069
              nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId())
1070
            except errors.ReservationError:
1071
              raise errors.OpPrereqError("Unable to get a free IP for NIC %d"
1072
                                         " from the address pool" % idx,
1073
                                         errors.ECODE_STATE)
1074
            self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name)
1075
          else:
1076
            try:
1077
              self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId())
1078
            except errors.ReservationError:
1079
              raise errors.OpPrereqError("IP address %s already in use"
1080
                                         " or does not belong to network %s" %
1081
                                         (nic.ip, nobj.name),
1082
                                         errors.ECODE_NOTUNIQUE)
1083

    
1084
      # net is None, ip None or given
1085
      elif self.op.conflicts_check:
1086
        _CheckForConflictingIp(self, nic.ip, self.pnode.name)
1087

    
1088
    # mirror node verification
1089
    if self.op.disk_template in constants.DTS_INT_MIRROR:
1090
      if self.op.snode == pnode.name:
1091
        raise errors.OpPrereqError("The secondary node cannot be the"
1092
                                   " primary node", errors.ECODE_INVAL)
1093
      _CheckNodeOnline(self, self.op.snode)
1094
      _CheckNodeNotDrained(self, self.op.snode)
1095
      _CheckNodeVmCapable(self, self.op.snode)
1096
      self.secondaries.append(self.op.snode)
1097

    
1098
      snode = self.cfg.GetNodeInfo(self.op.snode)
1099
      if pnode.group != snode.group:
1100
        self.LogWarning("The primary and secondary nodes are in two"
1101
                        " different node groups; the disk parameters"
1102
                        " from the first disk's node group will be"
1103
                        " used")
1104

    
1105
    if not self.op.disk_template in constants.DTS_EXCL_STORAGE:
1106
      nodes = [pnode]
1107
      if self.op.disk_template in constants.DTS_INT_MIRROR:
1108
        nodes.append(snode)
1109
      has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
1110
      if compat.any(map(has_es, nodes)):
1111
        raise errors.OpPrereqError("Disk template %s not supported with"
1112
                                   " exclusive storage" % self.op.disk_template,
1113
                                   errors.ECODE_STATE)
1114

    
1115
    nodenames = [pnode.name] + self.secondaries
1116

    
1117
    if not self.adopt_disks:
1118
      if self.op.disk_template == constants.DT_RBD:
1119
        # _CheckRADOSFreeSpace() is just a placeholder.
1120
        # Any function that checks prerequisites can be placed here.
1121
        # Check if there is enough space on the RADOS cluster.
1122
        _CheckRADOSFreeSpace()
1123
      elif self.op.disk_template == constants.DT_EXT:
1124
        # FIXME: Function that checks prereqs if needed
1125
        pass
1126
      else:
1127
        # Check lv size requirements, if not adopting
1128
        req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
1129
        _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
1130

    
1131
    elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
1132
      all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
1133
                                disk[constants.IDISK_ADOPT])
1134
                     for disk in self.disks])
1135
      if len(all_lvs) != len(self.disks):
1136
        raise errors.OpPrereqError("Duplicate volume names given for adoption",
1137
                                   errors.ECODE_INVAL)
1138
      for lv_name in all_lvs:
1139
        try:
1140
          # FIXME: lv_name here is "vg/lv" need to ensure that other calls
1141
          # to ReserveLV uses the same syntax
1142
          self.cfg.ReserveLV(lv_name, self.proc.GetECId())
1143
        except errors.ReservationError:
1144
          raise errors.OpPrereqError("LV named %s used by another instance" %
1145
                                     lv_name, errors.ECODE_NOTUNIQUE)
1146

    
1147
      vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
1148
      vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
1149

    
1150
      node_lvs = self.rpc.call_lv_list([pnode.name],
1151
                                       vg_names.payload.keys())[pnode.name]
1152
      node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
1153
      node_lvs = node_lvs.payload
1154

    
1155
      delta = all_lvs.difference(node_lvs.keys())
1156
      if delta:
1157
        raise errors.OpPrereqError("Missing logical volume(s): %s" %
1158
                                   utils.CommaJoin(delta),
1159
                                   errors.ECODE_INVAL)
1160
      online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]]
1161
      if online_lvs:
1162
        raise errors.OpPrereqError("Online logical volumes found, cannot"
1163
                                   " adopt: %s" % utils.CommaJoin(online_lvs),
1164
                                   errors.ECODE_STATE)
1165
      # update the size of disk based on what is found
1166
      for dsk in self.disks:
1167
        dsk[constants.IDISK_SIZE] = \
1168
          int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG],
1169
                                        dsk[constants.IDISK_ADOPT])][0]))
1170

    
1171
    elif self.op.disk_template == constants.DT_BLOCK:
1172
      # Normalize and de-duplicate device paths
1173
      all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT])
1174
                       for disk in self.disks])
1175
      if len(all_disks) != len(self.disks):
1176
        raise errors.OpPrereqError("Duplicate disk names given for adoption",
1177
                                   errors.ECODE_INVAL)
1178
      baddisks = [d for d in all_disks
1179
                  if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)]
1180
      if baddisks:
1181
        raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
1182
                                   " cannot be adopted" %
1183
                                   (utils.CommaJoin(baddisks),
1184
                                    constants.ADOPTABLE_BLOCKDEV_ROOT),
1185
                                   errors.ECODE_INVAL)
1186

    
1187
      node_disks = self.rpc.call_bdev_sizes([pnode.name],
1188
                                            list(all_disks))[pnode.name]
1189
      node_disks.Raise("Cannot get block device information from node %s" %
1190
                       pnode.name)
1191
      node_disks = node_disks.payload
1192
      delta = all_disks.difference(node_disks.keys())
1193
      if delta:
1194
        raise errors.OpPrereqError("Missing block device(s): %s" %
1195
                                   utils.CommaJoin(delta),
1196
                                   errors.ECODE_INVAL)
1197
      for dsk in self.disks:
1198
        dsk[constants.IDISK_SIZE] = \
1199
          int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
1200

    
1201
    # Verify instance specs
1202
    spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
1203
    ispec = {
1204
      constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
1205
      constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
1206
      constants.ISPEC_DISK_COUNT: len(self.disks),
1207
      constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE]
1208
                                  for disk in self.disks],
1209
      constants.ISPEC_NIC_COUNT: len(self.nics),
1210
      constants.ISPEC_SPINDLE_USE: spindle_use,
1211
      }
1212

    
1213
    group_info = self.cfg.GetNodeGroup(pnode.group)
1214
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
1215
    res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec,
1216
                                               self.op.disk_template)
1217
    if not self.op.ignore_ipolicy and res:
1218
      msg = ("Instance allocation to group %s (%s) violates policy: %s" %
1219
             (pnode.group, group_info.name, utils.CommaJoin(res)))
1220
      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1221

    
1222
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
1223

    
1224
    _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
1225
    # check OS parameters (remotely)
1226
    _CheckOSParams(self, True, nodenames, self.op.os_type, self.os_full)
1227

    
1228
    _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
1229

    
1230
    #TODO: _CheckExtParams (remotely)
1231
    # Check parameters for extstorage
1232

    
1233
    # memory check on primary node
1234
    #TODO(dynmem): use MINMEM for checking
1235
    if self.op.start:
1236
      _CheckNodeFreeMemory(self, self.pnode.name,
1237
                           "creating instance %s" % self.op.instance_name,
1238
                           self.be_full[constants.BE_MAXMEM],
1239
                           self.op.hypervisor)
1240

    
1241
    self.dry_run_result = list(nodenames)
1242

    
1243
  def Exec(self, feedback_fn):
1244
    """Create and add the instance to the cluster.
1245

1246
    """
1247
    instance = self.op.instance_name
1248
    pnode_name = self.pnode.name
1249

    
1250
    assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
1251
                self.owned_locks(locking.LEVEL_NODE)), \
1252
      "Node locks differ from node resource locks"
1253
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1254

    
1255
    ht_kind = self.op.hypervisor
1256
    if ht_kind in constants.HTS_REQ_PORT:
1257
      network_port = self.cfg.AllocatePort()
1258
    else:
1259
      network_port = None
1260

    
1261
    # This is ugly, but we have a chicken-and-egg problem here:
1262
    # We can only take the group disk parameters, as the instance
1263
    # has no disks yet (we are generating them right here).
1264
    node = self.cfg.GetNodeInfo(pnode_name)
1265
    nodegroup = self.cfg.GetNodeGroup(node.group)
1266
    disks = _GenerateDiskTemplate(self,
1267
                                  self.op.disk_template,
1268
                                  instance, pnode_name,
1269
                                  self.secondaries,
1270
                                  self.disks,
1271
                                  self.instance_file_storage_dir,
1272
                                  self.op.file_driver,
1273
                                  0,
1274
                                  feedback_fn,
1275
                                  self.cfg.GetGroupDiskParams(nodegroup))
1276

    
1277
    iobj = objects.Instance(name=instance, os=self.op.os_type,
1278
                            primary_node=pnode_name,
1279
                            nics=self.nics, disks=disks,
1280
                            disk_template=self.op.disk_template,
1281
                            admin_state=constants.ADMINST_DOWN,
1282
                            network_port=network_port,
1283
                            beparams=self.op.beparams,
1284
                            hvparams=self.op.hvparams,
1285
                            hypervisor=self.op.hypervisor,
1286
                            osparams=self.op.osparams,
1287
                            )
1288

    
1289
    if self.op.tags:
1290
      for tag in self.op.tags:
1291
        iobj.AddTag(tag)
1292

    
1293
    if self.adopt_disks:
1294
      if self.op.disk_template == constants.DT_PLAIN:
1295
        # rename LVs to the newly-generated names; we need to construct
1296
        # 'fake' LV disks with the old data, plus the new unique_id
1297
        tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
1298
        rename_to = []
1299
        for t_dsk, a_dsk in zip(tmp_disks, self.disks):
1300
          rename_to.append(t_dsk.logical_id)
1301
          t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
1302
          self.cfg.SetDiskID(t_dsk, pnode_name)
1303
        result = self.rpc.call_blockdev_rename(pnode_name,
1304
                                               zip(tmp_disks, rename_to))
1305
        result.Raise("Failed to rename adopted LVs")
1306
    else:
1307
      feedback_fn("* creating instance disks...")
1308
      try:
1309
        _CreateDisks(self, iobj)
1310
      except errors.OpExecError:
1311
        self.LogWarning("Device creation failed")
1312
        self.cfg.ReleaseDRBDMinors(instance)
1313
        raise
1314

    
1315
    feedback_fn("adding instance %s to cluster config" % instance)
1316

    
1317
    self.cfg.AddInstance(iobj, self.proc.GetECId())
1318

    
1319
    # Declare that we don't want to remove the instance lock anymore, as we've
1320
    # added the instance to the config
1321
    del self.remove_locks[locking.LEVEL_INSTANCE]
1322

    
1323
    if self.op.mode == constants.INSTANCE_IMPORT:
1324
      # Release unused nodes
1325
      _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
1326
    else:
1327
      # Release all nodes
1328
      _ReleaseLocks(self, locking.LEVEL_NODE)
1329

    
1330
    disk_abort = False
1331
    if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
1332
      feedback_fn("* wiping instance disks...")
1333
      try:
1334
        _WipeDisks(self, iobj)
1335
      except errors.OpExecError, err:
1336
        logging.exception("Wiping disks failed")
1337
        self.LogWarning("Wiping instance disks failed (%s)", err)
1338
        disk_abort = True
1339

    
1340
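    # Decide how to wait for the new disks: with wait_for_sync we block
    # until the mirrors are fully synced; for internally mirrored templates
    # (e.g. DRBD) we only check that no disk is degraded and let the resync
    # continue in the background; anything else has nothing to wait for.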
    if disk_abort:
1341
      # Something is already wrong with the disks, don't do anything else
1342
      pass
1343
    elif self.op.wait_for_sync:
1344
      disk_abort = not _WaitForSync(self, iobj)
1345
    elif iobj.disk_template in constants.DTS_INT_MIRROR:
1346
      # make sure the disks are not degraded (still sync-ing is ok)
1347
      feedback_fn("* checking mirrors status")
1348
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
1349
    else:
1350
      disk_abort = False
1351

    
1352
    if disk_abort:
1353
      _RemoveDisks(self, iobj)
1354
      self.cfg.RemoveInstance(iobj.name)
1355
      # Make sure the instance lock gets removed
1356
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
1357
      raise errors.OpExecError("There are some degraded disks for"
1358
                               " this instance")
1359

    
1360
    # Release all node resource locks
1361
    _ReleaseLocks(self, locking.LEVEL_NODE_RES)
1362

    
1363
    if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
1364
      # we need to set the disks ID to the primary node, since the
1365
      # preceding code might or might not have done it, depending on
1366
      # disk template and other options
1367
      for disk in iobj.disks:
1368
        self.cfg.SetDiskID(disk, pnode_name)
1369
      if self.op.mode == constants.INSTANCE_CREATE:
1370
        if not self.op.no_install:
1371
          pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
1372
                        not self.op.wait_for_sync)
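          # pausing the mirror resync while the OS create scripts run means
          # the installation does not have to compete with the resync for
          # I/O bandwidth; the sync is resumed right afterwards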
1373
          if pause_sync:
1374
            feedback_fn("* pausing disk sync to install instance OS")
1375
            result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
1376
                                                              (iobj.disks,
1377
                                                               iobj), True)
1378
            for idx, success in enumerate(result.payload):
1379
              if not success:
1380
                logging.warn("pause-sync of instance %s for disk %d failed",
1381
                             instance, idx)
1382

    
1383
          feedback_fn("* running the instance OS create scripts...")
1384
          # FIXME: pass debug option from opcode to backend
1385
          os_add_result = \
1386
            self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
1387
                                          self.op.debug_level)
1388
          if pause_sync:
1389
            feedback_fn("* resuming disk sync")
1390
            result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
1391
                                                              (iobj.disks,
1392
                                                               iobj), False)
1393
            for idx, success in enumerate(result.payload):
1394
              if not success:
1395
                logging.warn("resume-sync of instance %s for disk %d failed",
1396
                             instance, idx)
1397

    
1398
          os_add_result.Raise("Could not add os for instance %s"
1399
                              " on node %s" % (instance, pnode_name))
1400

    
1401
      else:
1402
        if self.op.mode == constants.INSTANCE_IMPORT:
1403
          feedback_fn("* running the instance OS import scripts...")
1404

    
1405
          transfers = []
1406

    
1407
          for idx, image in enumerate(self.src_images):
1408
            if not image:
1409
              continue
1410

    
1411
            # FIXME: pass debug option from opcode to backend
1412
            dt = masterd.instance.DiskTransfer("disk/%s" % idx,
1413
                                               constants.IEIO_FILE, (image, ),
1414
                                               constants.IEIO_SCRIPT,
1415
                                               (iobj.disks[idx], idx),
1416
                                               None)
1417
            transfers.append(dt)
1418

    
1419
          import_result = \
1420
            masterd.instance.TransferInstanceData(self, feedback_fn,
1421
                                                  self.op.src_node, pnode_name,
1422
                                                  self.pnode.secondary_ip,
1423
                                                  iobj, transfers)
1424
          if not compat.all(import_result):
1425
            self.LogWarning("Some disks for instance %s on node %s were not"
1426
                            " imported successfully" % (instance, pnode_name))
1427

    
1428
          rename_from = self._old_instance_name
1429

    
1430
        elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
1431
          feedback_fn("* preparing remote import...")
1432
          # The source cluster will stop the instance before attempting to make
1433
          # a connection. In some cases stopping an instance can take a long
1434
          # time, hence the shutdown timeout is added to the connection
1435
          # timeout.
1436
          connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
1437
                             self.op.source_shutdown_timeout)
1438
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
1439

    
1440
          assert iobj.primary_node == self.pnode.name
1441
          disk_results = \
1442
            masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
1443
                                          self.source_x509_ca,
1444
                                          self._cds, timeouts)
1445
          if not compat.all(disk_results):
1446
            # TODO: Should the instance still be started, even if some disks
1447
            # failed to import (valid for local imports, too)?
1448
            self.LogWarning("Some disks for instance %s on node %s were not"
1449
                            " imported successfully" % (instance, pnode_name))
1450

    
1451
          rename_from = self.source_instance_name
1452

    
1453
        else:
1454
          # also checked in the prereq part
1455
          raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
1456
                                       % self.op.mode)
1457

    
1458
        # Run rename script on newly imported instance
1459
        assert iobj.name == instance
1460
        feedback_fn("Running rename script for %s" % instance)
1461
        result = self.rpc.call_instance_run_rename(pnode_name, iobj,
1462
                                                   rename_from,
1463
                                                   self.op.debug_level)
1464
        if result.fail_msg:
1465
          self.LogWarning("Failed to run rename script for %s on node"
1466
                          " %s: %s" % (instance, pnode_name, result.fail_msg))
1467

    
1468
    assert not self.owned_locks(locking.LEVEL_NODE_RES)
1469

    
1470
    if self.op.start:
1471
      iobj.admin_state = constants.ADMINST_UP
1472
      self.cfg.Update(iobj, feedback_fn)
1473
      logging.info("Starting instance %s on node %s", instance, pnode_name)
1474
      feedback_fn("* starting instance...")
1475
      result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
1476
                                            False, self.op.reason)
1477
      result.Raise("Could not start instance")
1478

    
1479
    return list(iobj.all_nodes)
1480

    
1481

    
1482
class LUInstanceRename(LogicalUnit):
1483
  """Rename an instance.
1484

1485
  """
1486
  HPATH = "instance-rename"
1487
  HTYPE = constants.HTYPE_INSTANCE
1488

    
1489
  def CheckArguments(self):
1490
    """Check arguments.
1491

1492
    """
1493
    if self.op.ip_check and not self.op.name_check:
1494
      # TODO: make the ip check more flexible and not depend on the name check
1495
      raise errors.OpPrereqError("IP address check requires a name check",
1496
                                 errors.ECODE_INVAL)
1497

    
1498
  def BuildHooksEnv(self):
1499
    """Build hooks env.
1500

1501
    This runs on master, primary and secondary nodes of the instance.
1502

1503
    """
1504
    env = _BuildInstanceHookEnvByObject(self, self.instance)
1505
    env["INSTANCE_NEW_NAME"] = self.op.new_name
1506
    return env
1507

    
1508
  def BuildHooksNodes(self):
1509
    """Build hooks nodes.
1510

1511
    """
1512
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1513
    return (nl, nl)
1514

    
1515
  def CheckPrereq(self):
1516
    """Check prerequisites.
1517

1518
    This checks that the instance is in the cluster and is not running.
1519

1520
    """
1521
    self.op.instance_name = _ExpandInstanceName(self.cfg,
1522
                                                self.op.instance_name)
1523
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1524
    assert instance is not None
1525
    _CheckNodeOnline(self, instance.primary_node)
1526
    _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
1527
                        msg="cannot rename")
1528
    self.instance = instance
1529

    
1530
    new_name = self.op.new_name
1531
    if self.op.name_check:
1532
      hostname = _CheckHostnameSane(self, new_name)
1533
      new_name = self.op.new_name = hostname.name
1534
      if (self.op.ip_check and
1535
          netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
1536
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
1537
                                   (hostname.ip, new_name),
1538
                                   errors.ECODE_NOTUNIQUE)
1539

    
1540
    instance_list = self.cfg.GetInstanceList()
1541
    if new_name in instance_list and new_name != instance.name:
1542
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
1543
                                 new_name, errors.ECODE_EXISTS)
1544

    
1545
  def Exec(self, feedback_fn):
1546
    """Rename the instance.
1547

1548
    """
1549
    inst = self.instance
1550
    old_name = inst.name
1551

    
1552
    rename_file_storage = False
1553
    if (inst.disk_template in constants.DTS_FILEBASED and
1554
        self.op.new_name != inst.name):
1555
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
1556
      rename_file_storage = True
1557

    
1558
    self.cfg.RenameInstance(inst.name, self.op.new_name)
1559
    # Change the instance lock. This is definitely safe while we hold the BGL.
1560
    # Otherwise the new lock would have to be added in acquired mode.
1561
    assert self.REQ_BGL
1562
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1563
    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
1564
    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
1565

    
1566
    # re-read the instance from the configuration after rename
1567
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
1568

    
1569
    if rename_file_storage:
1570
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
1571
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
1572
                                                     old_file_storage_dir,
1573
                                                     new_file_storage_dir)
1574
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
1575
                   " (but the instance has been renamed in Ganeti)" %
1576
                   (inst.primary_node, old_file_storage_dir,
1577
                    new_file_storage_dir))
1578

    
1579
    _StartInstanceDisks(self, inst, None)
1580
    # update info on disks
1581
    info = _GetInstanceInfoText(inst)
1582
    for (idx, disk) in enumerate(inst.disks):
1583
      for node in inst.all_nodes:
1584
        self.cfg.SetDiskID(disk, node)
1585
        result = self.rpc.call_blockdev_setinfo(node, disk, info)
1586
        if result.fail_msg:
1587
          self.LogWarning("Error setting info on node %s for disk %s: %s",
1588
                          node, idx, result.fail_msg)
1589
    try:
1590
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
1591
                                                 old_name, self.op.debug_level)
1592
      msg = result.fail_msg
1593
      if msg:
1594
        msg = ("Could not run OS rename script for instance %s on node %s"
1595
               " (but the instance has been renamed in Ganeti): %s" %
1596
               (inst.name, inst.primary_node, msg))
1597
        self.LogWarning(msg)
1598
    finally:
1599
      _ShutdownInstanceDisks(self, inst)
1600

    
1601
    return inst.name
1602

    
1603

    
1604
class LUInstanceRemove(LogicalUnit):
1605
  """Remove an instance.
1606

1607
  """
1608
  HPATH = "instance-remove"
1609
  HTYPE = constants.HTYPE_INSTANCE
1610
  REQ_BGL = False
1611

    
1612
  def ExpandNames(self):
1613
    self._ExpandAndLockInstance()
1614
    self.needed_locks[locking.LEVEL_NODE] = []
1615
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1616
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1617

    
1618
  def DeclareLocks(self, level):
1619
    if level == locking.LEVEL_NODE:
1620
      self._LockInstancesNodes()
1621
    elif level == locking.LEVEL_NODE_RES:
1622
      # Copy node locks
1623
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1624
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1625

    
1626
  def BuildHooksEnv(self):
1627
    """Build hooks env.
1628

1629
    This runs on master, primary and secondary nodes of the instance.
1630

1631
    """
1632
    env = _BuildInstanceHookEnvByObject(self, self.instance)
1633
    env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
1634
    return env
1635

    
1636
  def BuildHooksNodes(self):
1637
    """Build hooks nodes.
1638

1639
    """
1640
    nl = [self.cfg.GetMasterNode()]
1641
    nl_post = list(self.instance.all_nodes) + nl
1642
    return (nl, nl_post)
1643

    
1644
  def CheckPrereq(self):
1645
    """Check prerequisites.
1646

1647
    This checks that the instance is in the cluster.
1648

1649
    """
1650
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1651
    assert self.instance is not None, \
1652
      "Cannot retrieve locked instance %s" % self.op.instance_name
1653

    
1654
  def Exec(self, feedback_fn):
1655
    """Remove the instance.
1656

1657
    """
1658
    instance = self.instance
1659
    logging.info("Shutting down instance %s on node %s",
1660
                 instance.name, instance.primary_node)
1661

    
1662
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
1663
                                             self.op.shutdown_timeout,
1664
                                             self.op.reason)
1665
    msg = result.fail_msg
1666
    if msg:
1667
      if self.op.ignore_failures:
1668
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
1669
      else:
1670
        raise errors.OpExecError("Could not shutdown instance %s on"
1671
                                 " node %s: %s" %
1672
                                 (instance.name, instance.primary_node, msg))
1673

    
1674
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1675
            self.owned_locks(locking.LEVEL_NODE_RES))
1676
    assert not (set(instance.all_nodes) -
1677
                self.owned_locks(locking.LEVEL_NODE)), \
1678
      "Not owning correct locks"
1679

    
1680
    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
1681

    
1682

    
1683
def _CheckInstanceBridgesExist(lu, instance, node=None):
1684
  """Check that the bridges needed by an instance exist.
1685

1686
  """
1687
  if node is None:
1688
    node = instance.primary_node
1689
  _CheckNicsBridgesExist(lu, instance.nics, node)
1690

    
1691

    
1692
class LUInstanceMove(LogicalUnit):
1693
  """Move an instance by data-copying.
1694

1695
  """
1696
  HPATH = "instance-move"
1697
  HTYPE = constants.HTYPE_INSTANCE
1698
  REQ_BGL = False
1699

    
1700
  def ExpandNames(self):
1701
    self._ExpandAndLockInstance()
1702
    target_node = _ExpandNodeName(self.cfg, self.op.target_node)
1703
    self.op.target_node = target_node
1704
    self.needed_locks[locking.LEVEL_NODE] = [target_node]
1705
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1706
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1707

    
1708
  def DeclareLocks(self, level):
1709
    if level == locking.LEVEL_NODE:
1710
      self._LockInstancesNodes(primary_only=True)
1711
    elif level == locking.LEVEL_NODE_RES:
1712
      # Copy node locks
1713
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1714
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1715

    
1716
  def BuildHooksEnv(self):
1717
    """Build hooks env.
1718

1719
    This runs on master, primary and secondary nodes of the instance.
1720

1721
    """
1722
    env = {
1723
      "TARGET_NODE": self.op.target_node,
1724
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
1725
      }
1726
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1727
    return env
1728

    
1729
  def BuildHooksNodes(self):
1730
    """Build hooks nodes.
1731

1732
    """
1733
    nl = [
1734
      self.cfg.GetMasterNode(),
1735
      self.instance.primary_node,
1736
      self.op.target_node,
1737
      ]
1738
    return (nl, nl)
1739

    
1740
  def CheckPrereq(self):
1741
    """Check prerequisites.
1742

1743
    This checks that the instance is in the cluster.
1744

1745
    """
1746
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1747
    assert self.instance is not None, \
1748
      "Cannot retrieve locked instance %s" % self.op.instance_name
1749

    
1750
    if instance.disk_template not in constants.DTS_COPYABLE:
1751
      raise errors.OpPrereqError("Disk template %s not suitable for copying" %
1752
                                 instance.disk_template, errors.ECODE_STATE)
1753

    
1754
    node = self.cfg.GetNodeInfo(self.op.target_node)
1755
    assert node is not None, \
1756
      "Cannot retrieve locked node %s" % self.op.target_node
1757

    
1758
    self.target_node = target_node = node.name
1759

    
1760
    if target_node == instance.primary_node:
1761
      raise errors.OpPrereqError("Instance %s is already on the node %s" %
1762
                                 (instance.name, target_node),
1763
                                 errors.ECODE_STATE)
1764

    
1765
    bep = self.cfg.GetClusterInfo().FillBE(instance)
1766

    
1767
    for idx, dsk in enumerate(instance.disks):
1768
      if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
1769
        raise errors.OpPrereqError("Instance disk %d has a complex layout,"
1770
                                   " cannot copy" % idx, errors.ECODE_STATE)
1771

    
1772
    _CheckNodeOnline(self, target_node)
1773
    _CheckNodeNotDrained(self, target_node)
1774
    _CheckNodeVmCapable(self, target_node)
1775
    cluster = self.cfg.GetClusterInfo()
1776
    group_info = self.cfg.GetNodeGroup(node.group)
1777
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
1778
    _CheckTargetNodeIPolicy(self, ipolicy, instance, node, self.cfg,
1779
                            ignore=self.op.ignore_ipolicy)
1780

    
1781
    if instance.admin_state == constants.ADMINST_UP:
1782
      # check memory requirements on the target node
1783
      _CheckNodeFreeMemory(self, target_node,
1784
                           "moving instance %s" %
1785
                           instance.name, bep[constants.BE_MAXMEM],
1786
                           instance.hypervisor)
1787
    else:
1788
      self.LogInfo("Not checking memory on the target node as"
1789
                   " instance will not be started")
1790

    
1791
    # check bridge existence
1792
    _CheckInstanceBridgesExist(self, instance, node=target_node)
1793

    
1794
  def Exec(self, feedback_fn):
1795
    """Move an instance.
1796

1797
    The move is done by shutting it down on its present node, copying
1798
    the data over (slow) and starting it on the new node.
1799

1800
    """
1801
    instance = self.instance
1802

    
1803
    source_node = instance.primary_node
1804
    target_node = self.target_node
1805

    
1806
    self.LogInfo("Shutting down instance %s on source node %s",
1807
                 instance.name, source_node)
1808

    
1809
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1810
            self.owned_locks(locking.LEVEL_NODE_RES))
1811

    
1812
    result = self.rpc.call_instance_shutdown(source_node, instance,
1813
                                             self.op.shutdown_timeout,
1814
                                             self.op.reason)
1815
    msg = result.fail_msg
1816
    if msg:
1817
      if self.op.ignore_consistency:
1818
        self.LogWarning("Could not shutdown instance %s on node %s."
1819
                        " Proceeding anyway. Please make sure node"
1820
                        " %s is down. Error details: %s",
1821
                        instance.name, source_node, source_node, msg)
1822
      else:
1823
        raise errors.OpExecError("Could not shutdown instance %s on"
1824
                                 " node %s: %s" %
1825
                                 (instance.name, source_node, msg))
1826

    
1827
    # create the target disks
1828
    try:
1829
      _CreateDisks(self, instance, target_node=target_node)
1830
    except errors.OpExecError:
1831
      self.LogWarning("Device creation failed")
1832
      self.cfg.ReleaseDRBDMinors(instance.name)
1833
      raise
1834

    
1835
    cluster_name = self.cfg.GetClusterInfo().cluster_name
1836

    
1837
    errs = []
1838
    # activate, get path, copy the data over
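    # each disk is first assembled on the target node to obtain a device
    # path, then the source node exports its data into that device; any
    # failure aborts the loop and thereby the whole move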
1839
    for idx, disk in enumerate(instance.disks):
1840
      self.LogInfo("Copying data for disk %d", idx)
1841
      result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
1842
                                               instance.name, True, idx)
1843
      if result.fail_msg:
1844
        self.LogWarning("Can't assemble newly created disk %d: %s",
1845
                        idx, result.fail_msg)
1846
        errs.append(result.fail_msg)
1847
        break
1848
      dev_path = result.payload
1849
      result = self.rpc.call_blockdev_export(source_node, (disk, instance),
1850
                                             target_node, dev_path,
1851
                                             cluster_name)
1852
      if result.fail_msg:
1853
        self.LogWarning("Can't copy data over for disk %d: %s",
1854
                        idx, result.fail_msg)
1855
        errs.append(result.fail_msg)
1856
        break
1857

    
1858
    if errs:
1859
      self.LogWarning("Some disks failed to copy, aborting")
1860
      try:
1861
        _RemoveDisks(self, instance, target_node=target_node)
1862
      finally:
1863
        self.cfg.ReleaseDRBDMinors(instance.name)
1864
        raise errors.OpExecError("Errors during disk copy: %s" %
1865
                                 (",".join(errs),))
1866

    
1867
    instance.primary_node = target_node
1868
    self.cfg.Update(instance, feedback_fn)
1869

    
1870
    self.LogInfo("Removing the disks on the original node")
1871
    _RemoveDisks(self, instance, target_node=source_node)
1872

    
1873
    # Only start the instance if it's marked as up
1874
    if instance.admin_state == constants.ADMINST_UP:
1875
      self.LogInfo("Starting instance %s on node %s",
1876
                   instance.name, target_node)
1877

    
1878
      disks_ok, _ = _AssembleInstanceDisks(self, instance,
1879
                                           ignore_secondaries=True)
1880
      if not disks_ok:
1881
        _ShutdownInstanceDisks(self, instance)
1882
        raise errors.OpExecError("Can't activate the instance's disks")
1883

    
1884
      result = self.rpc.call_instance_start(target_node,
1885
                                            (instance, None, None), False,
1886
                                            self.op.reason)
1887
      msg = result.fail_msg
1888
      if msg:
1889
        _ShutdownInstanceDisks(self, instance)
1890
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
1891
                                 (instance.name, target_node, msg))
1892

    
1893

    
1894
def _GetInstanceConsole(cluster, instance):
1895
  """Returns console information for an instance.
1896

1897
  @type cluster: L{objects.Cluster}
1898
  @type instance: L{objects.Instance}
1899
  @rtype: dict
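  @return: a dictionary describing how to connect to the instance's console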
1900

1901
  """
1902
  hyper = hypervisor.GetHypervisorClass(instance.hypervisor)
1903
  # beparams and hvparams are passed separately, to avoid editing the
1904
  # instance and then saving the defaults in the instance itself.
1905
  hvparams = cluster.FillHV(instance)
1906
  beparams = cluster.FillBE(instance)
1907
  console = hyper.GetInstanceConsole(instance, hvparams, beparams)
1908

    
1909
  assert console.instance == instance.name
1910
  assert console.Validate()
1911

    
1912
  return console.ToDict()
1913

    
1914

    
1915
class _InstanceQuery(_QueryBase):
1916
  FIELDS = query.INSTANCE_FIELDS
1917

    
1918
  def ExpandNames(self, lu):
1919
    lu.needed_locks = {}
1920
    lu.share_locks = _ShareAll()
1921

    
1922
    if self.names:
1923
      self.wanted = _GetWantedInstances(lu, self.names)
1924
    else:
1925
      self.wanted = locking.ALL_SET
1926

    
1927
    self.do_locking = (self.use_locking and
1928
                       query.IQ_LIVE in self.requested_data)
1929
    if self.do_locking:
1930
      lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
1931
      lu.needed_locks[locking.LEVEL_NODEGROUP] = []
1932
      lu.needed_locks[locking.LEVEL_NODE] = []
1933
      lu.needed_locks[locking.LEVEL_NETWORK] = []
1934
      lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1935

    
1936
    self.do_grouplocks = (self.do_locking and
1937
                          query.IQ_NODES in self.requested_data)
1938

    
1939
  def DeclareLocks(self, lu, level):
1940
    if self.do_locking:
1941
      if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
1942
        assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
1943

    
1944
        # Lock all groups used by instances optimistically; this requires going
1945
        # via the node before it's locked, requiring verification later on
1946
        lu.needed_locks[locking.LEVEL_NODEGROUP] = \
1947
          set(group_uuid
1948
              for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1949
              for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
1950
      elif level == locking.LEVEL_NODE:
1951
        lu._LockInstancesNodes() # pylint: disable=W0212
1952

    
1953
      elif level == locking.LEVEL_NETWORK:
1954
        lu.needed_locks[locking.LEVEL_NETWORK] = \
1955
          frozenset(net_uuid
1956
                    for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1957
                    for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))
1958

    
1959
  @staticmethod
1960
  def _CheckGroupLocks(lu):
1961
    owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
1962
    owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
1963

    
1964
    # Check if node groups for locked instances are still correct
1965
    for instance_name in owned_instances:
1966
      _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
1967

    
1968
  def _GetQueryData(self, lu):
1969
    """Computes the list of instances and their attributes.
1970

1971
    """
1972
    if self.do_grouplocks:
1973
      self._CheckGroupLocks(lu)
1974

    
1975
    cluster = lu.cfg.GetClusterInfo()
1976
    all_info = lu.cfg.GetAllInstancesInfo()
1977

    
1978
    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
1979

    
1980
    instance_list = [all_info[name] for name in instance_names]
1981
    nodes = frozenset(itertools.chain(*(inst.all_nodes
1982
                                        for inst in instance_list)))
1983
    hv_list = list(set(inst.hypervisor for inst in instance_list))
1984
    bad_nodes = []
1985
    offline_nodes = []
1986
    wrongnode_inst = set()
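    # bad_nodes collects nodes whose RPC call failed, offline_nodes those
    # marked offline in the configuration, and wrongnode_inst the instances
    # found running on a node other than their configured primary.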
1987

    
1988
    # Gather data as requested
1989
    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
1990
      live_data = {}
1991
      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
1992
      for name in nodes:
1993
        result = node_data[name]
1994
        if result.offline:
1995
          # offline nodes will be in both lists
1996
          assert result.fail_msg
1997
          offline_nodes.append(name)
1998
        if result.fail_msg:
1999
          bad_nodes.append(name)
2000
        elif result.payload:
2001
          for inst in result.payload:
2002
            if inst in all_info:
2003
              if all_info[inst].primary_node == name:
2004
                live_data.update(result.payload)
2005
              else:
2006
                wrongnode_inst.add(inst)
2007
            else:
2008
              # orphan instance; we don't list it here as we don't
2009
              # handle this case yet in the output of instance listing
2010
              logging.warning("Orphan instance '%s' found on node %s",
2011
                              inst, name)
2012
              # else no instance is alive
2013
    else:
2014
      live_data = {}
2015

    
2016
    if query.IQ_DISKUSAGE in self.requested_data:
2017
      gmi = ganeti.masterd.instance
2018
      disk_usage = dict((inst.name,
2019
                         gmi.ComputeDiskSize(inst.disk_template,
2020
                                             [{constants.IDISK_SIZE: disk.size}
2021
                                              for disk in inst.disks]))
2022
                        for inst in instance_list)
2023
    else:
2024
      disk_usage = None
2025

    
2026
    if query.IQ_CONSOLE in self.requested_data:
2027
      consinfo = {}
2028
      for inst in instance_list:
2029
        if inst.name in live_data:
2030
          # Instance is running
2031
          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
2032
        else:
2033
          consinfo[inst.name] = None
2034
      assert set(consinfo.keys()) == set(instance_names)
2035
    else:
2036
      consinfo = None
2037

    
2038
    if query.IQ_NODES in self.requested_data:
2039
      node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
2040
                                            instance_list)))
2041
      nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
2042
      groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
2043
                    for uuid in set(map(operator.attrgetter("group"),
2044
                                        nodes.values())))
2045
    else:
2046
      nodes = None
2047
      groups = None
2048

    
2049
    if query.IQ_NETWORKS in self.requested_data:
2050
      net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
2051
                                    for i in instance_list))
2052
      networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
2053
    else:
2054
      networks = None
2055

    
2056
    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
2057
                                   disk_usage, offline_nodes, bad_nodes,
2058
                                   live_data, wrongnode_inst, consinfo,
2059
                                   nodes, groups, networks)
2060

    
2061

    
2062
class LUInstanceQuery(NoHooksLU):
2063
  """Logical unit for querying instances.
2064

2065
  """
2066
  # pylint: disable=W0142
2067
  REQ_BGL = False
2068

    
2069
  def CheckArguments(self):
2070
    self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
2071
                             self.op.output_fields, self.op.use_locking)
2072

    
2073
  def ExpandNames(self):
2074
    self.iq.ExpandNames(self)
2075

    
2076
  def DeclareLocks(self, level):
2077
    self.iq.DeclareLocks(self, level)
2078

    
2079
  def Exec(self, feedback_fn):
2080
    return self.iq.OldStyleQuery(self)
2081

    
2082

    
2083
class LUInstanceQueryData(NoHooksLU):
2084
  """Query runtime instance data.
2085

2086
  """
2087
  REQ_BGL = False
2088

    
2089
  def ExpandNames(self):
2090
    self.needed_locks = {}
2091

    
2092
    # Use locking if requested or when non-static information is wanted
2093
    if not (self.op.static or self.op.use_locking):
2094
      self.LogWarning("Non-static data requested, locks need to be acquired")
2095
      self.op.use_locking = True
2096

    
2097
    if self.op.instances or not self.op.use_locking:
2098
      # Expand instance names right here
2099
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
2100
    else:
2101
      # Will use acquired locks
2102
      self.wanted_names = None
2103

    
2104
    if self.op.use_locking:
2105
      self.share_locks = _ShareAll()
2106

    
2107
      if self.wanted_names is None:
2108
        self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2109
      else:
2110
        self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
2111

    
2112
      self.needed_locks[locking.LEVEL_NODEGROUP] = []
2113
      self.needed_locks[locking.LEVEL_NODE] = []
2114
      self.needed_locks[locking.LEVEL_NETWORK] = []
2115
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2116

    
2117
  def DeclareLocks(self, level):
2118
    if self.op.use_locking:
2119
      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
2120
      if level == locking.LEVEL_NODEGROUP:
2121

    
2122
        # Lock all groups used by instances optimistically; this requires going
2123
        # via the node before it's locked, requiring verification later on
2124
        self.needed_locks[locking.LEVEL_NODEGROUP] = \
2125
          frozenset(group_uuid
2126
                    for instance_name in owned_instances
2127
                    for group_uuid in
2128
                    self.cfg.GetInstanceNodeGroups(instance_name))
2129

    
2130
      elif level == locking.LEVEL_NODE:
2131
        self._LockInstancesNodes()
2132

    
2133
      elif level == locking.LEVEL_NETWORK:
2134
        self.needed_locks[locking.LEVEL_NETWORK] = \
2135
          frozenset(net_uuid
2136
                    for instance_name in owned_instances
2137
                    for net_uuid in
2138
                    self.cfg.GetInstanceNetworks(instance_name))
2139

    
2140
  def CheckPrereq(self):
2141
    """Check prerequisites.
2142

2143
    This only checks the optional instance list against the existing names.
2144

2145
    """
2146
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
2147
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
2148
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
2149
    owned_networks = frozenset(self.owned_locks(locking.LEVEL_NETWORK))
2150

    
2151
    if self.wanted_names is None:
2152
      assert self.op.use_locking, "Locking was not used"
2153
      self.wanted_names = owned_instances
2154

    
2155
    instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
2156

    
2157
    if self.op.use_locking:
2158
      _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
2159
                                None)
2160
    else:
2161
      assert not (owned_instances or owned_groups or
2162
                  owned_nodes or owned_networks)
2163

    
2164
    self.wanted_instances = instances.values()
2165

    
2166
  def _ComputeBlockdevStatus(self, node, instance, dev):
2167
    """Returns the status of a block device
2168

2169
    """
2170
    if self.op.static or not node:
2171
      return None
2172

    
2173
    self.cfg.SetDiskID(dev, node)
2174

    
2175
    result = self.rpc.call_blockdev_find(node, dev)
2176
    if result.offline:
2177
      return None
2178

    
2179
    result.Raise("Can't compute disk status for %s" % instance.name)
2180

    
2181
    status = result.payload
2182
    if status is None:
2183
      return None
2184

    
2185
    return (status.dev_path, status.major, status.minor,
2186
            status.sync_percent, status.estimated_time,
2187
            status.is_degraded, status.ldisk_status)
2188

    
2189
  def _ComputeDiskStatus(self, instance, snode, dev):
2190
    """Compute block device status.
2191

2192
    """
2193
    (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
2194

    
2195
    return self._ComputeDiskStatusInner(instance, snode, anno_dev)
2196

    
2197
  def _ComputeDiskStatusInner(self, instance, snode, dev):
2198
    """Compute block device status.
2199

2200
    @attention: The device has to be annotated already.
2201

2202
    """
2203
    if dev.dev_type in constants.LDS_DRBD:
2204
      # we change the snode then (otherwise we use the one passed in)
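      # (for DRBD the logical_id starts with the two node names, so the
      # secondary for status purposes is whichever of them is not the
      # primary node)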
2205
      if dev.logical_id[0] == instance.primary_node:
2206
        snode = dev.logical_id[1]
2207
      else:
2208
        snode = dev.logical_id[0]
2209

    
2210
    dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
2211
                                              instance, dev)
2212
    dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
2213

    
2214
    if dev.children:
2215
      dev_children = map(compat.partial(self._ComputeDiskStatusInner,
2216
                                        instance, snode),
2217
                         dev.children)
2218
    else:
2219
      dev_children = []
2220

    
2221
    return {
2222
      "iv_name": dev.iv_name,
2223
      "dev_type": dev.dev_type,
2224
      "logical_id": dev.logical_id,
2225
      "physical_id": dev.physical_id,
2226
      "pstatus": dev_pstatus,
2227
      "sstatus": dev_sstatus,
2228
      "children": dev_children,
2229
      "mode": dev.mode,
2230
      "size": dev.size,
2231
      "name": dev.name,
2232
      "uuid": dev.uuid,
2233
      }
2234

    
2235
  def Exec(self, feedback_fn):
2236
    """Gather and return data"""
2237
    result = {}
2238

    
2239
    cluster = self.cfg.GetClusterInfo()
2240

    
2241
    node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
2242
    nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
2243

    
2244
    groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
2245
                                                 for node in nodes.values()))
2246

    
2247
    group2name_fn = lambda uuid: groups[uuid].name
2248
    for instance in self.wanted_instances:
2249
      pnode = nodes[instance.primary_node]
2250

    
2251
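      # Work out the run state: static queries and offline primary nodes
      # yield no live data; otherwise the hypervisor on the primary node is
      # asked whether the instance is actually running.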
      if self.op.static or pnode.offline:
2252
        remote_state = None
2253
        if pnode.offline:
2254
          self.LogWarning("Primary node %s is marked offline, returning static"
2255
                          " information only for instance %s" %
2256
                          (pnode.name, instance.name))
2257
      else:
2258
        remote_info = self.rpc.call_instance_info(instance.primary_node,
2259
                                                  instance.name,
2260
                                                  instance.hypervisor)
2261
        remote_info.Raise("Error checking node %s" % instance.primary_node)
2262
        remote_info = remote_info.payload
2263
        if remote_info and "state" in remote_info:
2264
          remote_state = "up"
2265
        else:
2266
          if instance.admin_state == constants.ADMINST_UP:
2267
            remote_state = "down"
2268
          else:
2269
            remote_state = instance.admin_state
2270

    
2271
      disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
2272
                  instance.disks)
2273

    
2274
      snodes_group_uuids = [nodes[snode_name].group
2275
                            for snode_name in instance.secondary_nodes]
2276

    
2277
      result[instance.name] = {
2278
        "name": instance.name,
2279
        "config_state": instance.admin_state,
2280
        "run_state": remote_state,
2281
        "pnode": instance.primary_node,
2282
        "pnode_group_uuid": pnode.group,
2283
        "pnode_group_name": group2name_fn(pnode.group),
2284
        "snodes": instance.secondary_nodes,
2285
        "snodes_group_uuids": snodes_group_uuids,
2286
        "snodes_group_names": map(group2name_fn, snodes_group_uuids),
2287
        "os": instance.os,
2288
        # this happens to be the same format used for hooks
2289
        "nics": _NICListToTuple(self, instance.nics),
2290
        "disk_template": instance.disk_template,
2291
        "disks": disks,
2292
        "hypervisor": instance.hypervisor,
2293
        "network_port": instance.network_port,
2294
        "hv_instance": instance.hvparams,
2295
        "hv_actual": cluster.FillHV(instance, skip_globals=True),
2296
        "be_instance": instance.beparams,
2297
        "be_actual": cluster.FillBE(instance),
2298
        "os_instance": instance.osparams,
2299
        "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
2300
        "serial_no": instance.serial_no,
2301
        "mtime": instance.mtime,
2302
        "ctime": instance.ctime,
2303
        "uuid": instance.uuid,
2304
        }
2305

    
2306
    return result
2307

    
2308

    
2309
class LUInstanceStartup(LogicalUnit):
2310
  """Starts an instance.
2311

2312
  """
2313
  HPATH = "instance-start"
2314
  HTYPE = constants.HTYPE_INSTANCE
2315
  REQ_BGL = False
2316

    
2317
  def CheckArguments(self):
2318
    # extra beparams
2319
    if self.op.beparams:
2320
      # fill the beparams dict
2321
      objects.UpgradeBeParams(self.op.beparams)
2322
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2323

    
2324
  def ExpandNames(self):
2325
    self._ExpandAndLockInstance()
2326
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2327

    
2328
  def DeclareLocks(self, level):
2329
    if level == locking.LEVEL_NODE_RES:
2330
      self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
2331

    
2332
  def BuildHooksEnv(self):
2333
    """Build hooks env.
2334

2335
    This runs on master, primary and secondary nodes of the instance.
2336

2337
    """
2338
    env = {
2339
      "FORCE": self.op.force,
2340
      }
2341

    
2342
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2343

    
2344
    return env
2345

    
2346
  def BuildHooksNodes(self):
2347
    """Build hooks nodes.
2348

2349
    """
2350
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2351
    return (nl, nl)
2352

    
2353
  def CheckPrereq(self):
2354
    """Check prerequisites.
2355

2356
    This checks that the instance is in the cluster.
2357

2358
    """
2359
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2360
    assert self.instance is not None, \
2361
      "Cannot retrieve locked instance %s" % self.op.instance_name
2362

    
2363
    # extra hvparams
2364
    if self.op.hvparams:
2365
      # check hypervisor parameter syntax (locally)
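      # the overrides are merged into the instance's effective hvparams and
      # the combined result is syntax-checked here, then validated on all
      # of the instance's nodes below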
2366
      cluster = self.cfg.GetClusterInfo()
2367
      utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
2368
      filled_hvp = cluster.FillHV(instance)
2369
      filled_hvp.update(self.op.hvparams)
2370
      hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
2371
      hv_type.CheckParameterSyntax(filled_hvp)
2372
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2373

    
2374
    _CheckInstanceState(self, instance, INSTANCE_ONLINE)
2375

    
2376
    self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
2377

    
2378
    if self.primary_offline and self.op.ignore_offline_nodes:
2379
      self.LogWarning("Ignoring offline primary node")
2380

    
2381
      if self.op.hvparams or self.op.beparams:
2382
        self.LogWarning("Overridden parameters are ignored")
2383
    else:
2384
      _CheckNodeOnline(self, instance.primary_node)
2385

    
2386
      bep = self.cfg.GetClusterInfo().FillBE(instance)
2387
      bep.update(self.op.beparams)
2388

    
2389
      # check bridges existence
2390
      _CheckInstanceBridgesExist(self, instance)
2391

    
2392
      remote_info = self.rpc.call_instance_info(instance.primary_node,
2393
                                                instance.name,
2394
                                                instance.hypervisor)
2395
      remote_info.Raise("Error checking node %s" % instance.primary_node,
2396
                        prereq=True, ecode=errors.ECODE_ENVIRON)
2397
      if not remote_info.payload: # not running already
2398
        _CheckNodeFreeMemory(self, instance.primary_node,
2399
                             "starting instance %s" % instance.name,
2400
                             bep[constants.BE_MINMEM], instance.hypervisor)
2401

    
2402
  def Exec(self, feedback_fn):
2403
    """Start the instance.
2404

2405
    """
2406
    instance = self.instance
2407
    force = self.op.force
2408
    reason = self.op.reason
2409

    
2410
    if not self.op.no_remember:
2411
      self.cfg.MarkInstanceUp(instance.name)
2412

    
2413
    if self.primary_offline:
2414
      assert self.op.ignore_offline_nodes
2415
      self.LogInfo("Primary node offline, marked instance as started")
2416
    else:
2417
      node_current = instance.primary_node
2418

    
2419
      _StartInstanceDisks(self, instance, force)
2420

    
2421
      result = \
2422
        self.rpc.call_instance_start(node_current,
2423
                                     (instance, self.op.hvparams,
2424
                                      self.op.beparams),
2425
                                     self.op.startup_paused, reason)
2426
      msg = result.fail_msg
2427
      if msg:
2428
        _ShutdownInstanceDisks(self, instance)
2429
        raise errors.OpExecError("Could not start instance: %s" % msg)
2430

    
2431

    
2432
class LUInstanceShutdown(LogicalUnit):
2433
  """Shutdown an instance.
2434

2435
  """
2436
  HPATH = "instance-stop"
2437
  HTYPE = constants.HTYPE_INSTANCE
2438
  REQ_BGL = False
2439

    
2440
  def ExpandNames(self):
2441
    self._ExpandAndLockInstance()
2442

    
2443
  def BuildHooksEnv(self):
2444
    """Build hooks env.
2445

2446
    This runs on master, primary and secondary nodes of the instance.
2447

2448
    """
2449
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2450
    env["TIMEOUT"] = self.op.timeout
2451
    return env
2452

    
2453
  def BuildHooksNodes(self):
2454
    """Build hooks nodes.
2455

2456
    """
2457
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2458
    return (nl, nl)
2459

    
2460
  def CheckPrereq(self):
2461
    """Check prerequisites.
2462

2463
    This checks that the instance is in the cluster.
2464

2465
    """
2466
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2467
    assert self.instance is not None, \
2468
      "Cannot retrieve locked instance %s" % self.op.instance_name
2469

    
2470
    if not self.op.force:
2471
      _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
2472
    else:
2473
      self.LogWarning("Ignoring offline instance check")
2474

    
2475
    self.primary_offline = \
2476
      self.cfg.GetNodeInfo(self.instance.primary_node).offline
2477

    
2478
    if self.primary_offline and self.op.ignore_offline_nodes:
2479
      self.LogWarning("Ignoring offline primary node")
2480
    else:
2481
      _CheckNodeOnline(self, self.instance.primary_node)
2482

    
2483
  def Exec(self, feedback_fn):
2484
    """Shutdown the instance.
2485

2486
    """
2487
    instance = self.instance
2488
    node_current = instance.primary_node
2489
    timeout = self.op.timeout
2490
    reason = self.op.reason
2491

    
2492
    # If the instance is offline we shouldn't mark it as down, as that
2493
    # resets the offline flag.
2494
    if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
2495
      self.cfg.MarkInstanceDown(instance.name)
2496

    
2497
    if self.primary_offline:
2498
      assert self.op.ignore_offline_nodes
2499
      self.LogInfo("Primary node offline, marked instance as stopped")
2500
    else:
2501
      result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
2502
                                               reason)
2503
      msg = result.fail_msg
2504
      if msg:
2505
        self.LogWarning("Could not shutdown instance: %s", msg)
2506

    
2507
      _ShutdownInstanceDisks(self, instance)
2508

    
2509

    
2510
class LUInstanceReinstall(LogicalUnit):
2511
  """Reinstall an instance.
2512

2513
  """
2514
  HPATH = "instance-reinstall"
2515
  HTYPE = constants.HTYPE_INSTANCE
2516
  REQ_BGL = False
2517

    
2518
  def ExpandNames(self):
2519
    self._ExpandAndLockInstance()
2520

    
2521
  def BuildHooksEnv(self):
2522
    """Build hooks env.
2523

2524
    This runs on master, primary and secondary nodes of the instance.
2525

2526
    """
2527
    return _BuildInstanceHookEnvByObject(self, self.instance)
2528

    
2529
  def BuildHooksNodes(self):
2530
    """Build hooks nodes.
2531

2532
    """
2533
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2534
    return (nl, nl)
2535

    
2536
  def CheckPrereq(self):
2537
    """Check prerequisites.
2538

2539
    This checks that the instance is in the cluster and is not running.
2540

2541
    """
2542
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2543
    assert instance is not None, \
2544
      "Cannot retrieve locked instance %s" % self.op.instance_name
2545
    _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
2546
                     " offline, cannot reinstall")
2547

    
2548
    if instance.disk_template == constants.DT_DISKLESS:
2549
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2550
                                 self.op.instance_name,
2551
                                 errors.ECODE_INVAL)
2552
    _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
2553

    
2554
    if self.op.os_type is not None:
2555
      # OS verification
2556
      pnode = _ExpandNodeName(self.cfg, instance.primary_node)
2557
      _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
2558
      instance_os = self.op.os_type
2559
    else:
2560
      instance_os = instance.os
2561

    
2562
    nodelist = list(instance.all_nodes)
2563

    
2564
    if self.op.osparams:
2565
      i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
2566
      _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
2567
      self.os_inst = i_osdict # the new dict (without defaults)
2568
    else:
2569
      self.os_inst = None
2570

    
2571
    self.instance = instance
2572

    
2573
  def Exec(self, feedback_fn):
2574
    """Reinstall the instance.
2575

2576
    """
2577
    inst = self.instance
2578

    
2579
    if self.op.os_type is not None:
2580
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2581
      inst.os = self.op.os_type
2582
      # Write to configuration
2583
      self.cfg.Update(inst, feedback_fn)
2584

    
2585
    _StartInstanceDisks(self, inst, None)
2586
    try:
2587
      feedback_fn("Running the instance OS create scripts...")
2588
      # FIXME: pass debug option from opcode to backend
2589
      result = self.rpc.call_instance_os_add(inst.primary_node,
2590
                                             (inst, self.os_inst), True,
2591
                                             self.op.debug_level)
2592
      result.Raise("Could not install OS for instance %s on node %s" %
2593
                   (inst.name, inst.primary_node))
2594
    finally:
2595
      _ShutdownInstanceDisks(self, inst)
2596

    
2597

    
2598
class LUInstanceReboot(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      }

    env.update(_BuildInstanceHookEnvByObject(self, self.instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckInstanceState(self, instance, INSTANCE_ONLINE)
    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    reason = self.op.reason

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node)
    instance_running = bool(remote_info.payload)

    node_current = instance.primary_node

    if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                                            constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type,
                                             self.op.shutdown_timeout, reason)
      result.Raise("Could not reboot instance")
    else:
      if instance_running:
        result = self.rpc.call_instance_shutdown(node_current, instance,
                                                 self.op.shutdown_timeout,
                                                 reason)
        result.Raise("Could not shutdown instance for full reboot")
        _ShutdownInstanceDisks(self, instance)
      else:
        self.LogInfo("Instance %s was already stopped, starting now",
                     instance.name)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current,
                                            (instance, None, None), False,
                                            reason)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUInstanceConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise("Can't get node information from %s" % node)

    if instance.name not in node_insts.payload:
      if instance.admin_state == constants.ADMINST_UP:
        state = constants.INSTST_ERRORDOWN
      elif instance.admin_state == constants.ADMINST_DOWN:
        state = constants.INSTST_ADMINDOWN
      else:
        state = constants.INSTST_ADMINOFFLINE
      raise errors.OpExecError("Instance %s is not running (state %s)" %
                               (instance.name, state))

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)


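# Informal note on the two migration helpers below: lock levels are processed
# in order, so by the time LEVEL_NODE_ALLOC is declared the instance lock is
# already held (see the assert), node locks for externally mirrored templates
# are declared at that point, and LEVEL_NODE_RES simply copies the node locks.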
def _DeclareLocksForMigration(lu, level):
  """Declares locks for L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}
  @param level: Lock level

  """
  if level == locking.LEVEL_NODE_ALLOC:
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)

    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)

    # Node locks are already declared here rather than at LEVEL_NODE as we need
    # the instance object anyway to declare the node allocation lock.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if lu.op.target_node is None:
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
      else:
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
                                               lu.op.target_node]
      del lu.recalculate_locks[locking.LEVEL_NODE]
    else:
      lu._LockInstancesNodes() # pylint: disable=W0212

  elif level == locking.LEVEL_NODE:
    # Node locks are declared together with the node allocation lock
    assert (lu.needed_locks[locking.LEVEL_NODE] or
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)

  elif level == locking.LEVEL_NODE_RES:
    # Copy node locks
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
      _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])


def _ExpandNamesForMigration(lu):
  """Expands names for use with L{TLMigrateInstance}.

  @type lu: L{LogicalUnit}

  """
  if lu.op.target_node is not None:
    lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)

  lu.needed_locks[locking.LEVEL_NODE] = []
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  lu.needed_locks[locking.LEVEL_NODE_RES] = []
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  # The node allocation lock is actually only needed for externally replicated
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []


class LUInstanceFailover(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.iallocator = getattr(self.op, "iallocator", None)
    self.target_node = getattr(self.op, "target_node", None)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, False, True, False,
                        self.op.ignore_consistency, True,
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      }

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""

    env.update(_BuildInstanceHookEnvByObject(self, instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
    return (nl, nl + [instance.primary_node])


class LUInstanceMigrate(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    _ExpandNamesForMigration(self)

    self._migrater = \
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
                        False, self.op.allow_failover, False,
                        self.op.allow_runtime_changes,
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
                        self.op.ignore_ipolicy)

    self.tasklets = [self._migrater]

  def DeclareLocks(self, level):
    _DeclareLocksForMigration(self, level)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    instance = self._migrater.instance
    source_node = instance.primary_node
    target_node = self.op.target_node
    env = _BuildInstanceHookEnvByObject(self, instance)
    env.update({
      "MIGRATE_LIVE": self._migrater.live,
      "MIGRATE_CLEANUP": self.op.cleanup,
      "OLD_PRIMARY": source_node,
      "NEW_PRIMARY": target_node,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      })

    if instance.disk_template in constants.DTS_INT_MIRROR:
      env["OLD_SECONDARY"] = target_node
      env["NEW_SECONDARY"] = source_node
    else:
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self._migrater.instance
    snodes = list(instance.secondary_nodes)
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
    return (nl, nl)


class LUInstanceMultiAlloc(NoHooksLU):
  """Allocates multiple instances at the same time.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    nodes = []
    for inst in self.op.instances:
      if inst.iallocator is not None:
        raise errors.OpPrereqError("iallocator is not allowed to be set on"
                                   " instance objects", errors.ECODE_INVAL)
      nodes.append(bool(inst.pnode))
      if inst.disk_template in constants.DTS_INT_MIRROR:
        nodes.append(bool(inst.snode))

    has_nodes = compat.any(nodes)
    if compat.all(nodes) ^ has_nodes:
      raise errors.OpPrereqError("There are instance objects providing"
                                 " pnode/snode while others do not",
                                 errors.ECODE_INVAL)

    if self.op.iallocator is None:
      default_iallocator = self.cfg.GetDefaultIAllocator()
      if default_iallocator and has_nodes:
        self.op.iallocator = default_iallocator
      else:
        raise errors.OpPrereqError("No iallocator or nodes on the instances"
                                   " given and no cluster-wide default"
                                   " iallocator found; please specify either"
                                   " an iallocator or nodes on the instances"
                                   " or set a cluster-wide default iallocator",
                                   errors.ECODE_INVAL)

    _CheckOpportunisticLocking(self.op)

    dups = utils.FindDuplicates([op.instance_name for op in self.op.instances])
    if dups:
      raise errors.OpPrereqError("There are duplicate instance names: %s" %
                                 utils.CommaJoin(dups), errors.ECODE_INVAL)

  def ExpandNames(self):
    """Calculate the locks.

    """
    self.share_locks = _ShareAll()
    self.needed_locks = {
      # iallocator will select nodes and even if no iallocator is used,
      # collisions with LUInstanceCreate should be avoided
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
      self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET

      if self.op.opportunistic_locking:
        self.opportunistic_locks[locking.LEVEL_NODE] = True
        self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
    else:
      nodeslist = []
      for inst in self.op.instances:
        inst.pnode = _ExpandNodeName(self.cfg, inst.pnode)
        nodeslist.append(inst.pnode)
        if inst.snode is not None:
          inst.snode = _ExpandNodeName(self.cfg, inst.snode)
          nodeslist.append(inst.snode)

      self.needed_locks[locking.LEVEL_NODE] = nodeslist
      # Lock resources of instance's primary and secondary nodes (copy to
      # prevent accidental modification)
      self.needed_locks[locking.LEVEL_NODE_RES] = list(nodeslist)

  def CheckPrereq(self):
    """Check prerequisite.

    """
    cluster = self.cfg.GetClusterInfo()
    default_vg = self.cfg.GetVGName()
    ec_id = self.proc.GetECId()

    if self.op.opportunistic_locking:
      # Only consider nodes for which a lock is held
      node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
    else:
      node_whitelist = None

    insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg),
                                         _ComputeNics(op, cluster, None,
                                                      self.cfg, ec_id),
                                         _ComputeFullBeParams(op, cluster),
                                         node_whitelist)
             for op in self.op.instances]

    req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" %
                                 (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.ia_result = ial.result

    if self.op.dry_run:
      self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), {
        constants.JOB_IDS_KEY: [],
        })

  def _ConstructPartialResult(self):
    """Constructs the partial result.

    """
    (allocatable, failed) = self.ia_result
    return {
      opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY:
        map(compat.fst, allocatable),
      opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed,
      }

  def Exec(self, feedback_fn):
    """Executes the opcode.

    """
    op2inst = dict((op.instance_name, op) for op in self.op.instances)
    (allocatable, failed) = self.ia_result

    jobs = []
    for (name, nodes) in allocatable:
      op = op2inst.pop(name)

      if len(nodes) > 1:
        (op.pnode, op.snode) = nodes
      else:
        (op.pnode,) = nodes

      jobs.append([op])

    missing = set(op2inst.keys()) - set(failed)
    assert not missing, \
      "Iallocator returned an incomplete result: %s" % utils.CommaJoin(missing)

    return ResultWithJobs(jobs, **self._ConstructPartialResult())


class _InstNicModPrivate:
  """Data structure for network interface modifications.

  Used by L{LUInstanceSetParams}.

  """
  def __init__(self):
    self.params = None
    self.filled = None


def PrepareContainerMods(mods, private_fn):
  """Prepares a list of container modifications by adding a private data field.

  @type mods: list of tuples; (operation, index, parameters)
  @param mods: List of modifications
  @type private_fn: callable or None
  @param private_fn: Callable for constructing a private data field for a
    modification
  @rtype: list

  """
  if private_fn is None:
    fn = lambda: None
  else:
    fn = private_fn

  return [(op, idx, params, fn()) for (op, idx, params) in mods]


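# Usage note: _CheckNodesPhysicalCPUs below is invoked from
# LUInstanceSetParams.CheckPrereq (further down in this module) with
# max_requested_cpu + 1 as the minimum acceptable number of physical CPUs,
# to make sure a new CPU mask fits on all of the instance's nodes.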
def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
  """Checks if nodes have enough physical CPUs.

  This function checks if all given nodes have the needed number of
  physical CPUs. In case any node has fewer CPUs or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type requested: C{int}
  @param requested: the minimum acceptable number of physical CPUs
  @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
      or we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name], None)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, _, (hv_info, )) = info.payload
    num_cpus = hv_info.get("cpu_total", None)
    if not isinstance(num_cpus, int):
      raise errors.OpPrereqError("Can't compute the number of physical CPUs"
                                 " on node %s, result was '%s'" %
                                 (node, num_cpus), errors.ECODE_ENVIRON)
    if requested > num_cpus:
      raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
                                 "required" % (node, num_cpus, requested),
                                 errors.ECODE_NORES)


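# For illustration: GetItemFromContainer("2", "disk", disks) returns the item
# at index 2, "-1" refers to the last item in the container, and any
# non-numeric identifier is matched against the items' names and UUIDs.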
def GetItemFromContainer(identifier, kind, container):
  """Return the item referred to by the identifier.

  @type identifier: string
  @param identifier: Item index or name or UUID
  @type kind: string
  @param kind: One-word item description
  @type container: list
  @param container: Container to get the item from

  """
  # Index
  try:
    idx = int(identifier)
    if idx == -1:
      # Append
      absidx = len(container) - 1
    elif idx < 0:
      raise IndexError("Not accepting negative indices other than -1")
    elif idx > len(container):
      raise IndexError("Got %s index %s, but there are only %s" %
                       (kind, idx, len(container)))
    else:
      absidx = idx
    return (absidx, container[idx])
  except ValueError:
    pass

  for idx, item in enumerate(container):
    if item.uuid == identifier or item.name == identifier:
      return (idx, item)

  raise errors.OpPrereqError("Cannot find %s with identifier %s" %
                             (kind, identifier), errors.ECODE_NOENT)


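# Sketch of the typical flow (as used by LUInstanceSetParams below): opcode
# modifications such as
#   [(constants.DDM_ADD, -1, {...}), (constants.DDM_MODIFY, "0", {...})]
# are passed through PrepareContainerMods() to attach a per-entry private data
# object, and the resulting 4-tuples are later handed to ApplyContainerMods()
# together with create/modify/remove callbacks.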
def ApplyContainerMods(kind, container, chgdesc, mods,
                       create_fn, modify_fn, remove_fn):
  """Applies descriptions in C{mods} to C{container}.

  @type kind: string
  @param kind: One-word item description
  @type container: list
  @param container: Container to modify
  @type chgdesc: None or list
  @param chgdesc: List of applied changes
  @type mods: list
  @param mods: Modifications as returned by L{PrepareContainerMods}
  @type create_fn: callable
  @param create_fn: Callback for creating a new item (L{constants.DDM_ADD});
    receives absolute item index, parameters and private data object as added
    by L{PrepareContainerMods}, returns tuple containing new item and changes
    as list
  @type modify_fn: callable
  @param modify_fn: Callback for modifying an existing item
    (L{constants.DDM_MODIFY}); receives absolute item index, item, parameters
    and private data object as added by L{PrepareContainerMods}, returns
    changes as list
  @type remove_fn: callable
  @param remove_fn: Callback on removing item; receives absolute item index,
    item and private data object as added by L{PrepareContainerMods}

  """
  for (op, identifier, params, private) in mods:
    changes = None

    if op == constants.DDM_ADD:
      # Calculate where item will be added
      # When adding an item, identifier can only be an index
      try:
        idx = int(identifier)
      except ValueError:
        raise errors.OpPrereqError("Only positive integer or -1 is accepted as"
                                   " identifier for %s" % constants.DDM_ADD,
                                   errors.ECODE_INVAL)
      if idx == -1:
        addidx = len(container)
      else:
        if idx < 0:
          raise IndexError("Not accepting negative indices other than -1")
        elif idx > len(container):
          raise IndexError("Got %s index %s, but there are only %s" %
                           (kind, idx, len(container)))
        addidx = idx

      if create_fn is None:
        item = params
      else:
        (item, changes) = create_fn(addidx, params, private)

      if idx == -1:
        container.append(item)
      else:
        assert idx >= 0
        assert idx <= len(container)
        # list.insert does so before the specified index
        container.insert(idx, item)
    else:
      # Retrieve existing item
      (absidx, item) = GetItemFromContainer(identifier, kind, container)

      if op == constants.DDM_REMOVE:
        assert not params

        if remove_fn is not None:
          remove_fn(absidx, item, private)

        changes = [("%s/%s" % (kind, absidx), "remove")]

        assert container[absidx] == item
        del container[absidx]
      elif op == constants.DDM_MODIFY:
        if modify_fn is not None:
          changes = modify_fn(absidx, item, params, private)
      else:
        raise errors.ProgrammerError("Unhandled operation '%s'" % op)

    assert _TApplyContModsCbChanges(changes)

    if not (chgdesc is None or changes is None):
      chgdesc.extend(changes)


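# For example, _UpdateIvNames(2, disks) sets iv_name to "disk/2", "disk/3", ...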
def _UpdateIvNames(base_index, disks):
  """Updates the C{iv_name} attribute of disks.

  @type disks: list of L{objects.Disk}

  """
  for (idx, disk) in enumerate(disks):
    disk.iv_name = "disk/%s" % (base_index + idx, )


class LUInstanceSetParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  @staticmethod
  def _UpgradeDiskNicMods(kind, mods, verify_fn):
    assert ht.TList(mods)
    assert not mods or len(mods[0]) in (2, 3)

    if mods and len(mods[0]) == 2:
      result = []

      addremove = 0
      for op, params in mods:
        if op in (constants.DDM_ADD, constants.DDM_REMOVE):
          result.append((op, -1, params))
          addremove += 1

          if addremove > 1:
            raise errors.OpPrereqError("Only one %s add or remove operation is"
                                       " supported at a time" % kind,
                                       errors.ECODE_INVAL)
        else:
          result.append((constants.DDM_MODIFY, op, params))

      assert verify_fn(result)
    else:
      result = mods

    return result

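  # The upgrade above turns legacy two-element modifications into the
  # three-element form used throughout this module, e.g.
  #   [("add", {...})] -> [(constants.DDM_ADD, -1, {...})]
  #   [("0", {...})]   -> [(constants.DDM_MODIFY, "0", {...})]
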
  @staticmethod
  def _CheckMods(kind, mods, key_types, item_fn):
    """Ensures requested disk/NIC modifications are valid.

    """
    for (op, _, params) in mods:
      assert ht.TDict(params)

      # If 'key_types' is an empty dict, we assume we have an
      # 'ext' template and thus do not ForceDictType
      if key_types:
        utils.ForceDictType(params, key_types)

      if op == constants.DDM_REMOVE:
        if params:
          raise errors.OpPrereqError("No settings should be passed when"
                                     " removing a %s" % kind,
                                     errors.ECODE_INVAL)
      elif op in (constants.DDM_ADD, constants.DDM_MODIFY):
        item_fn(op, params)
      else:
        raise errors.ProgrammerError("Unhandled operation '%s'" % op)

  @staticmethod
  def _VerifyDiskModification(op, params):
    """Verifies a disk modification.

    """
    if op == constants.DDM_ADD:
      mode = params.setdefault(constants.IDISK_MODE, constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
                                   errors.ECODE_INVAL)

      size = params.get(constants.IDISK_SIZE, None)
      if size is None:
        raise errors.OpPrereqError("Required disk parameter '%s' missing" %
                                   constants.IDISK_SIZE, errors.ECODE_INVAL)

      try:
        size = int(size)
      except (TypeError, ValueError), err:
        raise errors.OpPrereqError("Invalid disk size parameter: %s" % err,
                                   errors.ECODE_INVAL)

      params[constants.IDISK_SIZE] = size
      name = params.get(constants.IDISK_NAME, None)
      if name is not None and name.lower() == constants.VALUE_NONE:
        params[constants.IDISK_NAME] = None

    elif op == constants.DDM_MODIFY:
      if constants.IDISK_SIZE in params:
        raise errors.OpPrereqError("Disk size change not possible, use"
                                   " grow-disk", errors.ECODE_INVAL)
      if len(params) > 2:
        raise errors.OpPrereqError("Disk modification doesn't support"
                                   " additional arbitrary parameters",
                                   errors.ECODE_INVAL)
      name = params.get(constants.IDISK_NAME, None)
      if name is not None and name.lower() == constants.VALUE_NONE:
        params[constants.IDISK_NAME] = None

  @staticmethod
  def _VerifyNicModification(op, params):
    """Verifies a network interface modification.

    """
    if op in (constants.DDM_ADD, constants.DDM_MODIFY):
      ip = params.get(constants.INIC_IP, None)
      name = params.get(constants.INIC_NAME, None)
      req_net = params.get(constants.INIC_NETWORK, None)
      link = params.get(constants.NIC_LINK, None)
      mode = params.get(constants.NIC_MODE, None)
      if name is not None and name.lower() == constants.VALUE_NONE:
        params[constants.INIC_NAME] = None
      if req_net is not None:
        if req_net.lower() == constants.VALUE_NONE:
          params[constants.INIC_NETWORK] = None
          req_net = None
        elif link is not None or mode is not None:
          raise errors.OpPrereqError("If a network is given, mode or link"
                                     " should not be set",
                                     errors.ECODE_INVAL)

      if op == constants.DDM_ADD:
        macaddr = params.get(constants.INIC_MAC, None)
        if macaddr is None:
          params[constants.INIC_MAC] = constants.VALUE_AUTO

      if ip is not None:
        if ip.lower() == constants.VALUE_NONE:
          params[constants.INIC_IP] = None
        else:
          if ip.lower() == constants.NIC_IP_POOL:
            if op == constants.DDM_ADD and req_net is None:
              raise errors.OpPrereqError("If ip=pool, parameter network"
                                         " cannot be none",
                                         errors.ECODE_INVAL)
          else:
            if not netutils.IPAddress.IsValid(ip):
              raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
                                         errors.ECODE_INVAL)

      if constants.INIC_MAC in params:
        macaddr = params[constants.INIC_MAC]
        if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          macaddr = utils.NormalizeAndValidateMac(macaddr)

        if op == constants.DDM_MODIFY and macaddr == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing NIC",
                                     errors.ECODE_INVAL)

  def CheckArguments(self):
    if not (self.op.nics or self.op.disks or self.op.disk_template or
            self.op.hvparams or self.op.beparams or self.op.os_name or
            self.op.offline is not None or self.op.runtime_mem or
            self.op.pnode):
      raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)

    if self.op.hvparams:
      _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS,
                            "hypervisor", "instance", "cluster")

    self.op.disks = self._UpgradeDiskNicMods(
      "disk", self.op.disks, opcodes.OpInstanceSetParams.TestDiskModifications)
    self.op.nics = self._UpgradeDiskNicMods(
      "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications)

    if self.op.disks and self.op.disk_template is not None:
      raise errors.OpPrereqError("Disk template conversion and other disk"
                                 " changes not supported at the same time",
                                 errors.ECODE_INVAL)

    if (self.op.disk_template and
        self.op.disk_template in constants.DTS_INT_MIRROR and
        self.op.remote_node is None):
      raise errors.OpPrereqError("Changing the disk template to a mirrored"
                                 " one requires specifying a secondary node",
                                 errors.ECODE_INVAL)

    # Check NIC modifications
    self._CheckMods("NIC", self.op.nics, constants.INIC_PARAMS_TYPES,
                    self._VerifyNicModification)

    if self.op.pnode:
      self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODEGROUP] = []
    # Can't even acquire node locks in shared mode as upcoming changes in
    # Ganeti 2.6 will start to modify the node object on disk conversion
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    # Lock the node group (shared) to look up the ipolicy
    self.share_locks[locking.LEVEL_NODEGROUP] = 1

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      # Acquire locks for the instance's nodegroups optimistically. Needs
      # to be verified in CheckPrereq
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
    elif level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
      if self.op.disk_template and self.op.remote_node:
        self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
        self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node)
    elif level == locking.LEVEL_NODE_RES and self.op.disk_template:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = {}
    if constants.BE_MINMEM in self.be_new:
      args["minmem"] = self.be_new[constants.BE_MINMEM]
    if constants.BE_MAXMEM in self.be_new:
      args["maxmem"] = self.be_new[constants.BE_MAXMEM]
    if constants.BE_VCPUS in self.be_new:
      args["vcpus"] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.

    if self._new_nics is not None:
      nics = []

      for nic in self._new_nics:
        n = copy.deepcopy(nic)
        nicparams = self.cluster.SimpleFillNIC(n.nicparams)
        n.nicparams = nicparams
        nics.append(_NICToTuple(self, n))

      args["nics"] = nics

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    if self.op.disk_template:
      env["NEW_DISK_TEMPLATE"] = self.op.disk_template
    if self.op.runtime_mem:
      env["RUNTIME_MEMORY"] = self.op.runtime_mem

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def _PrepareNicModification(self, params, private, old_ip, old_net_uuid,
                              old_params, cluster, pnode):

    update_params_dict = dict([(key, params[key])
                               for key in constants.NICS_PARAMETERS
                               if key in params])

    req_link = update_params_dict.get(constants.NIC_LINK, None)
    req_mode = update_params_dict.get(constants.NIC_MODE, None)

    new_net_uuid = None
    new_net_uuid_or_name = params.get(constants.INIC_NETWORK, old_net_uuid)
    if new_net_uuid_or_name:
      new_net_uuid = self.cfg.LookupNetwork(new_net_uuid_or_name)
      new_net_obj = self.cfg.GetNetwork(new_net_uuid)

    if old_net_uuid:
      old_net_obj = self.cfg.GetNetwork(old_net_uuid)

    if new_net_uuid:
      netparams = self.cfg.GetGroupNetParams(new_net_uuid, pnode)
      if not netparams:
        raise errors.OpPrereqError("No netparams found for the network"
                                   " %s, probably not connected" %
                                   new_net_obj.name, errors.ECODE_INVAL)
      new_params = dict(netparams)
    else:
      new_params = _GetUpdatedParams(old_params, update_params_dict)

    utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES)

    new_filled_params = cluster.SimpleFillNIC(new_params)
    objects.NIC.CheckParameterSyntax(new_filled_params)

    new_mode = new_filled_params[constants.NIC_MODE]
    if new_mode == constants.NIC_MODE_BRIDGED:
      bridge = new_filled_params[constants.NIC_LINK]
      msg = self.rpc.call_bridges_exist(pnode, [bridge]).fail_msg
      if msg:
        msg = "Error checking bridges on node '%s': %s" % (pnode, msg)
        if self.op.force:
          self.warn.append(msg)
        else:
          raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)

    elif new_mode == constants.NIC_MODE_ROUTED:
      ip = params.get(constants.INIC_IP, old_ip)
      if ip is None:
        raise errors.OpPrereqError("Cannot set the NIC IP address to None"
                                   " on a routed NIC", errors.ECODE_INVAL)

    elif new_mode == constants.NIC_MODE_OVS:
      # TODO: check OVS link
      self.LogInfo("OVS links are currently not checked for correctness")

    if constants.INIC_MAC in params:
      mac = params[constants.INIC_MAC]
      if mac is None:
        raise errors.OpPrereqError("Cannot unset the NIC MAC address",
                                   errors.ECODE_INVAL)
      elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        # otherwise generate the MAC address
        params[constants.INIC_MAC] = \
          self.cfg.GenerateMAC(new_net_uuid, self.proc.GetECId())
      else:
        # or validate/reserve the current one
        try:
          self.cfg.ReserveMAC(mac, self.proc.GetECId())
        except errors.ReservationError:
          raise errors.OpPrereqError("MAC address '%s' already in use"
                                     " in cluster" % mac,
                                     errors.ECODE_NOTUNIQUE)
    elif new_net_uuid != old_net_uuid:

      def get_net_prefix(net_uuid):
        mac_prefix = None
        if net_uuid:
          nobj = self.cfg.GetNetwork(net_uuid)
          mac_prefix = nobj.mac_prefix

        return mac_prefix

      new_prefix = get_net_prefix(new_net_uuid)
      old_prefix = get_net_prefix(old_net_uuid)
      if old_prefix != new_prefix:
        params[constants.INIC_MAC] = \
          self.cfg.GenerateMAC(new_net_uuid, self.proc.GetECId())

    # if there is a change in (ip, network) tuple
    new_ip = params.get(constants.INIC_IP, old_ip)
    if (new_ip, new_net_uuid) != (old_ip, old_net_uuid):
      if new_ip:
        # if IP is pool then require a network and generate one IP
        if new_ip.lower() == constants.NIC_IP_POOL:
          if new_net_uuid:
            try:
              new_ip = self.cfg.GenerateIp(new_net_uuid, self.proc.GetECId())
            except errors.ReservationError:
              raise errors.OpPrereqError("Unable to get a free IP"
                                         " from the address pool",
                                         errors.ECODE_STATE)
            self.LogInfo("Chose IP %s from network %s",
                         new_ip,
                         new_net_obj.name)
            params[constants.INIC_IP] = new_ip
          else:
            raise errors.OpPrereqError("ip=pool, but no network found",
                                       errors.ECODE_INVAL)
        # Reserve the new IP in the new network, if any
        elif new_net_uuid:
          try:
            self.cfg.ReserveIp(new_net_uuid, new_ip, self.proc.GetECId())
            self.LogInfo("Reserving IP %s in network %s",
                         new_ip, new_net_obj.name)
          except errors.ReservationError:
            raise errors.OpPrereqError("IP %s not available in network %s" %
                                       (new_ip, new_net_obj.name),
                                       errors.ECODE_NOTUNIQUE)
        # new network is None so check if new IP is a conflicting IP
        elif self.op.conflicts_check:
          _CheckForConflictingIp(self, new_ip, pnode)

      # release old IP if old network is not None
      if old_ip and old_net_uuid:
        try:
          self.cfg.ReleaseIp(old_net_uuid, old_ip, self.proc.GetECId())
        except errors.AddressPoolError:
          logging.warning("Release IP %s not contained in network %s",
                          old_ip, old_net_obj.name)

    # there are no changes in (ip, network) tuple and old network is not None
    elif (old_net_uuid is not None and
          (req_link is not None or req_mode is not None)):
      raise errors.OpPrereqError("Not allowed to change link or mode of"
                                 " a NIC that is connected to a network",
                                 errors.ECODE_INVAL)

    private.params = new_params
    private.filled = new_filled_params

  def _PreCheckDiskTemplate(self, pnode_info):
    """CheckPrereq checks related to a new disk template."""
    # Arguments are passed to avoid configuration lookups
    instance = self.instance
    pnode = instance.primary_node
    cluster = self.cluster
    if instance.disk_template == self.op.disk_template:
      raise errors.OpPrereqError("Instance already has disk template %s" %
                                 instance.disk_template, errors.ECODE_INVAL)

    if (instance.disk_template,
        self.op.disk_template) not in self._DISK_CONVERSIONS:
      raise errors.OpPrereqError("Unsupported disk template conversion from"
                                 " %s to %s" % (instance.disk_template,
                                                self.op.disk_template),
                                 errors.ECODE_INVAL)
    _CheckInstanceState(self, instance, INSTANCE_DOWN,
                        msg="cannot change disk template")
    if self.op.disk_template in constants.DTS_INT_MIRROR:
      if self.op.remote_node == pnode:
        raise errors.OpPrereqError("Given new secondary node %s is the same"
                                   " as the primary node of the instance" %
                                   self.op.remote_node, errors.ECODE_STATE)
      _CheckNodeOnline(self, self.op.remote_node)
      _CheckNodeNotDrained(self, self.op.remote_node)
      # FIXME: here we assume that the old instance type is DT_PLAIN
      assert instance.disk_template == constants.DT_PLAIN
      disks = [{constants.IDISK_SIZE: d.size,
                constants.IDISK_VG: d.logical_id[0]}
               for d in instance.disks]
      required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
      _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)

      snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
      snode_group = self.cfg.GetNodeGroup(snode_info.group)
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              snode_group)
      _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, self.cfg,
                              ignore=self.op.ignore_ipolicy)
      if pnode_info.group != snode_info.group:
        self.LogWarning("The primary and secondary nodes are in two"
                        " different node groups; the disk parameters"
                        " from the first disk's node group will be"
                        " used")

    if self.op.disk_template not in constants.DTS_EXCL_STORAGE:
      # Make sure none of the nodes require exclusive storage
      nodes = [pnode_info]
      if self.op.disk_template in constants.DTS_INT_MIRROR:
        assert snode_info
        nodes.append(snode_info)
      has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
      if compat.any(map(has_es, nodes)):
        errmsg = ("Cannot convert disk template from %s to %s when exclusive"
                  " storage is enabled" % (instance.disk_template,
                                           self.op.disk_template))
        raise errors.OpPrereqError(errmsg, errors.ECODE_STATE)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    assert self.op.instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    pnode = instance.primary_node

    self.warn = []

    if (self.op.pnode is not None and self.op.pnode != pnode and
        not self.op.force):
      # verify that the instance is not up
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      if instance_info.fail_msg:
        self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      elif instance_info.payload:
        raise errors.OpPrereqError("Instance is still running on %s" % pnode,
                                   errors.ECODE_STATE)

    assert pnode in self.owned_locks(locking.LEVEL_NODE)
    nodelist = list(instance.all_nodes)
    pnode_info = self.cfg.GetNodeInfo(pnode)
    self.diskparams = self.cfg.GetInstanceDiskParams(instance)

    #_CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
    assert pnode_info.group in self.owned_locks(locking.LEVEL_NODEGROUP)
    group_info = self.cfg.GetNodeGroup(pnode_info.group)

    # dictionary with instance information after the modification
    ispec = {}

    # Check disk modifications. This is done here and not in CheckArguments
    # (as with NICs), because we need to know the instance's disk template
    if instance.disk_template == constants.DT_EXT:
      self._CheckMods("disk", self.op.disks, {},
                      self._VerifyDiskModification)
    else:
      self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
                      self._VerifyDiskModification)

    # Prepare disk/NIC modifications
    self.diskmod = PrepareContainerMods(self.op.disks, None)
    self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate)

    # Check the validity of the `provider' parameter
    if instance.disk_template == constants.DT_EXT:
      for mod in self.diskmod:
        ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
        if mod[0] == constants.DDM_ADD:
          if ext_provider is None:
            raise errors.OpPrereqError("Instance template is '%s' and parameter"
                                       " '%s' missing, during disk add" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER),
                                       errors.ECODE_NOENT)
        elif mod[0] == constants.DDM_MODIFY:
          if ext_provider:
            raise errors.OpPrereqError("Parameter '%s' is invalid during disk"
                                       " modification" %
                                       constants.IDISK_PROVIDER,
                                       errors.ECODE_INVAL)
    else:
      for mod in self.diskmod:
        ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
        if ext_provider is not None:
          raise errors.OpPrereqError("Parameter '%s' is only valid for"
                                     " instances of type '%s'" %
                                     (constants.IDISK_PROVIDER,
                                      constants.DT_EXT),
                                     errors.ECODE_INVAL)

    # OS change
    if self.op.os_name and not self.op.force:
      _CheckNodeHasOS(self, instance.primary_node, self.op.os_name,
                      self.op.force_variant)
      instance_os = self.op.os_name
    else:
      instance_os = instance.os

    assert not (self.op.disk_template and self.op.disks), \
      "Can't modify disk template and apply disk changes at the same time"

    if self.op.disk_template:
      self._PreCheckDiskTemplate(pnode_info)

    # hvparams processing
    if self.op.hvparams:
      hv_type = instance.hypervisor
      i_hvdict = _GetUpdatedParams(instance.hvparams, self.op.hvparams)
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.SimpleFillHV(hv_type, instance.os, i_hvdict)

      # local check
      hypervisor.GetHypervisorClass(hv_type).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_proposed = self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_proposed = cluster.SimpleFillHV(instance.hypervisor, instance.os,
                                              instance.hvparams)
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams,
                                   use_none=True)
      objects.UpgradeBeParams(i_bedict)
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.SimpleFillBE(i_bedict)
      self.be_proposed = self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}
      self.be_proposed = cluster.SimpleFillBE(instance.beparams)
    be_old = cluster.FillBE(instance)

    # CPU param validation -- checking every time a parameter is
    # changed to cover all cases where either CPU mask or vcpus have
    # changed
    if (constants.BE_VCPUS in self.be_proposed and
        constants.HV_CPU_MASK in self.hv_proposed):
      cpu_list = \
        utils.ParseMultiCpuMask(self.hv_proposed[constants.HV_CPU_MASK])
      # Verify mask is consistent with number of vCPUs. Can skip this
      # test if only 1 entry in the CPU mask, which means same mask
      # is applied to all vCPUs.
      if (len(cpu_list) > 1 and
          len(cpu_list) != self.be_proposed[constants.BE_VCPUS]):
        raise errors.OpPrereqError("Number of vCPUs [%d] does not match the"
                                   " CPU mask [%s]" %
                                   (self.be_proposed[constants.BE_VCPUS],
                                    self.hv_proposed[constants.HV_CPU_MASK]),
                                   errors.ECODE_INVAL)

      # Only perform this test if a new CPU mask is given
      if constants.HV_CPU_MASK in self.hv_new:
        # Calculate the largest CPU number requested
        max_requested_cpu = max(map(max, cpu_list))
        # Check that all of the instance's nodes have enough physical CPUs to
        # satisfy the requested CPU mask
        _CheckNodesPhysicalCPUs(self, instance.all_nodes,
                                max_requested_cpu + 1, instance.hypervisor)

    # osparams processing
    if self.op.osparams:
      i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
      _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
      self.os_inst = i_osdict # the new dict (without defaults)
    else:
      self.os_inst = {}

    #TODO(dynmem): do the appropriate check involving MINMEM
    if (constants.BE_MAXMEM in self.op.beparams and not self.op.force and
        be_new[constants.BE_MAXMEM] > be_old[constants.BE_MAXMEM]):
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, None,
                                         [instance.hypervisor], False)
      pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
      else:
        (_, _, (pnhvinfo, )) = pninfo.payload
        if not isinstance(pnhvinfo.get("memory_free", None), int):
          self.warn.append("Node data from primary node %s doesn't contain"
                           " free memory information" % pnode)
        elif instance_info.fail_msg:
          self.warn.append("Can't get instance runtime information: %s" %
                           instance_info.fail_msg)
        else:
          if instance_info.payload:
            current_mem = int(instance_info.payload["memory"])
          else:
            # Assume instance not running
            # (there is a slight race condition here, but it's not very
            # probable, and we have no other way to check)
            # TODO: Describe race condition
            current_mem = 0
          #TODO(dynmem): do the appropriate check involving MINMEM
          miss_mem = (be_new[constants.BE_MAXMEM] - current_mem -
                      pnhvinfo["memory_free"])
          if miss_mem > 0:
            raise errors.OpPrereqError("This change will prevent the instance"
                                       " from starting, due to %d MB of memory"
                                       " missing on its primary node" %
                                       miss_mem, errors.ECODE_NORES)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          nres.Raise("Can't get info from secondary node %s" % node,
                     prereq=True, ecode=errors.ECODE_STATE)
          (_, _, (nhvinfo, )) = nres.payload
          if not isinstance(nhvinfo.get("memory_free", None), int):
            raise errors.OpPrereqError("Secondary node %s didn't return free"
                                       " memory information" % node,
                                       errors.ECODE_STATE)
          #TODO(dynmem): do the appropriate check involving MINMEM
          elif be_new[constants.BE_MAXMEM] > nhvinfo["memory_free"]:
            raise errors.OpPrereqError("This change will prevent the instance"
                                       " from failover to its secondary node"
                                       " %s, due to not enough memory" % node,
                                       errors.ECODE_STATE)

    if self.op.runtime_mem:
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking node %s" % instance.primary_node)
      if not remote_info.payload: # not running already
        raise errors.OpPrereqError("Instance %s is not running" %
                                   instance.name, errors.ECODE_STATE)

      current_memory = remote_info.payload["memory"]
      if (not self.op.force and
           (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or
            self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
        raise errors.OpPrereqError("Instance %s must have memory between %d"
                                   " and %d MB unless --force is"
                                   " given" %
                                   (instance.name,
                                    self.be_proposed[constants.BE_MINMEM],
                                    self.be_proposed[constants.BE_MAXMEM]),
                                   errors.ECODE_INVAL)

      delta = self.op.runtime_mem - current_memory
      if delta > 0:
        _CheckNodeFreeMemory(self, instance.primary_node,
                             "ballooning memory for instance %s" %
                             instance.name, delta, instance.hypervisor)

    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances", errors.ECODE_INVAL)

    def _PrepareNicCreate(_, params, private):
      self._PrepareNicModification(params, private, None, None,
3984
                                   {}, cluster, pnode)
3985
      return (None, None)
3986

    
3987
    def _PrepareNicMod(_, nic, params, private):
3988
      self._PrepareNicModification(params, private, nic.ip, nic.network,
3989
                                   nic.nicparams, cluster, pnode)
3990
      return None
3991

    
3992
    def _PrepareNicRemove(_, params, __):
3993
      ip = params.ip
3994
      net = params.network
3995
      if net is not None and ip is not None:
3996
        self.cfg.ReleaseIp(net, ip, self.proc.GetECId())
3997

    
3998
    # Verify NIC changes (operating on copy)
3999
    nics = instance.nics[:]
4000
    ApplyContainerMods("NIC", nics, None, self.nicmod,
4001
                       _PrepareNicCreate, _PrepareNicMod, _PrepareNicRemove)
4002
    if len(nics) > constants.MAX_NICS:
4003
      raise errors.OpPrereqError("Instance has too many network interfaces"
4004
                                 " (%d), cannot add more" % constants.MAX_NICS,
4005
                                 errors.ECODE_STATE)
4006

    
4007
    def _PrepareDiskMod(_, disk, params, __):
4008
      disk.name = params.get(constants.IDISK_NAME, None)
4009

    
4010
    # Verify disk changes (operating on a copy)
4011
    disks = copy.deepcopy(instance.disks)
4012
    ApplyContainerMods("disk", disks, None, self.diskmod, None, _PrepareDiskMod,
4013
                       None)
4014
    utils.ValidateDeviceNames("disk", disks)
4015
    if len(disks) > constants.MAX_DISKS:
4016
      raise errors.OpPrereqError("Instance has too many disks (%d), cannot add"
4017
                                 " more" % constants.MAX_DISKS,
4018
                                 errors.ECODE_STATE)
4019
    disk_sizes = [disk.size for disk in instance.disks]
4020
    disk_sizes.extend(params["size"] for (op, idx, params, private) in
4021
                      self.diskmod if op == constants.DDM_ADD)
4022
    ispec[constants.ISPEC_DISK_COUNT] = len(disk_sizes)
4023
    ispec[constants.ISPEC_DISK_SIZE] = disk_sizes
4024

    
4025
    if self.op.offline is not None and self.op.offline:
4026
      _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE,
4027
                          msg="can't change to offline")
4028

    
4029
    # Pre-compute NIC changes (necessary to use result in hooks)
4030
    self._nic_chgdesc = []
4031
    if self.nicmod:
4032
      # Operate on copies as this is still in prereq
4033
      nics = [nic.Copy() for nic in instance.nics]
4034
      ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod,
4035
                         self._CreateNewNic, self._ApplyNicMods, None)
4036
      # Verify that NIC names are unique and valid
4037
      utils.ValidateDeviceNames("NIC", nics)
4038
      self._new_nics = nics
4039
      ispec[constants.ISPEC_NIC_COUNT] = len(self._new_nics)
4040
    else:
4041
      self._new_nics = None
4042
      ispec[constants.ISPEC_NIC_COUNT] = len(instance.nics)
4043

    
4044
    if not self.op.ignore_ipolicy:
4045
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4046
                                                              group_info)
4047

    
4048
      # Fill ispec with backend parameters
4049
      ispec[constants.ISPEC_SPINDLE_USE] = \
4050
        self.be_new.get(constants.BE_SPINDLE_USE, None)
4051
      ispec[constants.ISPEC_CPU_COUNT] = self.be_new.get(constants.BE_VCPUS,
4052
                                                         None)
4053

    
4054
      # Copy ispec to verify parameters with min/max values separately
4055
      if self.op.disk_template:
4056
        new_disk_template = self.op.disk_template
4057
      else:
4058
        new_disk_template = instance.disk_template
4059
      ispec_max = ispec.copy()
4060
      ispec_max[constants.ISPEC_MEM_SIZE] = \
4061
        self.be_new.get(constants.BE_MAXMEM, None)
4062
      res_max = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_max,
4063
                                                     new_disk_template)
4064
      ispec_min = ispec.copy()
4065
      ispec_min[constants.ISPEC_MEM_SIZE] = \
4066
        self.be_new.get(constants.BE_MINMEM, None)
4067
      res_min = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_min,
4068
                                                     new_disk_template)
4069

    
4070
      if (res_max or res_min):
4071
        # FIXME: Improve error message by including information about whether
4072
        # the upper or lower limit of the parameter fails the ipolicy.
4073
        msg = ("Instance allocation to group %s (%s) violates policy: %s" %
4074
               (group_info, group_info.name,
4075
                utils.CommaJoin(set(res_max + res_min))))
4076
        raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
4077

    
4078
  def _ConvertPlainToDrbd(self, feedback_fn):
4079
    """Converts an instance from plain to drbd.
4080

4081
    """
4082
    feedback_fn("Converting template to drbd")
4083
    instance = self.instance
4084
    pnode = instance.primary_node
4085
    snode = self.op.remote_node
4086

    
4087
    assert instance.disk_template == constants.DT_PLAIN
4088

    
4089
    # create a fake disk info for _GenerateDiskTemplate
4090
    disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
4091
                  constants.IDISK_VG: d.logical_id[0],
4092
                  constants.IDISK_NAME: d.name}
4093
                 for d in instance.disks]
4094
    new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
4095
                                      instance.name, pnode, [snode],
4096
                                      disk_info, None, None, 0, feedback_fn,
4097
                                      self.diskparams)
4098
    anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
4099
                                        self.diskparams)
4100
    p_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, pnode)
4101
    s_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, snode)
4102
    info = _GetInstanceInfoText(instance)
4103
    feedback_fn("Creating additional volumes...")
4104
    # first, create the missing data and meta devices
4105
    for disk in anno_disks:
4106
      # unfortunately this is... not too nice
4107
      _CreateSingleBlockDev(self, pnode, instance, disk.children[1],
4108
                            info, True, p_excl_stor)
4109
      for child in disk.children:
4110
        _CreateSingleBlockDev(self, snode, instance, child, info, True,
4111
                              s_excl_stor)
4112
    # at this stage, all new LVs have been created, we can rename the
4113
    # old ones
4114
    feedback_fn("Renaming original volumes...")
4115
    rename_list = [(o, n.children[0].logical_id)
4116
                   for (o, n) in zip(instance.disks, new_disks)]
4117
    result = self.rpc.call_blockdev_rename(pnode, rename_list)
4118
    result.Raise("Failed to rename original LVs")
4119

    
4120
    feedback_fn("Initializing DRBD devices...")
4121
    # all child devices are in place, we can now create the DRBD devices
4122
    try:
4123
      for disk in anno_disks:
4124
        for (node, excl_stor) in [(pnode, p_excl_stor), (snode, s_excl_stor)]:
4125
          f_create = node == pnode
4126
          _CreateSingleBlockDev(self, node, instance, disk, info, f_create,
4127
                                excl_stor)
4128
    except errors.GenericError, e:
4129
      feedback_fn("Initializing of DRBD devices failed;"
4130
                  " renaming back original volumes...")
4131
      for disk in new_disks:
4132
        self.cfg.SetDiskID(disk, pnode)
4133
      rename_back_list = [(n.children[0], o.logical_id)
4134
                          for (n, o) in zip(new_disks, instance.disks)]
4135
      result = self.rpc.call_blockdev_rename(pnode, rename_back_list)
4136
      result.Raise("Failed to rename LVs back after error %s" % str(e))
4137
      raise
4138

    
4139
    # at this point, the instance has been modified
4140
    instance.disk_template = constants.DT_DRBD8
4141
    instance.disks = new_disks
4142
    self.cfg.Update(instance, feedback_fn)
4143

    
4144
    # Release node locks while waiting for sync
4145
    _ReleaseLocks(self, locking.LEVEL_NODE)
4146

    
4147
    # disks are created, waiting for sync
4148
    disk_abort = not _WaitForSync(self, instance,
4149
                                  oneshot=not self.op.wait_for_sync)
4150
    if disk_abort:
4151
      raise errors.OpExecError("There are some degraded disks for"
4152
                               " this instance, please cleanup manually")
4153

    
4154
    # Node resource locks will be released by caller
4155

    
4156
  def _ConvertDrbdToPlain(self, feedback_fn):
4157
    """Converts an instance from drbd to plain.
4158

4159
    """
4160
    instance = self.instance
4161

    
4162
    assert len(instance.secondary_nodes) == 1
4163
    assert instance.disk_template == constants.DT_DRBD8
4164

    
4165
    pnode = instance.primary_node
4166
    snode = instance.secondary_nodes[0]
4167
    feedback_fn("Converting template to plain")
4168

    
4169
    old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
4170
    new_disks = [d.children[0] for d in instance.disks]
4171

    
4172
    # copy over size, mode and name
4173
    for parent, child in zip(old_disks, new_disks):
4174
      child.size = parent.size
4175
      child.mode = parent.mode
4176
      child.name = parent.name
4177

    
4178
    # this is a DRBD disk, return its port to the pool
4179
    # NOTE: this must be done right before the call to cfg.Update!
4180
    for disk in old_disks:
4181
      tcp_port = disk.logical_id[2]
4182
      self.cfg.AddTcpUdpPort(tcp_port)
4183

    
4184
    # update instance structure
4185
    instance.disks = new_disks
4186
    instance.disk_template = constants.DT_PLAIN
4187
    _UpdateIvNames(0, instance.disks)
4188
    self.cfg.Update(instance, feedback_fn)
4189

    
4190
    # Release locks in case removing disks takes a while
4191
    _ReleaseLocks(self, locking.LEVEL_NODE)
4192

    
4193
    feedback_fn("Removing volumes on the secondary node...")
4194
    for disk in old_disks:
4195
      self.cfg.SetDiskID(disk, snode)
4196
      msg = self.rpc.call_blockdev_remove(snode, disk).fail_msg
4197
      if msg:
4198
        self.LogWarning("Could not remove block device %s on node %s,"
4199
                        " continuing anyway: %s", disk.iv_name, snode, msg)
4200

    
4201
    feedback_fn("Removing unneeded volumes on the primary node...")
4202
    for idx, disk in enumerate(old_disks):
4203
      meta = disk.children[1]
4204
      self.cfg.SetDiskID(meta, pnode)
4205
      msg = self.rpc.call_blockdev_remove(pnode, meta).fail_msg
4206
      if msg:
4207
        self.LogWarning("Could not remove metadata for disk %d on node %s,"
4208
                        " continuing anyway: %s", idx, pnode, msg)
4209

    
4210
  def _CreateNewDisk(self, idx, params, _):
4211
    """Creates a new disk.
4212

4213
    """
4214
    instance = self.instance
4215

    
4216
    # add a new disk
4217
    if instance.disk_template in constants.DTS_FILEBASED:
4218
      (file_driver, file_path) = instance.disks[0].logical_id
4219
      file_path = os.path.dirname(file_path)
4220
    else:
4221
      file_driver = file_path = None
4222

    
4223
    disk = \
4224
      _GenerateDiskTemplate(self, instance.disk_template, instance.name,
4225
                            instance.primary_node, instance.secondary_nodes,
4226
                            [params], file_path, file_driver, idx,
4227
                            self.Log, self.diskparams)[0]
4228

    
4229
    info = _GetInstanceInfoText(instance)
4230

    
4231
    logging.info("Creating volume %s for instance %s",
4232
                 disk.iv_name, instance.name)
4233
    # Note: this needs to be kept in sync with _CreateDisks
4234
    #HARDCODE
4235
    for node in instance.all_nodes:
4236
      f_create = (node == instance.primary_node)
4237
      try:
4238
        _CreateBlockDev(self, node, instance, disk, f_create, info, f_create)
4239
      except errors.OpExecError, err:
4240
        self.LogWarning("Failed to create volume %s (%s) on node '%s': %s",
4241
                        disk.iv_name, disk, node, err)
4242

    
4243
    if self.cluster.prealloc_wipe_disks:
4244
      # Wipe new disk
4245
      _WipeDisks(self, instance,
4246
                 disks=[(idx, disk, 0)])
4247

    
4248
    return (disk, [
4249
      ("disk/%d" % idx, "add:size=%s,mode=%s" % (disk.size, disk.mode)),
4250
      ])
4251

    
4252
  @staticmethod
4253
  def _ModifyDisk(idx, disk, params, _):
4254
    """Modifies a disk.
4255

4256
    """
4257
    changes = []
4258
    mode = params.get(constants.IDISK_MODE, None)
4259
    if mode:
4260
      disk.mode = mode
4261
      changes.append(("disk.mode/%d" % idx, disk.mode))
4262

    
4263
    name = params.get(constants.IDISK_NAME, None)
4264
    disk.name = name
4265
    changes.append(("disk.name/%d" % idx, disk.name))
4266

    
4267
    return changes
4268

    
4269
  def _RemoveDisk(self, idx, root, _):
4270
    """Removes a disk.
4271

4272
    """
4273
    (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
4274
    for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
4275
      self.cfg.SetDiskID(disk, node)
4276
      msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
4277
      if msg:
4278
        self.LogWarning("Could not remove disk/%d on node '%s': %s,"
4279
                        " continuing anyway", idx, node, msg)
4280

    
4281
    # if this is a DRBD disk, return its port to the pool
4282
    if root.dev_type in constants.LDS_DRBD:
4283
      self.cfg.AddTcpUdpPort(root.logical_id[2])
4284

    
4285
  def _CreateNewNic(self, idx, params, private):
4286
    """Creates data structure for a new network interface.
4287

4288
    """
4289
    mac = params[constants.INIC_MAC]
4290
    ip = params.get(constants.INIC_IP, None)
4291
    net = params.get(constants.INIC_NETWORK, None)
4292
    name = params.get(constants.INIC_NAME, None)
4293
    net_uuid = self.cfg.LookupNetwork(net)
4294
    #TODO: not private.filled?? can a nic have no nicparams??
4295
    nicparams = private.filled
4296
    nobj = objects.NIC(mac=mac, ip=ip, network=net_uuid, name=name,
4297
                       nicparams=nicparams)
4298
    nobj.uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
4299

    
4300
    return (nobj, [
4301
      ("nic.%d" % idx,
4302
       "add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" %
4303
       (mac, ip, private.filled[constants.NIC_MODE],
4304
       private.filled[constants.NIC_LINK],
4305
       net)),
4306
      ])
4307

    
4308
  def _ApplyNicMods(self, idx, nic, params, private):
4309
    """Modifies a network interface.
4310

4311
    """
4312
    changes = []
4313

    
4314
    for key in [constants.INIC_MAC, constants.INIC_IP, constants.INIC_NAME]:
4315
      if key in params:
4316
        changes.append(("nic.%s/%d" % (key, idx), params[key]))
4317
        setattr(nic, key, params[key])
4318

    
4319
    new_net = params.get(constants.INIC_NETWORK, nic.network)
4320
    new_net_uuid = self.cfg.LookupNetwork(new_net)
4321
    if new_net_uuid != nic.network:
4322
      changes.append(("nic.network/%d" % idx, new_net))
4323
      nic.network = new_net_uuid
4324

    
4325
    if private.filled:
4326
      nic.nicparams = private.filled
4327

    
4328
      for (key, val) in nic.nicparams.items():
4329
        changes.append(("nic.%s/%d" % (key, idx), val))
4330

    
4331
    return changes
4332

    
4333
  def Exec(self, feedback_fn):
4334
    """Modifies an instance.
4335

4336
    All parameters take effect only at the next restart of the instance.
4337

4338
    """
4339
    # Process here the warnings from CheckPrereq, as we don't have a
4340
    # feedback_fn there.
4341
    # TODO: Replace with self.LogWarning
4342
    for warn in self.warn:
4343
      feedback_fn("WARNING: %s" % warn)
4344

    
4345
    assert ((self.op.disk_template is None) ^
4346
            bool(self.owned_locks(locking.LEVEL_NODE_RES))), \
4347
      "Not owning any node resource locks"
4348

    
4349
    result = []
4350
    instance = self.instance
4351

    
4352
    # New primary node
4353
    if self.op.pnode:
4354
      instance.primary_node = self.op.pnode
4355

    
4356
    # runtime memory
4357
    if self.op.runtime_mem:
4358
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
4359
                                                     instance,
4360
                                                     self.op.runtime_mem)
4361
      rpcres.Raise("Cannot modify instance runtime memory")
4362
      result.append(("runtime_memory", self.op.runtime_mem))
4363

    
4364
    # Apply disk changes
4365
    ApplyContainerMods("disk", instance.disks, result, self.diskmod,
4366
                       self._CreateNewDisk, self._ModifyDisk, self._RemoveDisk)
4367
    _UpdateIvNames(0, instance.disks)
4368

    
4369
    if self.op.disk_template:
4370
      if __debug__:
4371
        check_nodes = set(instance.all_nodes)
4372
        if self.op.remote_node:
4373
          check_nodes.add(self.op.remote_node)
4374
        for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
4375
          owned = self.owned_locks(level)
4376
          assert not (check_nodes - owned), \
4377
            ("Not owning the correct locks, owning %r, expected at least %r" %
4378
             (owned, check_nodes))
4379

    
4380
      r_shut = _ShutdownInstanceDisks(self, instance)
4381
      if not r_shut:
4382
        raise errors.OpExecError("Cannot shutdown instance disks, unable to"
4383
                                 " proceed with disk template conversion")
4384
      mode = (instance.disk_template, self.op.disk_template)
4385
      try:
4386
        self._DISK_CONVERSIONS[mode](self, feedback_fn)
4387
      except:
4388
        self.cfg.ReleaseDRBDMinors(instance.name)
4389
        raise
4390
      result.append(("disk_template", self.op.disk_template))
4391

    
4392
      assert instance.disk_template == self.op.disk_template, \
4393
        ("Expected disk template '%s', found '%s'" %
4394
         (self.op.disk_template, instance.disk_template))
4395

    
4396
    # Release node and resource locks if there are any (they might already have
4397
    # been released during disk conversion)
4398
    _ReleaseLocks(self, locking.LEVEL_NODE)
4399
    _ReleaseLocks(self, locking.LEVEL_NODE_RES)
4400

    
4401
    # Apply NIC changes
4402
    if self._new_nics is not None:
4403
      instance.nics = self._new_nics
4404
      result.extend(self._nic_chgdesc)
4405

    
4406
    # hvparams changes
4407
    if self.op.hvparams:
4408
      instance.hvparams = self.hv_inst
4409
      for key, val in self.op.hvparams.iteritems():
4410
        result.append(("hv/%s" % key, val))
4411

    
4412
    # beparams changes
4413
    if self.op.beparams:
4414
      instance.beparams = self.be_inst
4415
      for key, val in self.op.beparams.iteritems():
4416
        result.append(("be/%s" % key, val))
4417

    
4418
    # OS change
4419
    if self.op.os_name:
4420
      instance.os = self.op.os_name
4421

    
4422
    # osparams changes
4423
    if self.op.osparams:
4424
      instance.osparams = self.os_inst
4425
      for key, val in self.op.osparams.iteritems():
4426
        result.append(("os/%s" % key, val))
4427

    
4428
    if self.op.offline is None:
4429
      # Ignore
4430
      pass
4431
    elif self.op.offline:
4432
      # Mark instance as offline
4433
      self.cfg.MarkInstanceOffline(instance.name)
4434
      result.append(("admin_state", constants.ADMINST_OFFLINE))
4435
    else:
4436
      # Mark instance as online, but stopped
4437
      self.cfg.MarkInstanceDown(instance.name)
4438
      result.append(("admin_state", constants.ADMINST_DOWN))
4439

    
4440
    self.cfg.Update(instance, feedback_fn, self.proc.GetECId())
4441

    
4442
    assert not (self.owned_locks(locking.LEVEL_NODE_RES) or
4443
                self.owned_locks(locking.LEVEL_NODE)), \
4444
      "All node locks should have been released by now"
4445

    
4446
    return result
4447

    
4448
  _DISK_CONVERSIONS = {
4449
    (constants.DT_PLAIN, constants.DT_DRBD8): _ConvertPlainToDrbd,
4450
    (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
4451
    }
4452

    
4453

    
4454
class LUInstanceChangeGroup(LogicalUnit):
4455
  HPATH = "instance-change-group"
4456
  HTYPE = constants.HTYPE_INSTANCE
4457
  REQ_BGL = False
4458

    
4459
  def ExpandNames(self):
4460
    self.share_locks = _ShareAll()
4461

    
4462
    self.needed_locks = {
4463
      locking.LEVEL_NODEGROUP: [],
4464
      locking.LEVEL_NODE: [],
4465
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
4466
      }
4467

    
4468
    self._ExpandAndLockInstance()
4469

    
4470
    if self.op.target_groups:
4471
      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
4472
                                  self.op.target_groups)
4473
    else:
4474
      self.req_target_uuids = None
4475

    
4476
    self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator)
4477

    
4478
  def DeclareLocks(self, level):
4479
    if level == locking.LEVEL_NODEGROUP:
4480
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
4481

    
4482
      if self.req_target_uuids:
4483
        lock_groups = set(self.req_target_uuids)
4484

    
4485
        # Lock all groups used by instance optimistically; this requires going
4486
        # via the node before it's locked, requiring verification later on
4487
        instance_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
4488
        lock_groups.update(instance_groups)
4489
      else:
4490
        # No target groups, need to lock all of them
4491
        lock_groups = locking.ALL_SET
4492

    
4493
      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups
4494

    
4495
    elif level == locking.LEVEL_NODE:
4496
      if self.req_target_uuids:
4497
        # Lock all nodes used by instances
4498
        self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4499
        self._LockInstancesNodes()
4500

    
4501
        # Lock all nodes in all potential target groups
4502
        lock_groups = (frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) -
4503
                       self.cfg.GetInstanceNodeGroups(self.op.instance_name))
4504
        member_nodes = [node_name
4505
                        for group in lock_groups
4506
                        for node_name in self.cfg.GetNodeGroup(group).members]
4507
        self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
4508
      else:
4509
        # Lock all nodes as all groups are potential targets
4510
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4511

    
4512
  def CheckPrereq(self):
4513
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
4514
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
4515
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
4516

    
4517
    assert (self.req_target_uuids is None or
4518
            owned_groups.issuperset(self.req_target_uuids))
4519
    assert owned_instances == set([self.op.instance_name])
4520

    
4521
    # Get instance information
4522
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4523

    
4524
    # Check if node groups for locked instance are still correct
4525
    assert owned_nodes.issuperset(self.instance.all_nodes), \
4526
      ("Instance %s's nodes changed while we kept the lock" %
4527
       self.op.instance_name)
4528

    
4529
    inst_groups = _CheckInstanceNodeGroups(self.cfg, self.op.instance_name,
4530
                                           owned_groups)
4531

    
4532
    if self.req_target_uuids:
4533
      # User requested specific target groups
4534
      self.target_uuids = frozenset(self.req_target_uuids)
4535
    else:
4536
      # All groups except those used by the instance are potential targets
4537
      self.target_uuids = owned_groups - inst_groups
4538

    
4539
    conflicting_groups = self.target_uuids & inst_groups
4540
    if conflicting_groups:
4541
      raise errors.OpPrereqError("Can't use group(s) '%s' as targets, they are"
4542
                                 " used by the instance '%s'" %
4543
                                 (utils.CommaJoin(conflicting_groups),
4544
                                  self.op.instance_name),
4545
                                 errors.ECODE_INVAL)
4546

    
4547
    if not self.target_uuids:
4548
      raise errors.OpPrereqError("There are no possible target groups",
4549
                                 errors.ECODE_INVAL)
4550

    
4551
  def BuildHooksEnv(self):
4552
    """Build hooks env.
4553

4554
    """
4555
    assert self.target_uuids
4556

    
4557
    env = {
4558
      "TARGET_GROUPS": " ".join(self.target_uuids),
4559
      }
4560

    
4561
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4562

    
4563
    return env
4564

    
4565
  def BuildHooksNodes(self):
4566
    """Build hooks nodes.
4567

4568
    """
4569
    mn = self.cfg.GetMasterNode()
4570
    return ([mn], [mn])
4571

    
4572
  def Exec(self, feedback_fn):
4573
    instances = list(self.owned_locks(locking.LEVEL_INSTANCE))
4574

    
4575
    assert instances == [self.op.instance_name], "Instance not locked"
4576

    
4577
    req = iallocator.IAReqGroupChange(instances=instances,
4578
                                      target_groups=list(self.target_uuids))
4579
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
4580

    
4581
    ial.Run(self.op.iallocator)
4582

    
4583
    if not ial.success:
4584
      raise errors.OpPrereqError("Can't compute solution for changing group of"
4585
                                 " instance '%s' using iallocator '%s': %s" %
4586
                                 (self.op.instance_name, self.op.iallocator,
4587
                                  ial.info), errors.ECODE_NORES)
4588

    
4589
    jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
4590

    
4591
    self.LogInfo("Iallocator returned %s job(s) for changing group of"
4592
                 " instance '%s'", len(jobs), self.op.instance_name)
4593

    
4594
    return ResultWithJobs(jobs)
4595

    
4596

    
4597
class TLMigrateInstance(Tasklet):
4598
  """Tasklet class for instance migration.
4599

4600
  @type live: boolean
4601
  @ivar live: whether the migration will be done live or non-live;
4602
      this variable is initalized only after CheckPrereq has run
4603
  @type cleanup: boolean
4604
  @ivar cleanup: Wheater we cleanup from a failed migration
4605
  @type iallocator: string
4606
  @ivar iallocator: The iallocator used to determine target_node
4607
  @type target_node: string
4608
  @ivar target_node: If given, the target_node to reallocate the instance to
4609
  @type failover: boolean
4610
  @ivar failover: Whether operation results in failover or migration
4611
  @type fallback: boolean
4612
  @ivar fallback: Whether fallback to failover is allowed if migration not
4613
                  possible
4614
  @type ignore_consistency: boolean
4615
  @ivar ignore_consistency: Wheter we should ignore consistency between source
4616
                            and target node
4617
  @type shutdown_timeout: int
4618
  @ivar shutdown_timeout: In case of failover timeout of the shutdown
4619
  @type ignore_ipolicy: bool
4620
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
4621

4622
  """
4623

    
4624
  # Constants
4625
  _MIGRATION_POLL_INTERVAL = 1      # seconds
4626
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
4627

    
4628
  def __init__(self, lu, instance_name, cleanup, failover, fallback,
4629
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
4630
               ignore_ipolicy):
4631
    """Initializes this class.
4632

4633
    """
4634
    Tasklet.__init__(self, lu)
4635

    
4636
    # Parameters
4637
    self.instance_name = instance_name
4638
    self.cleanup = cleanup
4639
    self.live = False # will be overridden later
4640
    self.failover = failover
4641
    self.fallback = fallback
4642
    self.ignore_consistency = ignore_consistency
4643
    self.shutdown_timeout = shutdown_timeout
4644
    self.ignore_ipolicy = ignore_ipolicy
4645
    self.allow_runtime_changes = allow_runtime_changes
4646

    
4647
  def CheckPrereq(self):
4648
    """Check prerequisites.
4649

4650
    This checks that the instance is in the cluster.
4651

4652
    """
4653
    instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
4654
    instance = self.cfg.GetInstanceInfo(instance_name)
4655
    assert instance is not None
4656
    self.instance = instance
4657
    cluster = self.cfg.GetClusterInfo()
4658

    
4659
    if (not self.cleanup and
4660
        not instance.admin_state == constants.ADMINST_UP and
4661
        not self.failover and self.fallback):
4662
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
4663
                      " switching to failover")
4664
      self.failover = True
4665

    
4666
    if instance.disk_template not in constants.DTS_MIRRORED:
4667
      if self.failover:
4668
        text = "failovers"
4669
      else:
4670
        text = "migrations"
4671
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
4672
                                 " %s" % (instance.disk_template, text),
4673
                                 errors.ECODE_STATE)
4674

    
4675
    if instance.disk_template in constants.DTS_EXT_MIRROR:
4676
      _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
4677

    
4678
      if self.lu.op.iallocator:
4679
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
4680
        self._RunAllocator()
4681
      else:
4682
        # We set set self.target_node as it is required by
4683
        # BuildHooksEnv
4684
        self.target_node = self.lu.op.target_node
4685

    
4686
      # Check that the target node is correct in terms of instance policy
4687
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
4688
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
4689
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4690
                                                              group_info)
4691
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
4692
                              ignore=self.ignore_ipolicy)
4693

    
4694
      # self.target_node is already populated, either directly or by the
4695
      # iallocator run
4696
      target_node = self.target_node
4697
      if self.target_node == instance.primary_node:
4698
        raise errors.OpPrereqError("Cannot migrate instance %s"
4699
                                   " to its primary (%s)" %
4700
                                   (instance.name, instance.primary_node),
4701
                                   errors.ECODE_STATE)
4702

    
4703
      if len(self.lu.tasklets) == 1:
4704
        # It is safe to release locks only when we're the only tasklet
4705
        # in the LU
4706
        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
4707
                      keep=[instance.primary_node, self.target_node])
4708
        _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
4709

    
4710
    else:
4711
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
4712

    
4713
      secondary_nodes = instance.secondary_nodes
4714
      if not secondary_nodes:
4715
        raise errors.ConfigurationError("No secondary node but using"
4716
                                        " %s disk template" %
4717
                                        instance.disk_template)
4718
      target_node = secondary_nodes[0]
4719
      if self.lu.op.iallocator or (self.lu.op.target_node and
4720
                                   self.lu.op.target_node != target_node):
4721
        if self.failover:
4722
          text = "failed over"
4723
        else:
4724
          text = "migrated"
4725
        raise errors.OpPrereqError("Instances with disk template %s cannot"
4726
                                   " be %s to arbitrary nodes"
4727
                                   " (neither an iallocator nor a target"
4728
                                   " node can be passed)" %
4729
                                   (instance.disk_template, text),
4730
                                   errors.ECODE_INVAL)
4731
      nodeinfo = self.cfg.GetNodeInfo(target_node)
4732
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
4733
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4734
                                                              group_info)
4735
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
4736
                              ignore=self.ignore_ipolicy)
4737

    
4738
    i_be = cluster.FillBE(instance)
4739

    
4740
    # check memory requirements on the secondary node
4741
    if (not self.cleanup and
4742
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
4743
      self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
4744
                                               "migrating instance %s" %
4745
                                               instance.name,
4746
                                               i_be[constants.BE_MINMEM],
4747
                                               instance.hypervisor)
4748
    else:
4749
      self.lu.LogInfo("Not checking memory on the secondary node as"
4750
                      " instance will not be started")
4751

    
4752
    # check if failover must be forced instead of migration
4753
    if (not self.cleanup and not self.failover and
4754
        i_be[constants.BE_ALWAYS_FAILOVER]):
4755
      self.lu.LogInfo("Instance configured to always failover; fallback"
4756
                      " to failover")
4757
      self.failover = True
4758

    
4759
    # check bridge existance
4760
    _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
4761

    
4762
    if not self.cleanup:
4763
      _CheckNodeNotDrained(self.lu, target_node)
4764
      if not self.failover:
4765
        result = self.rpc.call_instance_migratable(instance.primary_node,
4766
                                                   instance)
4767
        if result.fail_msg and self.fallback:
4768
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
4769
                          " failover")
4770
          self.failover = True
4771
        else:
4772
          result.Raise("Can't migrate, please use failover",
4773
                       prereq=True, ecode=errors.ECODE_STATE)
4774

    
4775
    assert not (self.failover and self.cleanup)
4776

    
4777
    if not self.failover:
4778
      if self.lu.op.live is not None and self.lu.op.mode is not None:
4779
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
4780
                                   " parameters are accepted",
4781
                                   errors.ECODE_INVAL)
4782
      if self.lu.op.live is not None:
4783
        if self.lu.op.live:
4784
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
4785
        else:
4786
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
4787
        # reset the 'live' parameter to None so that repeated
4788
        # invocations of CheckPrereq do not raise an exception
4789
        self.lu.op.live = None
4790
      elif self.lu.op.mode is None:
4791
        # read the default value from the hypervisor
4792
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
4793
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
4794

    
4795
      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
4796
    else:
4797
      # Failover is never live
4798
      self.live = False
4799

    
4800
    if not (self.failover or self.cleanup):
4801
      remote_info = self.rpc.call_instance_info(instance.primary_node,
4802
                                                instance.name,
4803
                                                instance.hypervisor)
4804
      remote_info.Raise("Error checking instance on node %s" %
4805
                        instance.primary_node)
4806
      instance_running = bool(remote_info.payload)
4807
      if instance_running:
4808
        self.current_mem = int(remote_info.payload["memory"])
4809

    
4810
  def _RunAllocator(self):
4811
    """Run the allocator based on input opcode.
4812

4813
    """
4814
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
4815

    
4816
    # FIXME: add a self.ignore_ipolicy option
4817
    req = iallocator.IAReqRelocate(name=self.instance_name,
4818
                                   relocate_from=[self.instance.primary_node])
4819
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
4820

    
4821
    ial.Run(self.lu.op.iallocator)
4822

    
4823
    if not ial.success:
4824
      raise errors.OpPrereqError("Can't compute nodes using"
4825
                                 " iallocator '%s': %s" %
4826
                                 (self.lu.op.iallocator, ial.info),
4827
                                 errors.ECODE_NORES)
4828
    self.target_node = ial.result[0]
4829
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4830
                    self.instance_name, self.lu.op.iallocator,
4831
                    utils.CommaJoin(ial.result))
4832

    
4833
  def _WaitUntilSync(self):
4834
    """Poll with custom rpc for disk sync.
4835

4836
    This uses our own step-based rpc call.
4837

4838
    """
4839
    self.feedback_fn("* wait until resync is done")
4840
    all_done = False
4841
    while not all_done:
4842
      all_done = True
4843
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4844
                                            self.nodes_ip,
4845
                                            (self.instance.disks,
4846
                                             self.instance))
4847
      min_percent = 100
4848
      for node, nres in result.items():
4849
        nres.Raise("Cannot resync disks on node %s" % node)
4850
        node_done, node_percent = nres.payload
4851
        all_done = all_done and node_done
4852
        if node_percent is not None:
4853
          min_percent = min(min_percent, node_percent)
4854
      if not all_done:
4855
        if min_percent < 100:
4856
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
4857
        time.sleep(2)
4858

    
4859
  def _EnsureSecondary(self, node):
4860
    """Demote a node to secondary.
4861

4862
    """
4863
    self.feedback_fn("* switching node %s to secondary mode" % node)
4864

    
4865
    for dev in self.instance.disks:
4866
      self.cfg.SetDiskID(dev, node)
4867

    
4868
    result = self.rpc.call_blockdev_close(node, self.instance.name,
4869
                                          self.instance.disks)
4870
    result.Raise("Cannot change disk to secondary on node %s" % node)
4871

    
4872
  def _GoStandalone(self):
4873
    """Disconnect from the network.
4874

4875
    """
4876
    self.feedback_fn("* changing into standalone mode")
4877
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4878
                                               self.instance.disks)
4879
    for node, nres in result.items():
4880
      nres.Raise("Cannot disconnect disks node %s" % node)
4881

    
4882
  def _GoReconnect(self, multimaster):
4883
    """Reconnect to the network.
4884

4885
    """
4886
    if multimaster:
4887
      msg = "dual-master"
4888
    else:
4889
      msg = "single-master"
4890
    self.feedback_fn("* changing disks into %s mode" % msg)
4891
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4892
                                           (self.instance.disks, self.instance),
4893
                                           self.instance.name, multimaster)
4894
    for node, nres in result.items():
4895
      nres.Raise("Cannot change disks config on node %s" % node)
4896

    
4897
  def _ExecCleanup(self):
4898
    """Try to cleanup after a failed migration.
4899

4900
    The cleanup is done by:
4901
      - check that the instance is running only on one node
4902
        (and update the config if needed)
4903
      - change disks on its secondary node to secondary
4904
      - wait until disks are fully synchronized
4905
      - disconnect from the network
4906
      - change disks into single-master mode
4907
      - wait again until disks are fully synchronized
4908

4909
    """
4910
    instance = self.instance
4911
    target_node = self.target_node
4912
    source_node = self.source_node
4913

    
4914
    # check running on only one node
4915
    self.feedback_fn("* checking where the instance actually runs"
4916
                     " (if this hangs, the hypervisor might be in"
4917
                     " a bad state)")
4918
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4919
    for node, result in ins_l.items():
4920
      result.Raise("Can't contact node %s" % node)
4921

    
4922
    runningon_source = instance.name in ins_l[source_node].payload
4923
    runningon_target = instance.name in ins_l[target_node].payload
4924

    
4925
    if runningon_source and runningon_target:
4926
      raise errors.OpExecError("Instance seems to be running on two nodes,"
4927
                               " or the hypervisor is confused; you will have"
4928
                               " to ensure manually that it runs only on one"
4929
                               " and restart this operation")
4930

    
4931
    if not (runningon_source or runningon_target):
4932
      raise errors.OpExecError("Instance does not seem to be running at all;"
4933
                               " in this case it's safer to repair by"
4934
                               " running 'gnt-instance stop' to ensure disk"
4935
                               " shutdown, and then restarting it")
4936

    
4937
    if runningon_target:
4938
      # the migration has actually succeeded, we need to update the config
4939
      self.feedback_fn("* instance running on secondary node (%s),"
4940
                       " updating config" % target_node)
4941
      instance.primary_node = target_node
4942
      self.cfg.Update(instance, self.feedback_fn)
4943
      demoted_node = source_node
4944
    else:
4945
      self.feedback_fn("* instance confirmed to be running on its"
4946
                       " primary node (%s)" % source_node)
4947
      demoted_node = target_node
4948

    
4949
    if instance.disk_template in constants.DTS_INT_MIRROR:
4950
      self._EnsureSecondary(demoted_node)
4951
      try:
4952
        self._WaitUntilSync()
4953
      except errors.OpExecError:
4954
        # we ignore here errors, since if the device is standalone, it
4955
        # won't be able to sync
4956
        pass
4957
      self._GoStandalone()
4958
      self._GoReconnect(False)
4959
      self._WaitUntilSync()
4960

    
4961
    self.feedback_fn("* done")
4962

    
4963
  def _RevertDiskStatus(self):
4964
    """Try to revert the disk status after a failed migration.
4965

4966
    """
4967
    target_node = self.target_node
4968
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
4969
      return
4970

    
4971
    try:
4972
      self._EnsureSecondary(target_node)
4973
      self._GoStandalone()
4974
      self._GoReconnect(False)
4975
      self._WaitUntilSync()
4976
    except errors.OpExecError, err:
4977
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
4978
                         " please try to recover the instance manually;"
4979
                         " error '%s'" % str(err))
4980

    
4981
  def _AbortMigration(self):
4982
    """Call the hypervisor code to abort a started migration.
4983

4984
    """
4985
    instance = self.instance
4986
    target_node = self.target_node
4987
    source_node = self.source_node
4988
    migration_info = self.migration_info
4989

    
4990
    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
4991
                                                                 instance,
4992
                                                                 migration_info,
4993
                                                                 False)
4994
    abort_msg = abort_result.fail_msg
4995
    if abort_msg:
4996
      logging.error("Aborting migration failed on target node %s: %s",
4997
                    target_node, abort_msg)
4998
      # Don't raise an exception here, as we stil have to try to revert the
4999
      # disk status, even if this step failed.
5000

    
5001
    abort_result = self.rpc.call_instance_finalize_migration_src(
5002
      source_node, instance, False, self.live)
5003
    abort_msg = abort_result.fail_msg
5004
    if abort_msg:
5005
      logging.error("Aborting migration failed on source node %s: %s",
5006
                    source_node, abort_msg)
5007

    
5008
  def _ExecMigration(self):
5009
    """Migrate an instance.
5010

5011
    The migrate is done by:
5012
      - change the disks into dual-master mode
5013
      - wait until disks are fully synchronized again
5014
      - migrate the instance
5015
      - change disks on the new secondary node (the old primary) to secondary
5016
      - wait until disks are fully synchronized
5017
      - change disks into single-master mode
5018

5019
    """
5020
    instance = self.instance
5021
    target_node = self.target_node
5022
    source_node = self.source_node
5023

    
5024
    # Check for hypervisor version mismatch and warn the user.
5025
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
5026
                                       None, [self.instance.hypervisor], False)
5027
    for ninfo in nodeinfo.values():
5028
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
5029
                  ninfo.node)
5030
    (_, _, (src_info, )) = nodeinfo[source_node].payload
5031
    (_, _, (dst_info, )) = nodeinfo[target_node].payload
5032

    
5033
    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
5034
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
5035
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
5036
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
5037
      if src_version != dst_version:
5038
        self.feedback_fn("* warning: hypervisor version mismatch between"
5039
                         " source (%s) and target (%s) node" %
5040
                         (src_version, dst_version))
5041

    
5042
    self.feedback_fn("* checking disk consistency between source and target")
5043
    for (idx, dev) in enumerate(instance.disks):
5044
      if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
5045
        raise errors.OpExecError("Disk %s is degraded or not fully"
5046
                                 " synchronized on target node,"
5047
                                 " aborting migration" % idx)
5048

    
5049
    if self.current_mem > self.tgt_free_mem:
5050
      if not self.allow_runtime_changes:
5051
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
5052
                                 " free memory to fit instance %s on target"
5053
                                 " node %s (have %dMB, need %dMB)" %
5054
                                 (instance.name, target_node,
5055
                                  self.tgt_free_mem, self.current_mem))
5056
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
5057
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
5058
                                                     instance,
5059
                                                     self.tgt_free_mem)
5060
      rpcres.Raise("Cannot modify instance runtime memory")
5061

    
5062
    # First get the migration information from the remote node
5063
    result = self.rpc.call_migration_info(source_node, instance)
5064
    msg = result.fail_msg
5065
    if msg:
5066
      log_err = ("Failed fetching source migration information from %s: %s" %
5067
                 (source_node, msg))
5068
      logging.error(log_err)
5069
      raise errors.OpExecError(log_err)
5070

    
5071
    self.migration_info = migration_info = result.payload
5072

    
5073
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
5074
      # Then switch the disks to master/master mode
5075
      self._EnsureSecondary(target_node)
5076
      self._GoStandalone()
5077
      self._GoReconnect(True)
5078
      self._WaitUntilSync()
5079

    
5080
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
5081
    result = self.rpc.call_accept_instance(target_node,
5082
                                           instance,
5083
                                           migration_info,
5084
                                           self.nodes_ip[target_node])
5085

    
5086
    msg = result.fail_msg
5087
    if msg:
5088
      logging.error("Instance pre-migration failed, trying to revert"
5089
                    " disk status: %s", msg)
5090
      self.feedback_fn("Pre-migration failed, aborting")
5091
      self._AbortMigration()
5092
      self._RevertDiskStatus()
5093
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5094
                               (instance.name, msg))
5095

    
5096
    self.feedback_fn("* migrating instance to %s" % target_node)
5097
    result = self.rpc.call_instance_migrate(source_node, instance,
5098
                                            self.nodes_ip[target_node],
5099
                                            self.live)
5100
    msg = result.fail_msg
5101
    if msg:
5102
      logging.error("Instance migration failed, trying to revert"
5103
                    " disk status: %s", msg)
5104
      self.feedback_fn("Migration failed, aborting")
5105
      self._AbortMigration()
5106
      self._RevertDiskStatus()
5107
      raise errors.OpExecError("Could not migrate instance %s: %s" %
5108
                               (instance.name, msg))
5109

    
5110
    self.feedback_fn("* starting memory transfer")
5111
    last_feedback = time.time()
5112
    while True:
5113
      result = self.rpc.call_instance_get_migration_status(source_node,
5114
                                                           instance)
5115
      msg = result.fail_msg
5116
      ms = result.payload   # MigrationStatus instance
5117
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
5118
        logging.error("Instance migration failed, trying to revert"
5119
                      " disk status: %s", msg)
5120
        self.feedback_fn("Migration failed, aborting")
5121
        self._AbortMigration()
5122
        self._RevertDiskStatus()
5123
        if not msg:
5124
          msg = "hypervisor returned failure"
5125
        raise errors.OpExecError("Could not migrate instance %s: %s" %
5126
                                 (instance.name, msg))
5127

    
5128
      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
5129
        self.feedback_fn("* memory transfer complete")
5130
        break
5131

    
5132
      if (utils.TimeoutExpired(last_feedback,
5133
                               self._MIGRATION_FEEDBACK_INTERVAL) and
5134
          ms.transferred_ram is not None):
5135
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
5136
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
5137
        last_feedback = time.time()
5138

    
5139
      time.sleep(self._MIGRATION_POLL_INTERVAL)
5140

    
5141
    result = self.rpc.call_instance_finalize_migration_src(source_node,
5142
                                                           instance,
5143
                                                           True,
5144
                                                           self.live)
5145
    msg = result.fail_msg
5146
    if msg:
5147
      logging.error("Instance migration succeeded, but finalization failed"
5148
                    " on the source node: %s", msg)
5149
      raise errors.OpExecError("Could not finalize instance migration: %s" %
5150
                               msg)
5151

    
5152
    instance.primary_node = target_node
5153

    
5154
    # distribute new instance config to the other nodes
5155
    self.cfg.Update(instance, self.feedback_fn)
5156

    
5157
    result = self.rpc.call_instance_finalize_migration_dst(target_node,
                                                           instance,
                                                           migration_info,
                                                           True)
    msg = result.fail_msg
    if msg:
      logging.error("Instance migration succeeded, but finalization failed"
                    " on the target node: %s", msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

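    # For internally mirrored disk templates (e.g. DRBD), demote the old
    # primary to a secondary again, reconnect the disks in normal
    # (non-multimaster) mode and wait for them to resync.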
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
      self._EnsureSecondary(source_node)
      self._WaitUntilSync()
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()

    # If the instance's disk template is `rbd' or `ext' and there was a
    # successful migration, unmap the device from the source node.
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
      disks = _ExpandCheckDisks(instance, instance.disks)
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
      for disk in disks:
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
        msg = result.fail_msg
        if msg:
          logging.error("Migration was successful, but couldn't unmap the"
                        " block device %s on source node %s: %s",
                        disk.iv_name, source_node, msg)
          logging.error("You need to unmap the device %s manually on %s",
                        disk.iv_name, source_node)

    self.feedback_fn("* done")

  def _ExecFailover(self):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)

    source_node = instance.primary_node
    target_node = self.target_node

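    # If the instance is marked as up, verify its disks are consistent on the
    # target node; degraded disks abort the failover unless the primary node
    # is offline or consistency checks are explicitly ignored.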
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* checking disk consistency between source and target")
      for (idx, dev) in enumerate(instance.disks):
        # for drbd, these are drbd over lvm
        if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
                                     False):
          if primary_node.offline:
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                             " target node %s" %
                             (primary_node.name, idx, target_node))
          elif not self.ignore_consistency:
            raise errors.OpExecError("Disk %s is degraded on target node,"
                                     " aborting failover" % idx)
    else:
      self.feedback_fn("* not checking disk consistency as instance is not"
                       " running")

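    # Shut the instance down on the source node; failures are only tolerated
    # when consistency checks are ignored or the primary node is offline.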
    self.feedback_fn("* shutting down instance on source node")
5223
    logging.info("Shutting down instance %s on node %s",
5224
                 instance.name, source_node)
5225

    
5226
    result = self.rpc.call_instance_shutdown(source_node, instance,
5227
                                             self.shutdown_timeout,
5228
                                             self.lu.op.reason)
5229
    msg = result.fail_msg
5230
    if msg:
5231
      if self.ignore_consistency or primary_node.offline:
5232
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
5233
                           " proceeding anyway; please make sure node"
5234
                           " %s is down; error details: %s",
5235
                           instance.name, source_node, source_node, msg)
5236
      else:
5237
        raise errors.OpExecError("Could not shutdown instance %s on"
5238
                                 " node %s: %s" %
5239
                                 (instance.name, source_node, msg))
5240

    
5241
    self.feedback_fn("* deactivating the instance's disks on source node")
5242
    if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
5243
      raise errors.OpExecError("Can't shut down the instance's disks")
5244

    
5245
    instance.primary_node = target_node
5246
    # distribute new instance config to the other nodes
5247
    self.cfg.Update(instance, self.feedback_fn)
5248

    
5249
    # Only start the instance if it's marked as up
    if instance.admin_state == constants.ADMINST_UP:
      self.feedback_fn("* activating the instance's disks on target node %s" %
                       target_node)
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
                                           ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      self.feedback_fn("* starting the instance on the target node %s" %
                       target_node)
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
                                            False, self.lu.op.reason)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self.lu, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn
    self.source_node = self.instance.primary_node

    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
      self.target_node = self.instance.secondary_nodes[0]
      # Otherwise self.target_node has been populated either
      # directly, or through an iallocator.

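    # Resolve the secondary IPs of both nodes; the migration RPC uses them.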
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))

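    # Dispatch to the requested operation: plain failover, cleanup of a
    # previously interrupted migration, or the actual migration.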
    if self.failover:
      feedback_fn("Failover instance %s" % self.instance.name)
      self._ExecFailover()
    else:
      feedback_fn("Migrating instance %s" % self.instance.name)

      if self.cleanup:
        return self._ExecCleanup()
      else:
        return self._ExecMigration()