cmdlib: Extract storage related functionality
[ganeti-local] / lib / cmdlib / instance.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with instances."""
23
24 import OpenSSL
25 import copy
26 import itertools
27 import logging
28 import operator
29 import os
30 import time
31
32 from ganeti import compat
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import ht
36 from ganeti import hypervisor
37 from ganeti import locking
38 from ganeti.masterd import iallocator
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import qlang
45 from ganeti import rpc
46 from ganeti import utils
47 from ganeti import query
48
49 from ganeti.cmdlib.base import NoHooksLU, LogicalUnit, _QueryBase, \
50   ResultWithJobs, Tasklet
51
52 from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
53   INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, _CheckNodeOnline, \
54   _ShareAll, _GetDefaultIAllocator, _CheckInstanceNodeGroups, \
55   _LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
56   _IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
57   _GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
58   _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
59   _CheckInstanceState, _ExpandNodeName
60 from ganeti.cmdlib.instance_storage import _CreateDisks, \
61   _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
62   _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
63   _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
64   _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
65   _AssembleInstanceDisks, _ExpandCheckDisks
66 from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
67   _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
68   _NICToTuple, _CheckNodeNotDrained, _RemoveInstance, _CopyLockList, \
69   _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \
70   _GetInstanceInfoText, _RemoveDisks
71
72 import ganeti.masterd.instance
73
74
75 #: Type description for changes as returned by L{ApplyContainerMods}'s
76 #: callbacks
77 _TApplyContModsCbChanges = \
78   ht.TMaybeListOf(ht.TAnd(ht.TIsLength(2), ht.TItems([
79     ht.TNonEmptyString,
80     ht.TAny,
81     ])))
82
83
84 def _CheckHostnameSane(lu, name):
85   """Ensures that a given hostname resolves to a 'sane' name.
86
87   The given name is required to be a prefix of the resolved hostname,
88   to prevent accidental mismatches.
89
90   @param lu: the logical unit on behalf of which we're checking
91   @param name: the name we should resolve and check
92   @return: the resolved hostname object
93
94   """
95   hostname = netutils.GetHostname(name=name)
96   if hostname.name != name:
97     lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
98   if not utils.MatchNameComponent(name, [hostname.name]):
99     raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
100                                 " same as given hostname '%s'") %
101                                (hostname.name, name), errors.ECODE_INVAL)
102   return hostname
103
104
105 def _CheckOpportunisticLocking(op):
106   """Generate error if opportunistic locking is not possible.
107
108   """
109   if op.opportunistic_locking and not op.iallocator:
110     raise errors.OpPrereqError("Opportunistic locking is only available in"
111                                " combination with an instance allocator",
112                                errors.ECODE_INVAL)
113
114
115 def _CreateInstanceAllocRequest(op, disks, nics, beparams, node_whitelist):
116   """Wrapper around IAReqInstanceAlloc.
117
118   @param op: The instance opcode
119   @param disks: The computed disks
120   @param nics: The computed nics
121   @param beparams: The full filled beparams
122   @param node_whitelist: List of nodes which should appear as online to the
123     allocator (unless the node is already marked offline)
124
125   @returns: A filled L{iallocator.IAReqInstanceAlloc}
126
127   """
128   spindle_use = beparams[constants.BE_SPINDLE_USE]
129   return iallocator.IAReqInstanceAlloc(name=op.instance_name,
130                                        disk_template=op.disk_template,
131                                        tags=op.tags,
132                                        os=op.os_type,
133                                        vcpus=beparams[constants.BE_VCPUS],
134                                        memory=beparams[constants.BE_MAXMEM],
135                                        spindle_use=spindle_use,
136                                        disks=disks,
137                                        nics=[n.ToDict() for n in nics],
138                                        hypervisor=op.hypervisor,
139                                        node_whitelist=node_whitelist)
140
141
142 def _ComputeFullBeParams(op, cluster):
143   """Computes the full beparams.
144
145   @param op: The instance opcode
146   @param cluster: The cluster config object
147
148   @return: The fully filled beparams
149
150   """
151   default_beparams = cluster.beparams[constants.PP_DEFAULT]
152   for param, value in op.beparams.iteritems():
153     if value == constants.VALUE_AUTO:
154       op.beparams[param] = default_beparams[param]
155   objects.UpgradeBeParams(op.beparams)
156   utils.ForceDictType(op.beparams, constants.BES_PARAMETER_TYPES)
157   return cluster.SimpleFillBE(op.beparams)
158
159
160 def _ComputeNics(op, cluster, default_ip, cfg, ec_id):
161   """Computes the nics.
162
163   @param op: The instance opcode
164   @param cluster: Cluster configuration object
165   @param default_ip: The default ip to assign
166   @param cfg: An instance of the configuration object
167   @param ec_id: Execution context ID
168
169   @returns: The build up nics
170
171   """
172   nics = []
173   for nic in op.nics:
174     nic_mode_req = nic.get(constants.INIC_MODE, None)
175     nic_mode = nic_mode_req
176     if nic_mode is None or nic_mode == constants.VALUE_AUTO:
177       nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
178
179     net = nic.get(constants.INIC_NETWORK, None)
180     link = nic.get(constants.NIC_LINK, None)
181     ip = nic.get(constants.INIC_IP, None)
182
183     if net is None or net.lower() == constants.VALUE_NONE:
184       net = None
185     else:
186       if nic_mode_req is not None or link is not None:
187         raise errors.OpPrereqError("If network is given, no mode or link"
188                                    " is allowed to be passed",
189                                    errors.ECODE_INVAL)
190
191     # ip validity checks
192     if ip is None or ip.lower() == constants.VALUE_NONE:
193       nic_ip = None
194     elif ip.lower() == constants.VALUE_AUTO:
195       if not op.name_check:
196         raise errors.OpPrereqError("IP address set to auto but name checks"
197                                    " have been skipped",
198                                    errors.ECODE_INVAL)
199       nic_ip = default_ip
200     else:
201       # We defer pool operations until later, so that the iallocator has
202       # filled in the instance's node(s) dimara
203       if ip.lower() == constants.NIC_IP_POOL:
204         if net is None:
205           raise errors.OpPrereqError("if ip=pool, parameter network"
206                                      " must be passed too",
207                                      errors.ECODE_INVAL)
208
209       elif not netutils.IPAddress.IsValid(ip):
210         raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
211                                    errors.ECODE_INVAL)
212
213       nic_ip = ip
214
215     # TODO: check the ip address for uniqueness
216     if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
217       raise errors.OpPrereqError("Routed nic mode requires an ip address",
218                                  errors.ECODE_INVAL)
219
220     # MAC address verification
221     mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
222     if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
223       mac = utils.NormalizeAndValidateMac(mac)
224
225       try:
226         # TODO: We need to factor this out
227         cfg.ReserveMAC(mac, ec_id)
228       except errors.ReservationError:
229         raise errors.OpPrereqError("MAC address %s already in use"
230                                    " in cluster" % mac,
231                                    errors.ECODE_NOTUNIQUE)
232
233     #  Build nic parameters
234     nicparams = {}
235     if nic_mode_req:
236       nicparams[constants.NIC_MODE] = nic_mode
237     if link:
238       nicparams[constants.NIC_LINK] = link
239
240     check_params = cluster.SimpleFillNIC(nicparams)
241     objects.NIC.CheckParameterSyntax(check_params)
242     net_uuid = cfg.LookupNetwork(net)
243     name = nic.get(constants.INIC_NAME, None)
244     if name is not None and name.lower() == constants.VALUE_NONE:
245       name = None
246     nic_obj = objects.NIC(mac=mac, ip=nic_ip, name=name,
247                           network=net_uuid, nicparams=nicparams)
248     nic_obj.uuid = cfg.GenerateUniqueID(ec_id)
249     nics.append(nic_obj)
250
251   return nics
252
253
254 def _CheckForConflictingIp(lu, ip, node):
255   """In case of conflicting IP address raise error.
256
257   @type ip: string
258   @param ip: IP address
259   @type node: string
260   @param node: node name
261
262   """
263   (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node)
264   if conf_net is not None:
265     raise errors.OpPrereqError(("The requested IP address (%s) belongs to"
266                                 " network %s, but the target NIC does not." %
267                                 (ip, conf_net)),
268                                errors.ECODE_STATE)
269
270   return (None, None)
271
272
273 def _ComputeIPolicyInstanceSpecViolation(
274   ipolicy, instance_spec, disk_template,
275   _compute_fn=_ComputeIPolicySpecViolation):
276   """Compute if instance specs meets the specs of ipolicy.
277
278   @type ipolicy: dict
279   @param ipolicy: The ipolicy to verify against
280   @param instance_spec: dict
281   @param instance_spec: The instance spec to verify
282   @type disk_template: string
283   @param disk_template: the disk template of the instance
284   @param _compute_fn: The function to verify ipolicy (unittest only)
285   @see: L{_ComputeIPolicySpecViolation}
286
287   """
288   mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
289   cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
290   disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
291   disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
292   nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
293   spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
294
295   return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
296                      disk_sizes, spindle_use, disk_template)
297
298
299 def _CheckOSVariant(os_obj, name):
300   """Check whether an OS name conforms to the os variants specification.
301
302   @type os_obj: L{objects.OS}
303   @param os_obj: OS object to check
304   @type name: string
305   @param name: OS name passed by the user, to check for validity
306
307   """
308   variant = objects.OS.GetVariant(name)
309   if not os_obj.supported_variants:
310     if variant:
311       raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
312                                  " passed)" % (os_obj.name, variant),
313                                  errors.ECODE_INVAL)
314     return
315   if not variant:
316     raise errors.OpPrereqError("OS name must include a variant",
317                                errors.ECODE_INVAL)
318
319   if variant not in os_obj.supported_variants:
320     raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
321
322
323 def _CheckNodeHasOS(lu, node, os_name, force_variant):
324   """Ensure that a node supports a given OS.
325
326   @param lu: the LU on behalf of which we make the check
327   @param node: the node to check
328   @param os_name: the OS to query about
329   @param force_variant: whether to ignore variant errors
330   @raise errors.OpPrereqError: if the node is not supporting the OS
331
332   """
333   result = lu.rpc.call_os_get(node, os_name)
334   result.Raise("OS '%s' not in supported OS list for node %s" %
335                (os_name, node),
336                prereq=True, ecode=errors.ECODE_INVAL)
337   if not force_variant:
338     _CheckOSVariant(result.payload, os_name)
339
340
341 def _CheckNicsBridgesExist(lu, target_nics, target_node):
342   """Check that the brigdes needed by a list of nics exist.
343
344   """
345   cluster = lu.cfg.GetClusterInfo()
346   paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
347   brlist = [params[constants.NIC_LINK] for params in paramslist
348             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
349   if brlist:
350     result = lu.rpc.call_bridges_exist(target_node, brlist)
351     result.Raise("Error checking bridges on destination node '%s'" %
352                  target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
353
354
355 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
356   """Checks if a node has enough free memory.
357
358   This function checks if a given node has the needed amount of free
359   memory. In case the node has less memory or we cannot get the
360   information from the node, this function raises an OpPrereqError
361   exception.
362
363   @type lu: C{LogicalUnit}
364   @param lu: a logical unit from which we get configuration data
365   @type node: C{str}
366   @param node: the node to check
367   @type reason: C{str}
368   @param reason: string to use in the error message
369   @type requested: C{int}
370   @param requested: the amount of memory in MiB to check for
371   @type hypervisor_name: C{str}
372   @param hypervisor_name: the hypervisor to ask for memory stats
373   @rtype: integer
374   @return: node current free memory
375   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
376       we cannot check the node
377
378   """
379   nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
380   nodeinfo[node].Raise("Can't get data from node %s" % node,
381                        prereq=True, ecode=errors.ECODE_ENVIRON)
382   (_, _, (hv_info, )) = nodeinfo[node].payload
383
384   free_mem = hv_info.get("memory_free", None)
385   if not isinstance(free_mem, int):
386     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
387                                " was '%s'" % (node, free_mem),
388                                errors.ECODE_ENVIRON)
389   if requested > free_mem:
390     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
391                                " needed %s MiB, available %s MiB" %
392                                (node, reason, requested, free_mem),
393                                errors.ECODE_NORES)
394   return free_mem
395
396
397 class LUInstanceCreate(LogicalUnit):
398   """Create an instance.
399
400   """
401   HPATH = "instance-add"
402   HTYPE = constants.HTYPE_INSTANCE
403   REQ_BGL = False
404
405   def CheckArguments(self):
406     """Check arguments.
407
408     """
409     # do not require name_check to ease forward/backward compatibility
410     # for tools
411     if self.op.no_install and self.op.start:
412       self.LogInfo("No-installation mode selected, disabling startup")
413       self.op.start = False
414     # validate/normalize the instance name
415     self.op.instance_name = \
416       netutils.Hostname.GetNormalizedName(self.op.instance_name)
417
418     if self.op.ip_check and not self.op.name_check:
419       # TODO: make the ip check more flexible and not depend on the name check
420       raise errors.OpPrereqError("Cannot do IP address check without a name"
421                                  " check", errors.ECODE_INVAL)
422
423     # check nics' parameter names
424     for nic in self.op.nics:
425       utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES)
426     # check that NIC's parameters names are unique and valid
427     utils.ValidateDeviceNames("NIC", self.op.nics)
428
429     # check that disk's names are unique and valid
430     utils.ValidateDeviceNames("disk", self.op.disks)
431
432     cluster = self.cfg.GetClusterInfo()
433     if not self.op.disk_template in cluster.enabled_disk_templates:
434       raise errors.OpPrereqError("Cannot create an instance with disk template"
435                                  " '%s', because it is not enabled in the"
436                                  " cluster. Enabled disk templates are: %s." %
437                                  (self.op.disk_template,
438                                   ",".join(cluster.enabled_disk_templates)))
439
440     # check disks. parameter names and consistent adopt/no-adopt strategy
441     has_adopt = has_no_adopt = False
442     for disk in self.op.disks:
443       if self.op.disk_template != constants.DT_EXT:
444         utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
445       if constants.IDISK_ADOPT in disk:
446         has_adopt = True
447       else:
448         has_no_adopt = True
449     if has_adopt and has_no_adopt:
450       raise errors.OpPrereqError("Either all disks are adopted or none is",
451                                  errors.ECODE_INVAL)
452     if has_adopt:
453       if self.op.disk_template not in constants.DTS_MAY_ADOPT:
454         raise errors.OpPrereqError("Disk adoption is not supported for the"
455                                    " '%s' disk template" %
456                                    self.op.disk_template,
457                                    errors.ECODE_INVAL)
458       if self.op.iallocator is not None:
459         raise errors.OpPrereqError("Disk adoption not allowed with an"
460                                    " iallocator script", errors.ECODE_INVAL)
461       if self.op.mode == constants.INSTANCE_IMPORT:
462         raise errors.OpPrereqError("Disk adoption not allowed for"
463                                    " instance import", errors.ECODE_INVAL)
464     else:
465       if self.op.disk_template in constants.DTS_MUST_ADOPT:
466         raise errors.OpPrereqError("Disk template %s requires disk adoption,"
467                                    " but no 'adopt' parameter given" %
468                                    self.op.disk_template,
469                                    errors.ECODE_INVAL)
470
471     self.adopt_disks = has_adopt
472
473     # instance name verification
474     if self.op.name_check:
475       self.hostname1 = _CheckHostnameSane(self, self.op.instance_name)
476       self.op.instance_name = self.hostname1.name
477       # used in CheckPrereq for ip ping check
478       self.check_ip = self.hostname1.ip
479     else:
480       self.check_ip = None
481
482     # file storage checks
483     if (self.op.file_driver and
484         not self.op.file_driver in constants.FILE_DRIVER):
485       raise errors.OpPrereqError("Invalid file driver name '%s'" %
486                                  self.op.file_driver, errors.ECODE_INVAL)
487
488     if self.op.disk_template == constants.DT_FILE:
489       opcodes.RequireFileStorage()
490     elif self.op.disk_template == constants.DT_SHARED_FILE:
491       opcodes.RequireSharedFileStorage()
492
493     ### Node/iallocator related checks
494     _CheckIAllocatorOrNode(self, "iallocator", "pnode")
495
496     if self.op.pnode is not None:
497       if self.op.disk_template in constants.DTS_INT_MIRROR:
498         if self.op.snode is None:
499           raise errors.OpPrereqError("The networked disk templates need"
500                                      " a mirror node", errors.ECODE_INVAL)
501       elif self.op.snode:
502         self.LogWarning("Secondary node will be ignored on non-mirrored disk"
503                         " template")
504         self.op.snode = None
505
506     _CheckOpportunisticLocking(self.op)
507
508     self._cds = _GetClusterDomainSecret()
509
510     if self.op.mode == constants.INSTANCE_IMPORT:
511       # On import force_variant must be True, because if we forced it at
512       # initial install, our only chance when importing it back is that it
513       # works again!
514       self.op.force_variant = True
515
516       if self.op.no_install:
517         self.LogInfo("No-installation mode has no effect during import")
518
519     elif self.op.mode == constants.INSTANCE_CREATE:
520       if self.op.os_type is None:
521         raise errors.OpPrereqError("No guest OS specified",
522                                    errors.ECODE_INVAL)
523       if self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os:
524         raise errors.OpPrereqError("Guest OS '%s' is not allowed for"
525                                    " installation" % self.op.os_type,
526                                    errors.ECODE_STATE)
527       if self.op.disk_template is None:
528         raise errors.OpPrereqError("No disk template specified",
529                                    errors.ECODE_INVAL)
530
531     elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
532       # Check handshake to ensure both clusters have the same domain secret
533       src_handshake = self.op.source_handshake
534       if not src_handshake:
535         raise errors.OpPrereqError("Missing source handshake",
536                                    errors.ECODE_INVAL)
537
538       errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
539                                                            src_handshake)
540       if errmsg:
541         raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
542                                    errors.ECODE_INVAL)
543
544       # Load and check source CA
545       self.source_x509_ca_pem = self.op.source_x509_ca
546       if not self.source_x509_ca_pem:
547         raise errors.OpPrereqError("Missing source X509 CA",
548                                    errors.ECODE_INVAL)
549
550       try:
551         (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
552                                                     self._cds)
553       except OpenSSL.crypto.Error, err:
554         raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
555                                    (err, ), errors.ECODE_INVAL)
556
557       (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
558       if errcode is not None:
559         raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
560                                    errors.ECODE_INVAL)
561
562       self.source_x509_ca = cert
563
564       src_instance_name = self.op.source_instance_name
565       if not src_instance_name:
566         raise errors.OpPrereqError("Missing source instance name",
567                                    errors.ECODE_INVAL)
568
569       self.source_instance_name = \
570         netutils.GetHostname(name=src_instance_name).name
571
572     else:
573       raise errors.OpPrereqError("Invalid instance creation mode %r" %
574                                  self.op.mode, errors.ECODE_INVAL)
575
576   def ExpandNames(self):
577     """ExpandNames for CreateInstance.
578
579     Figure out the right locks for instance creation.
580
581     """
582     self.needed_locks = {}
583
584     instance_name = self.op.instance_name
585     # this is just a preventive check, but someone might still add this
586     # instance in the meantime, and creation will fail at lock-add time
587     if instance_name in self.cfg.GetInstanceList():
588       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
589                                  instance_name, errors.ECODE_EXISTS)
590
591     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
592
593     if self.op.iallocator:
594       # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
595       # specifying a group on instance creation and then selecting nodes from
596       # that group
597       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
598       self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
599
600       if self.op.opportunistic_locking:
601         self.opportunistic_locks[locking.LEVEL_NODE] = True
602         self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
603     else:
604       self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
605       nodelist = [self.op.pnode]
606       if self.op.snode is not None:
607         self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
608         nodelist.append(self.op.snode)
609       self.needed_locks[locking.LEVEL_NODE] = nodelist
610
611     # in case of import lock the source node too
612     if self.op.mode == constants.INSTANCE_IMPORT:
613       src_node = self.op.src_node
614       src_path = self.op.src_path
615
616       if src_path is None:
617         self.op.src_path = src_path = self.op.instance_name
618
619       if src_node is None:
620         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
621         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
622         self.op.src_node = None
623         if os.path.isabs(src_path):
624           raise errors.OpPrereqError("Importing an instance from a path"
625                                      " requires a source node option",
626                                      errors.ECODE_INVAL)
627       else:
628         self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
629         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
630           self.needed_locks[locking.LEVEL_NODE].append(src_node)
631         if not os.path.isabs(src_path):
632           self.op.src_path = src_path = \
633             utils.PathJoin(pathutils.EXPORT_DIR, src_path)
634
635     self.needed_locks[locking.LEVEL_NODE_RES] = \
636       _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
637
638   def _RunAllocator(self):
639     """Run the allocator based on input opcode.
640
641     """
642     if self.op.opportunistic_locking:
643       # Only consider nodes for which a lock is held
644       node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
645     else:
646       node_whitelist = None
647
648     #TODO Export network to iallocator so that it chooses a pnode
649     #     in a nodegroup that has the desired network connected to
650     req = _CreateInstanceAllocRequest(self.op, self.disks,
651                                       self.nics, self.be_full,
652                                       node_whitelist)
653     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
654
655     ial.Run(self.op.iallocator)
656
657     if not ial.success:
658       # When opportunistic locks are used only a temporary failure is generated
659       if self.op.opportunistic_locking:
660         ecode = errors.ECODE_TEMP_NORES
661       else:
662         ecode = errors.ECODE_NORES
663
664       raise errors.OpPrereqError("Can't compute nodes using"
665                                  " iallocator '%s': %s" %
666                                  (self.op.iallocator, ial.info),
667                                  ecode)
668
669     self.op.pnode = ial.result[0]
670     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
671                  self.op.instance_name, self.op.iallocator,
672                  utils.CommaJoin(ial.result))
673
674     assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator"
675
676     if req.RequiredNodes() == 2:
677       self.op.snode = ial.result[1]
678
679   def BuildHooksEnv(self):
680     """Build hooks env.
681
682     This runs on master, primary and secondary nodes of the instance.
683
684     """
685     env = {
686       "ADD_MODE": self.op.mode,
687       }
688     if self.op.mode == constants.INSTANCE_IMPORT:
689       env["SRC_NODE"] = self.op.src_node
690       env["SRC_PATH"] = self.op.src_path
691       env["SRC_IMAGES"] = self.src_images
692
693     env.update(_BuildInstanceHookEnv(
694       name=self.op.instance_name,
695       primary_node=self.op.pnode,
696       secondary_nodes=self.secondaries,
697       status=self.op.start,
698       os_type=self.op.os_type,
699       minmem=self.be_full[constants.BE_MINMEM],
700       maxmem=self.be_full[constants.BE_MAXMEM],
701       vcpus=self.be_full[constants.BE_VCPUS],
702       nics=_NICListToTuple(self, self.nics),
703       disk_template=self.op.disk_template,
704       disks=[(d[constants.IDISK_NAME], d[constants.IDISK_SIZE],
705               d[constants.IDISK_MODE]) for d in self.disks],
706       bep=self.be_full,
707       hvp=self.hv_full,
708       hypervisor_name=self.op.hypervisor,
709       tags=self.op.tags,
710       ))
711
712     return env
713
714   def BuildHooksNodes(self):
715     """Build hooks nodes.
716
717     """
718     nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries
719     return nl, nl
720
721   def _ReadExportInfo(self):
722     """Reads the export information from disk.
723
724     It will override the opcode source node and path with the actual
725     information, if these two were not specified before.
726
727     @return: the export information
728
729     """
730     assert self.op.mode == constants.INSTANCE_IMPORT
731
732     src_node = self.op.src_node
733     src_path = self.op.src_path
734
735     if src_node is None:
736       locked_nodes = self.owned_locks(locking.LEVEL_NODE)
737       exp_list = self.rpc.call_export_list(locked_nodes)
738       found = False
739       for node in exp_list:
740         if exp_list[node].fail_msg:
741           continue
742         if src_path in exp_list[node].payload:
743           found = True
744           self.op.src_node = src_node = node
745           self.op.src_path = src_path = utils.PathJoin(pathutils.EXPORT_DIR,
746                                                        src_path)
747           break
748       if not found:
749         raise errors.OpPrereqError("No export found for relative path %s" %
750                                    src_path, errors.ECODE_INVAL)
751
752     _CheckNodeOnline(self, src_node)
753     result = self.rpc.call_export_info(src_node, src_path)
754     result.Raise("No export or invalid export found in dir %s" % src_path)
755
756     export_info = objects.SerializableConfigParser.Loads(str(result.payload))
757     if not export_info.has_section(constants.INISECT_EXP):
758       raise errors.ProgrammerError("Corrupted export config",
759                                    errors.ECODE_ENVIRON)
760
761     ei_version = export_info.get(constants.INISECT_EXP, "version")
762     if (int(ei_version) != constants.EXPORT_VERSION):
763       raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
764                                  (ei_version, constants.EXPORT_VERSION),
765                                  errors.ECODE_ENVIRON)
766     return export_info
767
768   def _ReadExportParams(self, einfo):
769     """Use export parameters as defaults.
770
771     In case the opcode doesn't specify (as in override) some instance
772     parameters, then try to use them from the export information, if
773     that declares them.
774
775     """
776     self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
777
778     if self.op.disk_template is None:
779       if einfo.has_option(constants.INISECT_INS, "disk_template"):
780         self.op.disk_template = einfo.get(constants.INISECT_INS,
781                                           "disk_template")
782         if self.op.disk_template not in constants.DISK_TEMPLATES:
783           raise errors.OpPrereqError("Disk template specified in configuration"
784                                      " file is not one of the allowed values:"
785                                      " %s" %
786                                      " ".join(constants.DISK_TEMPLATES),
787                                      errors.ECODE_INVAL)
788       else:
789         raise errors.OpPrereqError("No disk template specified and the export"
790                                    " is missing the disk_template information",
791                                    errors.ECODE_INVAL)
792
793     if not self.op.disks:
794       disks = []
795       # TODO: import the disk iv_name too
796       for idx in range(constants.MAX_DISKS):
797         if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
798           disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
799           disks.append({constants.IDISK_SIZE: disk_sz})
800       self.op.disks = disks
801       if not disks and self.op.disk_template != constants.DT_DISKLESS:
802         raise errors.OpPrereqError("No disk info specified and the export"
803                                    " is missing the disk information",
804                                    errors.ECODE_INVAL)
805
806     if not self.op.nics:
807       nics = []
808       for idx in range(constants.MAX_NICS):
809         if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
810           ndict = {}
811           for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
812             v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
813             ndict[name] = v
814           nics.append(ndict)
815         else:
816           break
817       self.op.nics = nics
818
819     if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
820       self.op.tags = einfo.get(constants.INISECT_INS, "tags").split()
821
822     if (self.op.hypervisor is None and
823         einfo.has_option(constants.INISECT_INS, "hypervisor")):
824       self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
825
826     if einfo.has_section(constants.INISECT_HYP):
827       # use the export parameters but do not override the ones
828       # specified by the user
829       for name, value in einfo.items(constants.INISECT_HYP):
830         if name not in self.op.hvparams:
831           self.op.hvparams[name] = value
832
833     if einfo.has_section(constants.INISECT_BEP):
834       # use the parameters, without overriding
835       for name, value in einfo.items(constants.INISECT_BEP):
836         if name not in self.op.beparams:
837           self.op.beparams[name] = value
838         # Compatibility for the old "memory" be param
839         if name == constants.BE_MEMORY:
840           if constants.BE_MAXMEM not in self.op.beparams:
841             self.op.beparams[constants.BE_MAXMEM] = value
842           if constants.BE_MINMEM not in self.op.beparams:
843             self.op.beparams[constants.BE_MINMEM] = value
844     else:
845       # try to read the parameters old style, from the main section
846       for name in constants.BES_PARAMETERS:
847         if (name not in self.op.beparams and
848             einfo.has_option(constants.INISECT_INS, name)):
849           self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)
850
851     if einfo.has_section(constants.INISECT_OSP):
852       # use the parameters, without overriding
853       for name, value in einfo.items(constants.INISECT_OSP):
854         if name not in self.op.osparams:
855           self.op.osparams[name] = value
856
857   def _RevertToDefaults(self, cluster):
858     """Revert the instance parameters to the default values.
859
860     """
861     # hvparams
862     hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {})
863     for name in self.op.hvparams.keys():
864       if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
865         del self.op.hvparams[name]
866     # beparams
867     be_defs = cluster.SimpleFillBE({})
868     for name in self.op.beparams.keys():
869       if name in be_defs and be_defs[name] == self.op.beparams[name]:
870         del self.op.beparams[name]
871     # nic params
872     nic_defs = cluster.SimpleFillNIC({})
873     for nic in self.op.nics:
874       for name in constants.NICS_PARAMETERS:
875         if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
876           del nic[name]
877     # osparams
878     os_defs = cluster.SimpleFillOS(self.op.os_type, {})
879     for name in self.op.osparams.keys():
880       if name in os_defs and os_defs[name] == self.op.osparams[name]:
881         del self.op.osparams[name]
882
883   def _CalculateFileStorageDir(self):
884     """Calculate final instance file storage dir.
885
886     """
887     # file storage dir calculation/check
888     self.instance_file_storage_dir = None
889     if self.op.disk_template in constants.DTS_FILEBASED:
890       # build the full file storage dir path
891       joinargs = []
892
893       if self.op.disk_template == constants.DT_SHARED_FILE:
894         get_fsd_fn = self.cfg.GetSharedFileStorageDir
895       else:
896         get_fsd_fn = self.cfg.GetFileStorageDir
897
898       cfg_storagedir = get_fsd_fn()
899       if not cfg_storagedir:
900         raise errors.OpPrereqError("Cluster file storage dir not defined",
901                                    errors.ECODE_STATE)
902       joinargs.append(cfg_storagedir)
903
904       if self.op.file_storage_dir is not None:
905         joinargs.append(self.op.file_storage_dir)
906
907       joinargs.append(self.op.instance_name)
908
909       # pylint: disable=W0142
910       self.instance_file_storage_dir = utils.PathJoin(*joinargs)
911
912   def CheckPrereq(self): # pylint: disable=R0914
913     """Check prerequisites.
914
915     """
916     self._CalculateFileStorageDir()
917
918     if self.op.mode == constants.INSTANCE_IMPORT:
919       export_info = self._ReadExportInfo()
920       self._ReadExportParams(export_info)
921       self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
922     else:
923       self._old_instance_name = None
924
925     if (not self.cfg.GetVGName() and
926         self.op.disk_template not in constants.DTS_NOT_LVM):
927       raise errors.OpPrereqError("Cluster does not support lvm-based"
928                                  " instances", errors.ECODE_STATE)
929
930     if (self.op.hypervisor is None or
931         self.op.hypervisor == constants.VALUE_AUTO):
932       self.op.hypervisor = self.cfg.GetHypervisorType()
933
934     cluster = self.cfg.GetClusterInfo()
935     enabled_hvs = cluster.enabled_hypervisors
936     if self.op.hypervisor not in enabled_hvs:
937       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
938                                  " cluster (%s)" %
939                                  (self.op.hypervisor, ",".join(enabled_hvs)),
940                                  errors.ECODE_STATE)
941
942     # Check tag validity
943     for tag in self.op.tags:
944       objects.TaggableObject.ValidateTag(tag)
945
946     # check hypervisor parameter syntax (locally)
947     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
948     filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type,
949                                       self.op.hvparams)
950     hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor)
951     hv_type.CheckParameterSyntax(filled_hvp)
952     self.hv_full = filled_hvp
953     # check that we don't specify global parameters on an instance
954     _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor",
955                           "instance", "cluster")
956
957     # fill and remember the beparams dict
958     self.be_full = _ComputeFullBeParams(self.op, cluster)
959
960     # build os parameters
961     self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams)
962
963     # now that hvp/bep are in final format, let's reset to defaults,
964     # if told to do so
965     if self.op.identify_defaults:
966       self._RevertToDefaults(cluster)
967
968     # NIC buildup
969     self.nics = _ComputeNics(self.op, cluster, self.check_ip, self.cfg,
970                              self.proc.GetECId())
971
972     # disk checks/pre-build
973     default_vg = self.cfg.GetVGName()
974     self.disks = _ComputeDisks(self.op, default_vg)
975
976     if self.op.mode == constants.INSTANCE_IMPORT:
977       disk_images = []
978       for idx in range(len(self.disks)):
979         option = "disk%d_dump" % idx
980         if export_info.has_option(constants.INISECT_INS, option):
981           # FIXME: are the old os-es, disk sizes, etc. useful?
982           export_name = export_info.get(constants.INISECT_INS, option)
983           image = utils.PathJoin(self.op.src_path, export_name)
984           disk_images.append(image)
985         else:
986           disk_images.append(False)
987
988       self.src_images = disk_images
989
990       if self.op.instance_name == self._old_instance_name:
991         for idx, nic in enumerate(self.nics):
992           if nic.mac == constants.VALUE_AUTO:
993             nic_mac_ini = "nic%d_mac" % idx
994             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
995
996     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
997
998     # ip ping checks (we use the same ip that was resolved in ExpandNames)
999     if self.op.ip_check:
1000       if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
1001         raise errors.OpPrereqError("IP %s of instance %s already in use" %
1002                                    (self.check_ip, self.op.instance_name),
1003                                    errors.ECODE_NOTUNIQUE)
1004
1005     #### mac address generation
1006     # By generating here the mac address both the allocator and the hooks get
1007     # the real final mac address rather than the 'auto' or 'generate' value.
1008     # There is a race condition between the generation and the instance object
1009     # creation, which means that we know the mac is valid now, but we're not
1010     # sure it will be when we actually add the instance. If things go bad
1011     # adding the instance will abort because of a duplicate mac, and the
1012     # creation job will fail.
1013     for nic in self.nics:
1014       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
1015         nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId())
1016
1017     #### allocator run
1018
1019     if self.op.iallocator is not None:
1020       self._RunAllocator()
1021
1022     # Release all unneeded node locks
1023     keep_locks = filter(None, [self.op.pnode, self.op.snode, self.op.src_node])
1024     _ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks)
1025     _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks)
1026     _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
1027
1028     assert (self.owned_locks(locking.LEVEL_NODE) ==
1029             self.owned_locks(locking.LEVEL_NODE_RES)), \
1030       "Node locks differ from node resource locks"
1031
1032     #### node related checks
1033
1034     # check primary node
1035     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
1036     assert self.pnode is not None, \
1037       "Cannot retrieve locked node %s" % self.op.pnode
1038     if pnode.offline:
1039       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
1040                                  pnode.name, errors.ECODE_STATE)
1041     if pnode.drained:
1042       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
1043                                  pnode.name, errors.ECODE_STATE)
1044     if not pnode.vm_capable:
1045       raise errors.OpPrereqError("Cannot use non-vm_capable primary node"
1046                                  " '%s'" % pnode.name, errors.ECODE_STATE)
1047
1048     self.secondaries = []
1049
1050     # Fill in any IPs from IP pools. This must happen here, because we need to
1051     # know the nic's primary node, as specified by the iallocator
1052     for idx, nic in enumerate(self.nics):
1053       net_uuid = nic.network
1054       if net_uuid is not None:
1055         nobj = self.cfg.GetNetwork(net_uuid)
1056         netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.name)
1057         if netparams is None:
1058           raise errors.OpPrereqError("No netparams found for network"
1059                                      " %s. Propably not connected to"
1060                                      " node's %s nodegroup" %
1061                                      (nobj.name, self.pnode.name),
1062                                      errors.ECODE_INVAL)
1063         self.LogInfo("NIC/%d inherits netparams %s" %
1064                      (idx, netparams.values()))
1065         nic.nicparams = dict(netparams)
1066         if nic.ip is not None:
1067           if nic.ip.lower() == constants.NIC_IP_POOL:
1068             try:
1069               nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId())
1070             except errors.ReservationError:
1071               raise errors.OpPrereqError("Unable to get a free IP for NIC %d"
1072                                          " from the address pool" % idx,
1073                                          errors.ECODE_STATE)
1074             self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name)
1075           else:
1076             try:
1077               self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId())
1078             except errors.ReservationError:
1079               raise errors.OpPrereqError("IP address %s already in use"
1080                                          " or does not belong to network %s" %
1081                                          (nic.ip, nobj.name),
1082                                          errors.ECODE_NOTUNIQUE)
1083
1084       # net is None, ip None or given
1085       elif self.op.conflicts_check:
1086         _CheckForConflictingIp(self, nic.ip, self.pnode.name)
1087
1088     # mirror node verification
1089     if self.op.disk_template in constants.DTS_INT_MIRROR:
1090       if self.op.snode == pnode.name:
1091         raise errors.OpPrereqError("The secondary node cannot be the"
1092                                    " primary node", errors.ECODE_INVAL)
1093       _CheckNodeOnline(self, self.op.snode)
1094       _CheckNodeNotDrained(self, self.op.snode)
1095       _CheckNodeVmCapable(self, self.op.snode)
1096       self.secondaries.append(self.op.snode)
1097
1098       snode = self.cfg.GetNodeInfo(self.op.snode)
1099       if pnode.group != snode.group:
1100         self.LogWarning("The primary and secondary nodes are in two"
1101                         " different node groups; the disk parameters"
1102                         " from the first disk's node group will be"
1103                         " used")
1104
1105     if not self.op.disk_template in constants.DTS_EXCL_STORAGE:
1106       nodes = [pnode]
1107       if self.op.disk_template in constants.DTS_INT_MIRROR:
1108         nodes.append(snode)
1109       has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
1110       if compat.any(map(has_es, nodes)):
1111         raise errors.OpPrereqError("Disk template %s not supported with"
1112                                    " exclusive storage" % self.op.disk_template,
1113                                    errors.ECODE_STATE)
1114
1115     nodenames = [pnode.name] + self.secondaries
1116
1117     if not self.adopt_disks:
1118       if self.op.disk_template == constants.DT_RBD:
1119         # _CheckRADOSFreeSpace() is just a placeholder.
1120         # Any function that checks prerequisites can be placed here.
1121         # Check if there is enough space on the RADOS cluster.
1122         _CheckRADOSFreeSpace()
1123       elif self.op.disk_template == constants.DT_EXT:
1124         # FIXME: Function that checks prereqs if needed
1125         pass
1126       else:
1127         # Check lv size requirements, if not adopting
1128         req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
1129         _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
1130
1131     elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
1132       all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
1133                                 disk[constants.IDISK_ADOPT])
1134                      for disk in self.disks])
1135       if len(all_lvs) != len(self.disks):
1136         raise errors.OpPrereqError("Duplicate volume names given for adoption",
1137                                    errors.ECODE_INVAL)
1138       for lv_name in all_lvs:
1139         try:
1140           # FIXME: lv_name here is "vg/lv" need to ensure that other calls
1141           # to ReserveLV uses the same syntax
1142           self.cfg.ReserveLV(lv_name, self.proc.GetECId())
1143         except errors.ReservationError:
1144           raise errors.OpPrereqError("LV named %s used by another instance" %
1145                                      lv_name, errors.ECODE_NOTUNIQUE)
1146
1147       vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
1148       vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
1149
1150       node_lvs = self.rpc.call_lv_list([pnode.name],
1151                                        vg_names.payload.keys())[pnode.name]
1152       node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
1153       node_lvs = node_lvs.payload
1154
1155       delta = all_lvs.difference(node_lvs.keys())
1156       if delta:
1157         raise errors.OpPrereqError("Missing logical volume(s): %s" %
1158                                    utils.CommaJoin(delta),
1159                                    errors.ECODE_INVAL)
1160       online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]]
1161       if online_lvs:
1162         raise errors.OpPrereqError("Online logical volumes found, cannot"
1163                                    " adopt: %s" % utils.CommaJoin(online_lvs),
1164                                    errors.ECODE_STATE)
1165       # update the size of disk based on what is found
1166       for dsk in self.disks:
1167         dsk[constants.IDISK_SIZE] = \
1168           int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG],
1169                                         dsk[constants.IDISK_ADOPT])][0]))
1170
1171     elif self.op.disk_template == constants.DT_BLOCK:
1172       # Normalize and de-duplicate device paths
1173       all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT])
1174                        for disk in self.disks])
1175       if len(all_disks) != len(self.disks):
1176         raise errors.OpPrereqError("Duplicate disk names given for adoption",
1177                                    errors.ECODE_INVAL)
1178       baddisks = [d for d in all_disks
1179                   if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)]
1180       if baddisks:
1181         raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
1182                                    " cannot be adopted" %
1183                                    (utils.CommaJoin(baddisks),
1184                                     constants.ADOPTABLE_BLOCKDEV_ROOT),
1185                                    errors.ECODE_INVAL)
1186
1187       node_disks = self.rpc.call_bdev_sizes([pnode.name],
1188                                             list(all_disks))[pnode.name]
1189       node_disks.Raise("Cannot get block device information from node %s" %
1190                        pnode.name)
1191       node_disks = node_disks.payload
1192       delta = all_disks.difference(node_disks.keys())
1193       if delta:
1194         raise errors.OpPrereqError("Missing block device(s): %s" %
1195                                    utils.CommaJoin(delta),
1196                                    errors.ECODE_INVAL)
1197       for dsk in self.disks:
1198         dsk[constants.IDISK_SIZE] = \
1199           int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
1200
1201     # Verify instance specs
1202     spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
1203     ispec = {
1204       constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
1205       constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
1206       constants.ISPEC_DISK_COUNT: len(self.disks),
1207       constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE]
1208                                   for disk in self.disks],
1209       constants.ISPEC_NIC_COUNT: len(self.nics),
1210       constants.ISPEC_SPINDLE_USE: spindle_use,
1211       }
1212
1213     group_info = self.cfg.GetNodeGroup(pnode.group)
1214     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
1215     res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec,
1216                                                self.op.disk_template)
1217     if not self.op.ignore_ipolicy and res:
1218       msg = ("Instance allocation to group %s (%s) violates policy: %s" %
1219              (pnode.group, group_info.name, utils.CommaJoin(res)))
1220       raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1221
1222     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
1223
1224     _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
1225     # check OS parameters (remotely)
1226     _CheckOSParams(self, True, nodenames, self.op.os_type, self.os_full)
1227
1228     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
1229
1230     #TODO: _CheckExtParams (remotely)
1231     # Check parameters for extstorage
1232
1233     # memory check on primary node
1234     #TODO(dynmem): use MINMEM for checking
1235     if self.op.start:
1236       _CheckNodeFreeMemory(self, self.pnode.name,
1237                            "creating instance %s" % self.op.instance_name,
1238                            self.be_full[constants.BE_MAXMEM],
1239                            self.op.hypervisor)
1240
1241     self.dry_run_result = list(nodenames)
1242
1243   def Exec(self, feedback_fn):
1244     """Create and add the instance to the cluster.
1245
1246     """
1247     instance = self.op.instance_name
1248     pnode_name = self.pnode.name
1249
1250     assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
1251                 self.owned_locks(locking.LEVEL_NODE)), \
1252       "Node locks differ from node resource locks"
1253     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1254
1255     ht_kind = self.op.hypervisor
1256     if ht_kind in constants.HTS_REQ_PORT:
1257       network_port = self.cfg.AllocatePort()
1258     else:
1259       network_port = None
1260
1261     # This is ugly but we got a chicken-egg problem here
1262     # We can only take the group disk parameters, as the instance
1263     # has no disks yet (we are generating them right here).
1264     node = self.cfg.GetNodeInfo(pnode_name)
1265     nodegroup = self.cfg.GetNodeGroup(node.group)
1266     disks = _GenerateDiskTemplate(self,
1267                                   self.op.disk_template,
1268                                   instance, pnode_name,
1269                                   self.secondaries,
1270                                   self.disks,
1271                                   self.instance_file_storage_dir,
1272                                   self.op.file_driver,
1273                                   0,
1274                                   feedback_fn,
1275                                   self.cfg.GetGroupDiskParams(nodegroup))
1276
1277     iobj = objects.Instance(name=instance, os=self.op.os_type,
1278                             primary_node=pnode_name,
1279                             nics=self.nics, disks=disks,
1280                             disk_template=self.op.disk_template,
1281                             admin_state=constants.ADMINST_DOWN,
1282                             network_port=network_port,
1283                             beparams=self.op.beparams,
1284                             hvparams=self.op.hvparams,
1285                             hypervisor=self.op.hypervisor,
1286                             osparams=self.op.osparams,
1287                             )
1288
1289     if self.op.tags:
1290       for tag in self.op.tags:
1291         iobj.AddTag(tag)
1292
1293     if self.adopt_disks:
1294       if self.op.disk_template == constants.DT_PLAIN:
1295         # rename LVs to the newly-generated names; we need to construct
1296         # 'fake' LV disks with the old data, plus the new unique_id
1297         tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
1298         rename_to = []
1299         for t_dsk, a_dsk in zip(tmp_disks, self.disks):
1300           rename_to.append(t_dsk.logical_id)
1301           t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
1302           self.cfg.SetDiskID(t_dsk, pnode_name)
1303         result = self.rpc.call_blockdev_rename(pnode_name,
1304                                                zip(tmp_disks, rename_to))
1305         result.Raise("Failed to rename adoped LVs")
1306     else:
1307       feedback_fn("* creating instance disks...")
1308       try:
1309         _CreateDisks(self, iobj)
1310       except errors.OpExecError:
1311         self.LogWarning("Device creation failed")
1312         self.cfg.ReleaseDRBDMinors(instance)
1313         raise
1314
1315     feedback_fn("adding instance %s to cluster config" % instance)
1316
1317     self.cfg.AddInstance(iobj, self.proc.GetECId())
1318
1319     # Declare that we don't want to remove the instance lock anymore, as we've
1320     # added the instance to the config
1321     del self.remove_locks[locking.LEVEL_INSTANCE]
1322
1323     if self.op.mode == constants.INSTANCE_IMPORT:
1324       # Release unused nodes
1325       _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
1326     else:
1327       # Release all nodes
1328       _ReleaseLocks(self, locking.LEVEL_NODE)
1329
1330     disk_abort = False
1331     if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
1332       feedback_fn("* wiping instance disks...")
1333       try:
1334         _WipeDisks(self, iobj)
1335       except errors.OpExecError, err:
1336         logging.exception("Wiping disks failed")
1337         self.LogWarning("Wiping instance disks failed (%s)", err)
1338         disk_abort = True
1339
1340     if disk_abort:
1341       # Something is already wrong with the disks, don't do anything else
1342       pass
1343     elif self.op.wait_for_sync:
1344       disk_abort = not _WaitForSync(self, iobj)
1345     elif iobj.disk_template in constants.DTS_INT_MIRROR:
1346       # make sure the disks are not degraded (still sync-ing is ok)
1347       feedback_fn("* checking mirrors status")
1348       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
1349     else:
1350       disk_abort = False
1351
1352     if disk_abort:
1353       _RemoveDisks(self, iobj)
1354       self.cfg.RemoveInstance(iobj.name)
1355       # Make sure the instance lock gets removed
1356       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
1357       raise errors.OpExecError("There are some degraded disks for"
1358                                " this instance")
1359
1360     # Release all node resource locks
1361     _ReleaseLocks(self, locking.LEVEL_NODE_RES)
1362
1363     if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
1364       # we need to set the disks ID to the primary node, since the
1365       # preceding code might or might have not done it, depending on
1366       # disk template and other options
1367       for disk in iobj.disks:
1368         self.cfg.SetDiskID(disk, pnode_name)
1369       if self.op.mode == constants.INSTANCE_CREATE:
1370         if not self.op.no_install:
1371           pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
1372                         not self.op.wait_for_sync)
1373           if pause_sync:
1374             feedback_fn("* pausing disk sync to install instance OS")
1375             result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
1376                                                               (iobj.disks,
1377                                                                iobj), True)
1378             for idx, success in enumerate(result.payload):
1379               if not success:
1380                 logging.warn("pause-sync of instance %s for disk %d failed",
1381                              instance, idx)
1382
1383           feedback_fn("* running the instance OS create scripts...")
1384           # FIXME: pass debug option from opcode to backend
1385           os_add_result = \
1386             self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
1387                                           self.op.debug_level)
1388           if pause_sync:
1389             feedback_fn("* resuming disk sync")
1390             result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
1391                                                               (iobj.disks,
1392                                                                iobj), False)
1393             for idx, success in enumerate(result.payload):
1394               if not success:
1395                 logging.warn("resume-sync of instance %s for disk %d failed",
1396                              instance, idx)
1397
1398           os_add_result.Raise("Could not add os for instance %s"
1399                               " on node %s" % (instance, pnode_name))
1400
1401       else:
1402         if self.op.mode == constants.INSTANCE_IMPORT:
1403           feedback_fn("* running the instance OS import scripts...")
1404
1405           transfers = []
1406
1407           for idx, image in enumerate(self.src_images):
1408             if not image:
1409               continue
1410
1411             # FIXME: pass debug option from opcode to backend
1412             dt = masterd.instance.DiskTransfer("disk/%s" % idx,
1413                                                constants.IEIO_FILE, (image, ),
1414                                                constants.IEIO_SCRIPT,
1415                                                (iobj.disks[idx], idx),
1416                                                None)
1417             transfers.append(dt)
1418
1419           import_result = \
1420             masterd.instance.TransferInstanceData(self, feedback_fn,
1421                                                   self.op.src_node, pnode_name,
1422                                                   self.pnode.secondary_ip,
1423                                                   iobj, transfers)
1424           if not compat.all(import_result):
1425             self.LogWarning("Some disks for instance %s on node %s were not"
1426                             " imported successfully" % (instance, pnode_name))
1427
1428           rename_from = self._old_instance_name
1429
1430         elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
1431           feedback_fn("* preparing remote import...")
1432           # The source cluster will stop the instance before attempting to make
1433           # a connection. In some cases stopping an instance can take a long
1434           # time, hence the shutdown timeout is added to the connection
1435           # timeout.
1436           connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
1437                              self.op.source_shutdown_timeout)
1438           timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
1439
1440           assert iobj.primary_node == self.pnode.name
1441           disk_results = \
1442             masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
1443                                           self.source_x509_ca,
1444                                           self._cds, timeouts)
1445           if not compat.all(disk_results):
1446             # TODO: Should the instance still be started, even if some disks
1447             # failed to import (valid for local imports, too)?
1448             self.LogWarning("Some disks for instance %s on node %s were not"
1449                             " imported successfully" % (instance, pnode_name))
1450
1451           rename_from = self.source_instance_name
1452
1453         else:
1454           # also checked in the prereq part
1455           raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
1456                                        % self.op.mode)
1457
1458         # Run rename script on newly imported instance
1459         assert iobj.name == instance
1460         feedback_fn("Running rename script for %s" % instance)
1461         result = self.rpc.call_instance_run_rename(pnode_name, iobj,
1462                                                    rename_from,
1463                                                    self.op.debug_level)
1464         if result.fail_msg:
1465           self.LogWarning("Failed to run rename script for %s on node"
1466                           " %s: %s" % (instance, pnode_name, result.fail_msg))
1467
1468     assert not self.owned_locks(locking.LEVEL_NODE_RES)
1469
1470     if self.op.start:
1471       iobj.admin_state = constants.ADMINST_UP
1472       self.cfg.Update(iobj, feedback_fn)
1473       logging.info("Starting instance %s on node %s", instance, pnode_name)
1474       feedback_fn("* starting instance...")
1475       result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
1476                                             False, self.op.reason)
1477       result.Raise("Could not start instance")
1478
1479     return list(iobj.all_nodes)
1480
1481
1482 class LUInstanceRename(LogicalUnit):
1483   """Rename an instance.
1484
1485   """
1486   HPATH = "instance-rename"
1487   HTYPE = constants.HTYPE_INSTANCE
1488
1489   def CheckArguments(self):
1490     """Check arguments.
1491
1492     """
1493     if self.op.ip_check and not self.op.name_check:
1494       # TODO: make the ip check more flexible and not depend on the name check
1495       raise errors.OpPrereqError("IP address check requires a name check",
1496                                  errors.ECODE_INVAL)
1497
1498   def BuildHooksEnv(self):
1499     """Build hooks env.
1500
1501     This runs on master, primary and secondary nodes of the instance.
1502
1503     """
1504     env = _BuildInstanceHookEnvByObject(self, self.instance)
1505     env["INSTANCE_NEW_NAME"] = self.op.new_name
1506     return env
1507
1508   def BuildHooksNodes(self):
1509     """Build hooks nodes.
1510
1511     """
1512     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1513     return (nl, nl)
1514
1515   def CheckPrereq(self):
1516     """Check prerequisites.
1517
1518     This checks that the instance is in the cluster and is not running.
1519
1520     """
1521     self.op.instance_name = _ExpandInstanceName(self.cfg,
1522                                                 self.op.instance_name)
1523     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1524     assert instance is not None
1525     _CheckNodeOnline(self, instance.primary_node)
1526     _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
1527                         msg="cannot rename")
1528     self.instance = instance
1529
1530     new_name = self.op.new_name
1531     if self.op.name_check:
1532       hostname = _CheckHostnameSane(self, new_name)
1533       new_name = self.op.new_name = hostname.name
1534       if (self.op.ip_check and
1535           netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
1536         raise errors.OpPrereqError("IP %s of instance %s already in use" %
1537                                    (hostname.ip, new_name),
1538                                    errors.ECODE_NOTUNIQUE)
1539
1540     instance_list = self.cfg.GetInstanceList()
1541     if new_name in instance_list and new_name != instance.name:
1542       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
1543                                  new_name, errors.ECODE_EXISTS)
1544
1545   def Exec(self, feedback_fn):
1546     """Rename the instance.
1547
1548     """
1549     inst = self.instance
1550     old_name = inst.name
1551
1552     rename_file_storage = False
1553     if (inst.disk_template in constants.DTS_FILEBASED and
1554         self.op.new_name != inst.name):
1555       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
1556       rename_file_storage = True
1557
1558     self.cfg.RenameInstance(inst.name, self.op.new_name)
1559     # Change the instance lock. This is definitely safe while we hold the BGL.
1560     # Otherwise the new lock would have to be added in acquired mode.
1561     assert self.REQ_BGL
1562     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1563     self.glm.remove(locking.LEVEL_INSTANCE, old_name)
1564     self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
1565
1566     # re-read the instance from the configuration after rename
1567     inst = self.cfg.GetInstanceInfo(self.op.new_name)
1568
1569     if rename_file_storage:
1570       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
1571       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
1572                                                      old_file_storage_dir,
1573                                                      new_file_storage_dir)
1574       result.Raise("Could not rename on node %s directory '%s' to '%s'"
1575                    " (but the instance has been renamed in Ganeti)" %
1576                    (inst.primary_node, old_file_storage_dir,
1577                     new_file_storage_dir))
1578
1579     _StartInstanceDisks(self, inst, None)
1580     # update info on disks
1581     info = _GetInstanceInfoText(inst)
1582     for (idx, disk) in enumerate(inst.disks):
1583       for node in inst.all_nodes:
1584         self.cfg.SetDiskID(disk, node)
1585         result = self.rpc.call_blockdev_setinfo(node, disk, info)
1586         if result.fail_msg:
1587           self.LogWarning("Error setting info on node %s for disk %s: %s",
1588                           node, idx, result.fail_msg)
1589     try:
1590       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
1591                                                  old_name, self.op.debug_level)
1592       msg = result.fail_msg
1593       if msg:
1594         msg = ("Could not run OS rename script for instance %s on node %s"
1595                " (but the instance has been renamed in Ganeti): %s" %
1596                (inst.name, inst.primary_node, msg))
1597         self.LogWarning(msg)
1598     finally:
1599       _ShutdownInstanceDisks(self, inst)
1600
1601     return inst.name
1602
1603
1604 class LUInstanceRemove(LogicalUnit):
1605   """Remove an instance.
1606
1607   """
1608   HPATH = "instance-remove"
1609   HTYPE = constants.HTYPE_INSTANCE
1610   REQ_BGL = False
1611
1612   def ExpandNames(self):
1613     self._ExpandAndLockInstance()
1614     self.needed_locks[locking.LEVEL_NODE] = []
1615     self.needed_locks[locking.LEVEL_NODE_RES] = []
1616     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1617
1618   def DeclareLocks(self, level):
1619     if level == locking.LEVEL_NODE:
1620       self._LockInstancesNodes()
1621     elif level == locking.LEVEL_NODE_RES:
1622       # Copy node locks
1623       self.needed_locks[locking.LEVEL_NODE_RES] = \
1624         _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1625
1626   def BuildHooksEnv(self):
1627     """Build hooks env.
1628
1629     This runs on master, primary and secondary nodes of the instance.
1630
1631     """
1632     env = _BuildInstanceHookEnvByObject(self, self.instance)
1633     env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
1634     return env
1635
1636   def BuildHooksNodes(self):
1637     """Build hooks nodes.
1638
1639     """
1640     nl = [self.cfg.GetMasterNode()]
1641     nl_post = list(self.instance.all_nodes) + nl
1642     return (nl, nl_post)
1643
1644   def CheckPrereq(self):
1645     """Check prerequisites.
1646
1647     This checks that the instance is in the cluster.
1648
1649     """
1650     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1651     assert self.instance is not None, \
1652       "Cannot retrieve locked instance %s" % self.op.instance_name
1653
1654   def Exec(self, feedback_fn):
1655     """Remove the instance.
1656
1657     """
1658     instance = self.instance
1659     logging.info("Shutting down instance %s on node %s",
1660                  instance.name, instance.primary_node)
1661
1662     result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
1663                                              self.op.shutdown_timeout,
1664                                              self.op.reason)
1665     msg = result.fail_msg
1666     if msg:
1667       if self.op.ignore_failures:
1668         feedback_fn("Warning: can't shutdown instance: %s" % msg)
1669       else:
1670         raise errors.OpExecError("Could not shutdown instance %s on"
1671                                  " node %s: %s" %
1672                                  (instance.name, instance.primary_node, msg))
1673
1674     assert (self.owned_locks(locking.LEVEL_NODE) ==
1675             self.owned_locks(locking.LEVEL_NODE_RES))
1676     assert not (set(instance.all_nodes) -
1677                 self.owned_locks(locking.LEVEL_NODE)), \
1678       "Not owning correct locks"
1679
1680     _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
1681
1682
1683 def _CheckInstanceBridgesExist(lu, instance, node=None):
1684   """Check that the brigdes needed by an instance exist.
1685
1686   """
1687   if node is None:
1688     node = instance.primary_node
1689   _CheckNicsBridgesExist(lu, instance.nics, node)
1690
1691
1692 class LUInstanceMove(LogicalUnit):
1693   """Move an instance by data-copying.
1694
1695   """
1696   HPATH = "instance-move"
1697   HTYPE = constants.HTYPE_INSTANCE
1698   REQ_BGL = False
1699
1700   def ExpandNames(self):
1701     self._ExpandAndLockInstance()
1702     target_node = _ExpandNodeName(self.cfg, self.op.target_node)
1703     self.op.target_node = target_node
1704     self.needed_locks[locking.LEVEL_NODE] = [target_node]
1705     self.needed_locks[locking.LEVEL_NODE_RES] = []
1706     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1707
1708   def DeclareLocks(self, level):
1709     if level == locking.LEVEL_NODE:
1710       self._LockInstancesNodes(primary_only=True)
1711     elif level == locking.LEVEL_NODE_RES:
1712       # Copy node locks
1713       self.needed_locks[locking.LEVEL_NODE_RES] = \
1714         _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1715
1716   def BuildHooksEnv(self):
1717     """Build hooks env.
1718
1719     This runs on master, primary and secondary nodes of the instance.
1720
1721     """
1722     env = {
1723       "TARGET_NODE": self.op.target_node,
1724       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
1725       }
1726     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1727     return env
1728
1729   def BuildHooksNodes(self):
1730     """Build hooks nodes.
1731
1732     """
1733     nl = [
1734       self.cfg.GetMasterNode(),
1735       self.instance.primary_node,
1736       self.op.target_node,
1737       ]
1738     return (nl, nl)
1739
1740   def CheckPrereq(self):
1741     """Check prerequisites.
1742
1743     This checks that the instance is in the cluster.
1744
1745     """
1746     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1747     assert self.instance is not None, \
1748       "Cannot retrieve locked instance %s" % self.op.instance_name
1749
1750     if instance.disk_template not in constants.DTS_COPYABLE:
1751       raise errors.OpPrereqError("Disk template %s not suitable for copying" %
1752                                  instance.disk_template, errors.ECODE_STATE)
1753
1754     node = self.cfg.GetNodeInfo(self.op.target_node)
1755     assert node is not None, \
1756       "Cannot retrieve locked node %s" % self.op.target_node
1757
1758     self.target_node = target_node = node.name
1759
1760     if target_node == instance.primary_node:
1761       raise errors.OpPrereqError("Instance %s is already on the node %s" %
1762                                  (instance.name, target_node),
1763                                  errors.ECODE_STATE)
1764
1765     bep = self.cfg.GetClusterInfo().FillBE(instance)
1766
1767     for idx, dsk in enumerate(instance.disks):
1768       if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
1769         raise errors.OpPrereqError("Instance disk %d has a complex layout,"
1770                                    " cannot copy" % idx, errors.ECODE_STATE)
1771
1772     _CheckNodeOnline(self, target_node)
1773     _CheckNodeNotDrained(self, target_node)
1774     _CheckNodeVmCapable(self, target_node)
1775     cluster = self.cfg.GetClusterInfo()
1776     group_info = self.cfg.GetNodeGroup(node.group)
1777     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
1778     _CheckTargetNodeIPolicy(self, ipolicy, instance, node, self.cfg,
1779                             ignore=self.op.ignore_ipolicy)
1780
1781     if instance.admin_state == constants.ADMINST_UP:
1782       # check memory requirements on the secondary node
1783       _CheckNodeFreeMemory(self, target_node,
1784                            "failing over instance %s" %
1785                            instance.name, bep[constants.BE_MAXMEM],
1786                            instance.hypervisor)
1787     else:
1788       self.LogInfo("Not checking memory on the secondary node as"
1789                    " instance will not be started")
1790
1791     # check bridge existance
1792     _CheckInstanceBridgesExist(self, instance, node=target_node)
1793
1794   def Exec(self, feedback_fn):
1795     """Move an instance.
1796
1797     The move is done by shutting it down on its present node, copying
1798     the data over (slow) and starting it on the new node.
1799
1800     """
1801     instance = self.instance
1802
1803     source_node = instance.primary_node
1804     target_node = self.target_node
1805
1806     self.LogInfo("Shutting down instance %s on source node %s",
1807                  instance.name, source_node)
1808
1809     assert (self.owned_locks(locking.LEVEL_NODE) ==
1810             self.owned_locks(locking.LEVEL_NODE_RES))
1811
1812     result = self.rpc.call_instance_shutdown(source_node, instance,
1813                                              self.op.shutdown_timeout,
1814                                              self.op.reason)
1815     msg = result.fail_msg
1816     if msg:
1817       if self.op.ignore_consistency:
1818         self.LogWarning("Could not shutdown instance %s on node %s."
1819                         " Proceeding anyway. Please make sure node"
1820                         " %s is down. Error details: %s",
1821                         instance.name, source_node, source_node, msg)
1822       else:
1823         raise errors.OpExecError("Could not shutdown instance %s on"
1824                                  " node %s: %s" %
1825                                  (instance.name, source_node, msg))
1826
1827     # create the target disks
1828     try:
1829       _CreateDisks(self, instance, target_node=target_node)
1830     except errors.OpExecError:
1831       self.LogWarning("Device creation failed")
1832       self.cfg.ReleaseDRBDMinors(instance.name)
1833       raise
1834
1835     cluster_name = self.cfg.GetClusterInfo().cluster_name
1836
1837     errs = []
1838     # activate, get path, copy the data over
1839     for idx, disk in enumerate(instance.disks):
1840       self.LogInfo("Copying data for disk %d", idx)
1841       result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
1842                                                instance.name, True, idx)
1843       if result.fail_msg:
1844         self.LogWarning("Can't assemble newly created disk %d: %s",
1845                         idx, result.fail_msg)
1846         errs.append(result.fail_msg)
1847         break
1848       dev_path = result.payload
1849       result = self.rpc.call_blockdev_export(source_node, (disk, instance),
1850                                              target_node, dev_path,
1851                                              cluster_name)
1852       if result.fail_msg:
1853         self.LogWarning("Can't copy data over for disk %d: %s",
1854                         idx, result.fail_msg)
1855         errs.append(result.fail_msg)
1856         break
1857
1858     if errs:
1859       self.LogWarning("Some disks failed to copy, aborting")
1860       try:
1861         _RemoveDisks(self, instance, target_node=target_node)
1862       finally:
1863         self.cfg.ReleaseDRBDMinors(instance.name)
1864         raise errors.OpExecError("Errors during disk copy: %s" %
1865                                  (",".join(errs),))
1866
1867     instance.primary_node = target_node
1868     self.cfg.Update(instance, feedback_fn)
1869
1870     self.LogInfo("Removing the disks on the original node")
1871     _RemoveDisks(self, instance, target_node=source_node)
1872
1873     # Only start the instance if it's marked as up
1874     if instance.admin_state == constants.ADMINST_UP:
1875       self.LogInfo("Starting instance %s on node %s",
1876                    instance.name, target_node)
1877
1878       disks_ok, _ = _AssembleInstanceDisks(self, instance,
1879                                            ignore_secondaries=True)
1880       if not disks_ok:
1881         _ShutdownInstanceDisks(self, instance)
1882         raise errors.OpExecError("Can't activate the instance's disks")
1883
1884       result = self.rpc.call_instance_start(target_node,
1885                                             (instance, None, None), False,
1886                                             self.op.reason)
1887       msg = result.fail_msg
1888       if msg:
1889         _ShutdownInstanceDisks(self, instance)
1890         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
1891                                  (instance.name, target_node, msg))
1892
1893
1894 def _GetInstanceConsole(cluster, instance):
1895   """Returns console information for an instance.
1896
1897   @type cluster: L{objects.Cluster}
1898   @type instance: L{objects.Instance}
1899   @rtype: dict
1900
1901   """
1902   hyper = hypervisor.GetHypervisorClass(instance.hypervisor)
1903   # beparams and hvparams are passed separately, to avoid editing the
1904   # instance and then saving the defaults in the instance itself.
1905   hvparams = cluster.FillHV(instance)
1906   beparams = cluster.FillBE(instance)
1907   console = hyper.GetInstanceConsole(instance, hvparams, beparams)
1908
1909   assert console.instance == instance.name
1910   assert console.Validate()
1911
1912   return console.ToDict()
1913
1914
1915 class _InstanceQuery(_QueryBase):
1916   FIELDS = query.INSTANCE_FIELDS
1917
1918   def ExpandNames(self, lu):
1919     lu.needed_locks = {}
1920     lu.share_locks = _ShareAll()
1921
1922     if self.names:
1923       self.wanted = _GetWantedInstances(lu, self.names)
1924     else:
1925       self.wanted = locking.ALL_SET
1926
1927     self.do_locking = (self.use_locking and
1928                        query.IQ_LIVE in self.requested_data)
1929     if self.do_locking:
1930       lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
1931       lu.needed_locks[locking.LEVEL_NODEGROUP] = []
1932       lu.needed_locks[locking.LEVEL_NODE] = []
1933       lu.needed_locks[locking.LEVEL_NETWORK] = []
1934       lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1935
1936     self.do_grouplocks = (self.do_locking and
1937                           query.IQ_NODES in self.requested_data)
1938
1939   def DeclareLocks(self, lu, level):
1940     if self.do_locking:
1941       if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
1942         assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
1943
1944         # Lock all groups used by instances optimistically; this requires going
1945         # via the node before it's locked, requiring verification later on
1946         lu.needed_locks[locking.LEVEL_NODEGROUP] = \
1947           set(group_uuid
1948               for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1949               for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
1950       elif level == locking.LEVEL_NODE:
1951         lu._LockInstancesNodes() # pylint: disable=W0212
1952
1953       elif level == locking.LEVEL_NETWORK:
1954         lu.needed_locks[locking.LEVEL_NETWORK] = \
1955           frozenset(net_uuid
1956                     for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1957                     for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))
1958
1959   @staticmethod
1960   def _CheckGroupLocks(lu):
1961     owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
1962     owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
1963
1964     # Check if node groups for locked instances are still correct
1965     for instance_name in owned_instances:
1966       _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
1967
1968   def _GetQueryData(self, lu):
1969     """Computes the list of instances and their attributes.
1970
1971     """
1972     if self.do_grouplocks:
1973       self._CheckGroupLocks(lu)
1974
1975     cluster = lu.cfg.GetClusterInfo()
1976     all_info = lu.cfg.GetAllInstancesInfo()
1977
1978     instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
1979
1980     instance_list = [all_info[name] for name in instance_names]
1981     nodes = frozenset(itertools.chain(*(inst.all_nodes
1982                                         for inst in instance_list)))
1983     hv_list = list(set([inst.hypervisor for inst in instance_list]))
1984     bad_nodes = []
1985     offline_nodes = []
1986     wrongnode_inst = set()
1987
1988     # Gather data as requested
1989     if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
1990       live_data = {}
1991       node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
1992       for name in nodes:
1993         result = node_data[name]
1994         if result.offline:
1995           # offline nodes will be in both lists
1996           assert result.fail_msg
1997           offline_nodes.append(name)
1998         if result.fail_msg:
1999           bad_nodes.append(name)
2000         elif result.payload:
2001           for inst in result.payload:
2002             if inst in all_info:
2003               if all_info[inst].primary_node == name:
2004                 live_data.update(result.payload)
2005               else:
2006                 wrongnode_inst.add(inst)
2007             else:
2008               # orphan instance; we don't list it here as we don't
2009               # handle this case yet in the output of instance listing
2010               logging.warning("Orphan instance '%s' found on node %s",
2011                               inst, name)
2012               # else no instance is alive
2013     else:
2014       live_data = {}
2015
2016     if query.IQ_DISKUSAGE in self.requested_data:
2017       gmi = ganeti.masterd.instance
2018       disk_usage = dict((inst.name,
2019                          gmi.ComputeDiskSize(inst.disk_template,
2020                                              [{constants.IDISK_SIZE: disk.size}
2021                                               for disk in inst.disks]))
2022                         for inst in instance_list)
2023     else:
2024       disk_usage = None
2025
2026     if query.IQ_CONSOLE in self.requested_data:
2027       consinfo = {}
2028       for inst in instance_list:
2029         if inst.name in live_data:
2030           # Instance is running
2031           consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
2032         else:
2033           consinfo[inst.name] = None
2034       assert set(consinfo.keys()) == set(instance_names)
2035     else:
2036       consinfo = None
2037
2038     if query.IQ_NODES in self.requested_data:
2039       node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
2040                                             instance_list)))
2041       nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
2042       groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
2043                     for uuid in set(map(operator.attrgetter("group"),
2044                                         nodes.values())))
2045     else:
2046       nodes = None
2047       groups = None
2048
2049     if query.IQ_NETWORKS in self.requested_data:
2050       net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
2051                                     for i in instance_list))
2052       networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
2053     else:
2054       networks = None
2055
2056     return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
2057                                    disk_usage, offline_nodes, bad_nodes,
2058                                    live_data, wrongnode_inst, consinfo,
2059                                    nodes, groups, networks)
2060
2061
2062 class LUInstanceQuery(NoHooksLU):
2063   """Logical unit for querying instances.
2064
2065   """
2066   # pylint: disable=W0142
2067   REQ_BGL = False
2068
2069   def CheckArguments(self):
2070     self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
2071                              self.op.output_fields, self.op.use_locking)
2072
2073   def ExpandNames(self):
2074     self.iq.ExpandNames(self)
2075
2076   def DeclareLocks(self, level):
2077     self.iq.DeclareLocks(self, level)
2078
2079   def Exec(self, feedback_fn):
2080     return self.iq.OldStyleQuery(self)
2081
2082
2083 class LUInstanceQueryData(NoHooksLU):
2084   """Query runtime instance data.
2085
2086   """
2087   REQ_BGL = False
2088
2089   def ExpandNames(self):
2090     self.needed_locks = {}
2091
2092     # Use locking if requested or when non-static information is wanted
2093     if not (self.op.static or self.op.use_locking):
2094       self.LogWarning("Non-static data requested, locks need to be acquired")
2095       self.op.use_locking = True
2096
2097     if self.op.instances or not self.op.use_locking:
2098       # Expand instance names right here
2099       self.wanted_names = _GetWantedInstances(self, self.op.instances)
2100     else:
2101       # Will use acquired locks
2102       self.wanted_names = None
2103
2104     if self.op.use_locking:
2105       self.share_locks = _ShareAll()
2106
2107       if self.wanted_names is None:
2108         self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2109       else:
2110         self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
2111
2112       self.needed_locks[locking.LEVEL_NODEGROUP] = []
2113       self.needed_locks[locking.LEVEL_NODE] = []
2114       self.needed_locks[locking.LEVEL_NETWORK] = []
2115       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2116
2117   def DeclareLocks(self, level):
2118     if self.op.use_locking:
2119       owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
2120       if level == locking.LEVEL_NODEGROUP:
2121
2122         # Lock all groups used by instances optimistically; this requires going
2123         # via the node before it's locked, requiring verification later on
2124         self.needed_locks[locking.LEVEL_NODEGROUP] = \
2125           frozenset(group_uuid
2126                     for instance_name in owned_instances
2127                     for group_uuid in
2128                     self.cfg.GetInstanceNodeGroups(instance_name))
2129
2130       elif level == locking.LEVEL_NODE:
2131         self._LockInstancesNodes()
2132
2133       elif level == locking.LEVEL_NETWORK:
2134         self.needed_locks[locking.LEVEL_NETWORK] = \
2135           frozenset(net_uuid
2136                     for instance_name in owned_instances
2137                     for net_uuid in
2138                     self.cfg.GetInstanceNetworks(instance_name))
2139
2140   def CheckPrereq(self):
2141     """Check prerequisites.
2142
2143     This only checks the optional instance list against the existing names.
2144
2145     """
2146     owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
2147     owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
2148     owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
2149     owned_networks = frozenset(self.owned_locks(locking.LEVEL_NETWORK))
2150
2151     if self.wanted_names is None:
2152       assert self.op.use_locking, "Locking was not used"
2153       self.wanted_names = owned_instances
2154
2155     instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
2156
2157     if self.op.use_locking:
2158       _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
2159                                 None)
2160     else:
2161       assert not (owned_instances or owned_groups or
2162                   owned_nodes or owned_networks)
2163
2164     self.wanted_instances = instances.values()
2165
2166   def _ComputeBlockdevStatus(self, node, instance, dev):
2167     """Returns the status of a block device
2168
2169     """
2170     if self.op.static or not node:
2171       return None
2172
2173     self.cfg.SetDiskID(dev, node)
2174
2175     result = self.rpc.call_blockdev_find(node, dev)
2176     if result.offline:
2177       return None
2178
2179     result.Raise("Can't compute disk status for %s" % instance.name)
2180
2181     status = result.payload
2182     if status is None:
2183       return None
2184
2185     return (status.dev_path, status.major, status.minor,
2186             status.sync_percent, status.estimated_time,
2187             status.is_degraded, status.ldisk_status)
2188
2189   def _ComputeDiskStatus(self, instance, snode, dev):
2190     """Compute block device status.
2191
2192     """
2193     (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
2194
2195     return self._ComputeDiskStatusInner(instance, snode, anno_dev)
2196
2197   def _ComputeDiskStatusInner(self, instance, snode, dev):
2198     """Compute block device status.
2199
2200     @attention: The device has to be annotated already.
2201
2202     """
2203     if dev.dev_type in constants.LDS_DRBD:
2204       # we change the snode then (otherwise we use the one passed in)
2205       if dev.logical_id[0] == instance.primary_node:
2206         snode = dev.logical_id[1]
2207       else:
2208         snode = dev.logical_id[0]
2209
2210     dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
2211                                               instance, dev)
2212     dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
2213
2214     if dev.children:
2215       dev_children = map(compat.partial(self._ComputeDiskStatusInner,
2216                                         instance, snode),
2217                          dev.children)
2218     else:
2219       dev_children = []
2220
2221     return {
2222       "iv_name": dev.iv_name,
2223       "dev_type": dev.dev_type,
2224       "logical_id": dev.logical_id,
2225       "physical_id": dev.physical_id,
2226       "pstatus": dev_pstatus,
2227       "sstatus": dev_sstatus,
2228       "children": dev_children,
2229       "mode": dev.mode,
2230       "size": dev.size,
2231       "name": dev.name,
2232       "uuid": dev.uuid,
2233       }
2234
2235   def Exec(self, feedback_fn):
2236     """Gather and return data"""
2237     result = {}
2238
2239     cluster = self.cfg.GetClusterInfo()
2240
2241     node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
2242     nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
2243
2244     groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
2245                                                  for node in nodes.values()))
2246
2247     group2name_fn = lambda uuid: groups[uuid].name
2248     for instance in self.wanted_instances:
2249       pnode = nodes[instance.primary_node]
2250
2251       if self.op.static or pnode.offline:
2252         remote_state = None
2253         if pnode.offline:
2254           self.LogWarning("Primary node %s is marked offline, returning static"
2255                           " information only for instance %s" %
2256                           (pnode.name, instance.name))
2257       else:
2258         remote_info = self.rpc.call_instance_info(instance.primary_node,
2259                                                   instance.name,
2260                                                   instance.hypervisor)
2261         remote_info.Raise("Error checking node %s" % instance.primary_node)
2262         remote_info = remote_info.payload
2263         if remote_info and "state" in remote_info:
2264           remote_state = "up"
2265         else:
2266           if instance.admin_state == constants.ADMINST_UP:
2267             remote_state = "down"
2268           else:
2269             remote_state = instance.admin_state
2270
2271       disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
2272                   instance.disks)
2273
2274       snodes_group_uuids = [nodes[snode_name].group
2275                             for snode_name in instance.secondary_nodes]
2276
2277       result[instance.name] = {
2278         "name": instance.name,
2279         "config_state": instance.admin_state,
2280         "run_state": remote_state,
2281         "pnode": instance.primary_node,
2282         "pnode_group_uuid": pnode.group,
2283         "pnode_group_name": group2name_fn(pnode.group),
2284         "snodes": instance.secondary_nodes,
2285         "snodes_group_uuids": snodes_group_uuids,
2286         "snodes_group_names": map(group2name_fn, snodes_group_uuids),
2287         "os": instance.os,
2288         # this happens to be the same format used for hooks
2289         "nics": _NICListToTuple(self, instance.nics),
2290         "disk_template": instance.disk_template,
2291         "disks": disks,
2292         "hypervisor": instance.hypervisor,
2293         "network_port": instance.network_port,
2294         "hv_instance": instance.hvparams,
2295         "hv_actual": cluster.FillHV(instance, skip_globals=True),
2296         "be_instance": instance.beparams,
2297         "be_actual": cluster.FillBE(instance),
2298         "os_instance": instance.osparams,
2299         "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
2300         "serial_no": instance.serial_no,
2301         "mtime": instance.mtime,
2302         "ctime": instance.ctime,
2303         "uuid": instance.uuid,
2304         }
2305
2306     return result
2307
2308
2309 class LUInstanceStartup(LogicalUnit):
2310   """Starts an instance.
2311
2312   """
2313   HPATH = "instance-start"
2314   HTYPE = constants.HTYPE_INSTANCE
2315   REQ_BGL = False
2316
2317   def CheckArguments(self):
2318     # extra beparams
2319     if self.op.beparams:
2320       # fill the beparams dict
2321       objects.UpgradeBeParams(self.op.beparams)
2322       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2323
2324   def ExpandNames(self):
2325     self._ExpandAndLockInstance()
2326     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2327
2328   def DeclareLocks(self, level):
2329     if level == locking.LEVEL_NODE_RES:
2330       self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
2331
2332   def BuildHooksEnv(self):
2333     """Build hooks env.
2334
2335     This runs on master, primary and secondary nodes of the instance.
2336
2337     """
2338     env = {
2339       "FORCE": self.op.force,
2340       }
2341
2342     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2343
2344     return env
2345
2346   def BuildHooksNodes(self):
2347     """Build hooks nodes.
2348
2349     """
2350     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2351     return (nl, nl)
2352
2353   def CheckPrereq(self):
2354     """Check prerequisites.
2355
2356     This checks that the instance is in the cluster.
2357
2358     """
2359     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2360     assert self.instance is not None, \
2361       "Cannot retrieve locked instance %s" % self.op.instance_name
2362
2363     # extra hvparams
2364     if self.op.hvparams:
2365       # check hypervisor parameter syntax (locally)
2366       cluster = self.cfg.GetClusterInfo()
2367       utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
2368       filled_hvp = cluster.FillHV(instance)
2369       filled_hvp.update(self.op.hvparams)
2370       hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
2371       hv_type.CheckParameterSyntax(filled_hvp)
2372       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2373
2374     _CheckInstanceState(self, instance, INSTANCE_ONLINE)
2375
2376     self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
2377
2378     if self.primary_offline and self.op.ignore_offline_nodes:
2379       self.LogWarning("Ignoring offline primary node")
2380
2381       if self.op.hvparams or self.op.beparams:
2382         self.LogWarning("Overridden parameters are ignored")
2383     else:
2384       _CheckNodeOnline(self, instance.primary_node)
2385
2386       bep = self.cfg.GetClusterInfo().FillBE(instance)
2387       bep.update(self.op.beparams)
2388
2389       # check bridges existence
2390       _CheckInstanceBridgesExist(self, instance)
2391
2392       remote_info = self.rpc.call_instance_info(instance.primary_node,
2393                                                 instance.name,
2394                                                 instance.hypervisor)
2395       remote_info.Raise("Error checking node %s" % instance.primary_node,
2396                         prereq=True, ecode=errors.ECODE_ENVIRON)
2397       if not remote_info.payload: # not running already
2398         _CheckNodeFreeMemory(self, instance.primary_node,
2399                              "starting instance %s" % instance.name,
2400                              bep[constants.BE_MINMEM], instance.hypervisor)
2401
2402   def Exec(self, feedback_fn):
2403     """Start the instance.
2404
2405     """
2406     instance = self.instance
2407     force = self.op.force
2408     reason = self.op.reason
2409
2410     if not self.op.no_remember:
2411       self.cfg.MarkInstanceUp(instance.name)
2412
2413     if self.primary_offline:
2414       assert self.op.ignore_offline_nodes
2415       self.LogInfo("Primary node offline, marked instance as started")
2416     else:
2417       node_current = instance.primary_node
2418
2419       _StartInstanceDisks(self, instance, force)
2420
2421       result = \
2422         self.rpc.call_instance_start(node_current,
2423                                      (instance, self.op.hvparams,
2424                                       self.op.beparams),
2425                                      self.op.startup_paused, reason)
2426       msg = result.fail_msg
2427       if msg:
2428         _ShutdownInstanceDisks(self, instance)
2429         raise errors.OpExecError("Could not start instance: %s" % msg)
2430
2431
2432 class LUInstanceShutdown(LogicalUnit):
2433   """Shutdown an instance.
2434
2435   """
2436   HPATH = "instance-stop"
2437   HTYPE = constants.HTYPE_INSTANCE
2438   REQ_BGL = False
2439
2440   def ExpandNames(self):
2441     self._ExpandAndLockInstance()
2442
2443   def BuildHooksEnv(self):
2444     """Build hooks env.
2445
2446     This runs on master, primary and secondary nodes of the instance.
2447
2448     """
2449     env = _BuildInstanceHookEnvByObject(self, self.instance)
2450     env["TIMEOUT"] = self.op.timeout
2451     return env
2452
2453   def BuildHooksNodes(self):
2454     """Build hooks nodes.
2455
2456     """
2457     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2458     return (nl, nl)
2459
2460   def CheckPrereq(self):
2461     """Check prerequisites.
2462
2463     This checks that the instance is in the cluster.
2464
2465     """
2466     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2467     assert self.instance is not None, \
2468       "Cannot retrieve locked instance %s" % self.op.instance_name
2469
2470     if not self.op.force:
2471       _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
2472     else:
2473       self.LogWarning("Ignoring offline instance check")
2474
2475     self.primary_offline = \
2476       self.cfg.GetNodeInfo(self.instance.primary_node).offline
2477
2478     if self.primary_offline and self.op.ignore_offline_nodes:
2479       self.LogWarning("Ignoring offline primary node")
2480     else:
2481       _CheckNodeOnline(self, self.instance.primary_node)
2482
2483   def Exec(self, feedback_fn):
2484     """Shutdown the instance.
2485
2486     """
2487     instance = self.instance
2488     node_current = instance.primary_node
2489     timeout = self.op.timeout
2490     reason = self.op.reason
2491
2492     # If the instance is offline we shouldn't mark it as down, as that
2493     # resets the offline flag.
2494     if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
2495       self.cfg.MarkInstanceDown(instance.name)
2496
2497     if self.primary_offline:
2498       assert self.op.ignore_offline_nodes
2499       self.LogInfo("Primary node offline, marked instance as stopped")
2500     else:
2501       result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
2502                                                reason)
2503       msg = result.fail_msg
2504       if msg:
2505         self.LogWarning("Could not shutdown instance: %s", msg)
2506
2507       _ShutdownInstanceDisks(self, instance)
2508
2509
2510 class LUInstanceReinstall(LogicalUnit):
2511   """Reinstall an instance.
2512
2513   """
2514   HPATH = "instance-reinstall"
2515   HTYPE = constants.HTYPE_INSTANCE
2516   REQ_BGL = False
2517
2518   def ExpandNames(self):
2519     self._ExpandAndLockInstance()
2520
2521   def BuildHooksEnv(self):
2522     """Build hooks env.
2523
2524     This runs on master, primary and secondary nodes of the instance.
2525
2526     """
2527     return _BuildInstanceHookEnvByObject(self, self.instance)
2528
2529   def BuildHooksNodes(self):
2530     """Build hooks nodes.
2531
2532     """
2533     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2534     return (nl, nl)
2535
2536   def CheckPrereq(self):
2537     """Check prerequisites.
2538
2539     This checks that the instance is in the cluster and is not running.
2540
2541     """
2542     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2543     assert instance is not None, \
2544       "Cannot retrieve locked instance %s" % self.op.instance_name
2545     _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
2546                      " offline, cannot reinstall")
2547
2548     if instance.disk_template == constants.DT_DISKLESS:
2549       raise errors.OpPrereqError("Instance '%s' has no disks" %
2550                                  self.op.instance_name,
2551                                  errors.ECODE_INVAL)
2552     _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
2553
2554     if self.op.os_type is not None:
2555       # OS verification
2556       pnode = _ExpandNodeName(self.cfg, instance.primary_node)
2557       _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
2558       instance_os = self.op.os_type
2559     else:
2560       instance_os = instance.os
2561
2562     nodelist = list(instance.all_nodes)
2563
2564     if self.op.osparams:
2565       i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
2566       _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
2567       self.os_inst = i_osdict # the new dict (without defaults)
2568     else:
2569       self.os_inst = None
2570
2571     self.instance = instance
2572
2573   def Exec(self, feedback_fn):
2574     """Reinstall the instance.
2575
2576     """
2577     inst = self.instance
2578
2579     if self.op.os_type is not None:
2580       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2581       inst.os = self.op.os_type
2582       # Write to configuration
2583       self.cfg.Update(inst, feedback_fn)
2584
2585     _StartInstanceDisks(self, inst, None)
2586     try:
2587       feedback_fn("Running the instance OS create scripts...")
2588       # FIXME: pass debug option from opcode to backend
2589       result = self.rpc.call_instance_os_add(inst.primary_node,
2590                                              (inst, self.os_inst), True,
2591                                              self.op.debug_level)
2592       result.Raise("Could not install OS for instance %s on node %s" %
2593                    (inst.name, inst.primary_node))
2594     finally:
2595       _ShutdownInstanceDisks(self, inst)
2596
2597
2598 class LUInstanceReboot(LogicalUnit):
2599   """Reboot an instance.
2600
2601   """
2602   HPATH = "instance-reboot"
2603   HTYPE = constants.HTYPE_INSTANCE
2604   REQ_BGL = False
2605
2606   def ExpandNames(self):
2607     self._ExpandAndLockInstance()
2608
2609   def BuildHooksEnv(self):
2610     """Build hooks env.
2611
2612     This runs on master, primary and secondary nodes of the instance.
2613
2614     """
2615     env = {
2616       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2617       "REBOOT_TYPE": self.op.reboot_type,
2618       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2619       }
2620
2621     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2622
2623     return env
2624
2625   def BuildHooksNodes(self):
2626     """Build hooks nodes.
2627
2628     """
2629     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2630     return (nl, nl)
2631
2632   def CheckPrereq(self):
2633     """Check prerequisites.
2634
2635     This checks that the instance is in the cluster.
2636
2637     """
2638     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2639     assert self.instance is not None, \
2640       "Cannot retrieve locked instance %s" % self.op.instance_name
2641     _CheckInstanceState(self, instance, INSTANCE_ONLINE)
2642     _CheckNodeOnline(self, instance.primary_node)
2643
2644     # check bridges existence
2645     _CheckInstanceBridgesExist(self, instance)
2646
2647   def Exec(self, feedback_fn):
2648     """Reboot the instance.
2649
2650     """
2651     instance = self.instance
2652     ignore_secondaries = self.op.ignore_secondaries
2653     reboot_type = self.op.reboot_type
2654     reason = self.op.reason
2655
2656     remote_info = self.rpc.call_instance_info(instance.primary_node,
2657                                               instance.name,
2658                                               instance.hypervisor)
2659     remote_info.Raise("Error checking node %s" % instance.primary_node)
2660     instance_running = bool(remote_info.payload)
2661
2662     node_current = instance.primary_node
2663
2664     if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2665                                             constants.INSTANCE_REBOOT_HARD]:
2666       for disk in instance.disks:
2667         self.cfg.SetDiskID(disk, node_current)
2668       result = self.rpc.call_instance_reboot(node_current, instance,
2669                                              reboot_type,
2670                                              self.op.shutdown_timeout, reason)
2671       result.Raise("Could not reboot instance")
2672     else:
2673       if instance_running:
2674         result = self.rpc.call_instance_shutdown(node_current, instance,
2675                                                  self.op.shutdown_timeout,
2676                                                  reason)
2677         result.Raise("Could not shutdown instance for full reboot")
2678         _ShutdownInstanceDisks(self, instance)
2679       else:
2680         self.LogInfo("Instance %s was already stopped, starting now",
2681                      instance.name)
2682       _StartInstanceDisks(self, instance, ignore_secondaries)
2683       result = self.rpc.call_instance_start(node_current,
2684                                             (instance, None, None), False,
2685                                             reason)
2686       msg = result.fail_msg
2687       if msg:
2688         _ShutdownInstanceDisks(self, instance)
2689         raise errors.OpExecError("Could not start instance for"
2690                                  " full reboot: %s" % msg)
2691
2692     self.cfg.MarkInstanceUp(instance.name)
2693
2694
2695 class LUInstanceConsole(NoHooksLU):
2696   """Connect to an instance's console.
2697
2698   This is somewhat special in that it returns the command line that
2699   you need to run on the master node in order to connect to the
2700   console.
2701
2702   """
2703   REQ_BGL = False
2704
2705   def ExpandNames(self):
2706     self.share_locks = _ShareAll()
2707     self._ExpandAndLockInstance()
2708
2709   def CheckPrereq(self):
2710     """Check prerequisites.
2711
2712     This checks that the instance is in the cluster.
2713
2714     """
2715     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2716     assert self.instance is not None, \
2717       "Cannot retrieve locked instance %s" % self.op.instance_name
2718     _CheckNodeOnline(self, self.instance.primary_node)
2719
2720   def Exec(self, feedback_fn):
2721     """Connect to the console of an instance
2722
2723     """
2724     instance = self.instance
2725     node = instance.primary_node
2726
2727     node_insts = self.rpc.call_instance_list([node],
2728                                              [instance.hypervisor])[node]
2729     node_insts.Raise("Can't get node information from %s" % node)
2730
2731     if instance.name not in node_insts.payload:
2732       if instance.admin_state == constants.ADMINST_UP:
2733         state = constants.INSTST_ERRORDOWN
2734       elif instance.admin_state == constants.ADMINST_DOWN:
2735         state = constants.INSTST_ADMINDOWN
2736       else:
2737         state = constants.INSTST_ADMINOFFLINE
2738       raise errors.OpExecError("Instance %s is not running (state %s)" %
2739                                (instance.name, state))
2740
2741     logging.debug("Connecting to console of %s on %s", instance.name, node)
2742
2743     return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
2744
2745
2746 def _DeclareLocksForMigration(lu, level):
2747   """Declares locks for L{TLMigrateInstance}.
2748
2749   @type lu: L{LogicalUnit}
2750   @param level: Lock level
2751
2752   """
2753   if level == locking.LEVEL_NODE_ALLOC:
2754     assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2755
2756     instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
2757
2758     # Node locks are already declared here rather than at LEVEL_NODE as we need
2759     # the instance object anyway to declare the node allocation lock.
2760     if instance.disk_template in constants.DTS_EXT_MIRROR:
2761       if lu.op.target_node is None:
2762         lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2763         lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
2764       else:
2765         lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
2766                                                lu.op.target_node]
2767       del lu.recalculate_locks[locking.LEVEL_NODE]
2768     else:
2769       lu._LockInstancesNodes() # pylint: disable=W0212
2770
2771   elif level == locking.LEVEL_NODE:
2772     # Node locks are declared together with the node allocation lock
2773     assert (lu.needed_locks[locking.LEVEL_NODE] or
2774             lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
2775
2776   elif level == locking.LEVEL_NODE_RES:
2777     # Copy node locks
2778     lu.needed_locks[locking.LEVEL_NODE_RES] = \
2779       _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
2780
2781
2782 def _ExpandNamesForMigration(lu):
2783   """Expands names for use with L{TLMigrateInstance}.
2784
2785   @type lu: L{LogicalUnit}
2786
2787   """
2788   if lu.op.target_node is not None:
2789     lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
2790
2791   lu.needed_locks[locking.LEVEL_NODE] = []
2792   lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2793
2794   lu.needed_locks[locking.LEVEL_NODE_RES] = []
2795   lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2796
2797   # The node allocation lock is actually only needed for externally replicated
2798   # instances (e.g. sharedfile or RBD) and if an iallocator is used.
2799   lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
2800
2801
2802 class LUInstanceFailover(LogicalUnit):
2803   """Failover an instance.
2804
2805   """
2806   HPATH = "instance-failover"
2807   HTYPE = constants.HTYPE_INSTANCE
2808   REQ_BGL = False
2809
2810   def CheckArguments(self):
2811     """Check the arguments.
2812
2813     """
2814     self.iallocator = getattr(self.op, "iallocator", None)
2815     self.target_node = getattr(self.op, "target_node", None)
2816
2817   def ExpandNames(self):
2818     self._ExpandAndLockInstance()
2819     _ExpandNamesForMigration(self)
2820
2821     self._migrater = \
2822       TLMigrateInstance(self, self.op.instance_name, False, True, False,
2823                         self.op.ignore_consistency, True,
2824                         self.op.shutdown_timeout, self.op.ignore_ipolicy)
2825
2826     self.tasklets = [self._migrater]
2827
2828   def DeclareLocks(self, level):
2829     _DeclareLocksForMigration(self, level)
2830
2831   def BuildHooksEnv(self):
2832     """Build hooks env.
2833
2834     This runs on master, primary and secondary nodes of the instance.
2835
2836     """
2837     instance = self._migrater.instance
2838     source_node = instance.primary_node
2839     target_node = self.op.target_node
2840     env = {
2841       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2842       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2843       "OLD_PRIMARY": source_node,
2844       "NEW_PRIMARY": target_node,
2845       }
2846
2847     if instance.disk_template in constants.DTS_INT_MIRROR:
2848       env["OLD_SECONDARY"] = instance.secondary_nodes[0]
2849       env["NEW_SECONDARY"] = source_node
2850     else:
2851       env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
2852
2853     env.update(_BuildInstanceHookEnvByObject(self, instance))
2854
2855     return env
2856
2857   def BuildHooksNodes(self):
2858     """Build hooks nodes.
2859
2860     """
2861     instance = self._migrater.instance
2862     nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
2863     return (nl, nl + [instance.primary_node])
2864
2865
2866 class LUInstanceMigrate(LogicalUnit):
2867   """Migrate an instance.
2868
2869   This is migration without shutting down, compared to the failover,
2870   which is done with shutdown.
2871
2872   """
2873   HPATH = "instance-migrate"
2874   HTYPE = constants.HTYPE_INSTANCE
2875   REQ_BGL = False
2876
2877   def ExpandNames(self):
2878     self._ExpandAndLockInstance()
2879     _ExpandNamesForMigration(self)
2880
2881     self._migrater = \
2882       TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
2883                         False, self.op.allow_failover, False,
2884                         self.op.allow_runtime_changes,
2885                         constants.DEFAULT_SHUTDOWN_TIMEOUT,
2886                         self.op.ignore_ipolicy)
2887
2888     self.tasklets = [self._migrater]
2889
2890   def DeclareLocks(self, level):
2891     _DeclareLocksForMigration(self, level)
2892
2893   def BuildHooksEnv(self):
2894     """Build hooks env.
2895
2896     This runs on master, primary and secondary nodes of the instance.
2897
2898     """
2899     instance = self._migrater.instance
2900     source_node = instance.primary_node
2901     target_node = self.op.target_node
2902     env = _BuildInstanceHookEnvByObject(self, instance)
2903     env.update({
2904       "MIGRATE_LIVE": self._migrater.live,
2905       "MIGRATE_CLEANUP": self.op.cleanup,
2906       "OLD_PRIMARY": source_node,
2907       "NEW_PRIMARY": target_node,
2908       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
2909       })
2910
2911     if instance.disk_template in constants.DTS_INT_MIRROR:
2912       env["OLD_SECONDARY"] = target_node
2913       env["NEW_SECONDARY"] = source_node
2914     else:
2915       env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
2916
2917     return env
2918
2919   def BuildHooksNodes(self):
2920     """Build hooks nodes.
2921
2922     """
2923     instance = self._migrater.instance
2924     snodes = list(instance.secondary_nodes)
2925     nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
2926     return (nl, nl)
2927
2928
2929 class LUInstanceMultiAlloc(NoHooksLU):
2930   """Allocates multiple instances at the same time.
2931
2932   """
2933   REQ_BGL = False
2934
2935   def CheckArguments(self):
2936     """Check arguments.
2937
2938     """
2939     nodes = []
2940     for inst in self.op.instances:
2941       if inst.iallocator is not None:
2942         raise errors.OpPrereqError("iallocator are not allowed to be set on"
2943                                    " instance objects", errors.ECODE_INVAL)
2944       nodes.append(bool(inst.pnode))
2945       if inst.disk_template in constants.DTS_INT_MIRROR:
2946         nodes.append(bool(inst.snode))
2947
2948     has_nodes = compat.any(nodes)
2949     if compat.all(nodes) ^ has_nodes:
2950       raise errors.OpPrereqError("There are instance objects providing"
2951                                  " pnode/snode while others do not",
2952                                  errors.ECODE_INVAL)
2953
2954     if self.op.iallocator is None:
2955       default_iallocator = self.cfg.GetDefaultIAllocator()
2956       if default_iallocator and has_nodes:
2957         self.op.iallocator = default_iallocator
2958       else:
2959         raise errors.OpPrereqError("No iallocator or nodes on the instances"
2960                                    " given and no cluster-wide default"
2961                                    " iallocator found; please specify either"
2962                                    " an iallocator or nodes on the instances"
2963                                    " or set a cluster-wide default iallocator",
2964                                    errors.ECODE_INVAL)
2965
2966     _CheckOpportunisticLocking(self.op)
2967
2968     dups = utils.FindDuplicates([op.instance_name for op in self.op.instances])
2969     if dups:
2970       raise errors.OpPrereqError("There are duplicate instance names: %s" %
2971                                  utils.CommaJoin(dups), errors.ECODE_INVAL)
2972
2973   def ExpandNames(self):
2974     """Calculate the locks.
2975
2976     """
2977     self.share_locks = _ShareAll()
2978     self.needed_locks = {
2979       # iallocator will select nodes and even if no iallocator is used,
2980       # collisions with LUInstanceCreate should be avoided
2981       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
2982       }
2983
2984     if self.op.iallocator:
2985       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2986       self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
2987
2988       if self.op.opportunistic_locking:
2989         self.opportunistic_locks[locking.LEVEL_NODE] = True
2990         self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
2991     else:
2992       nodeslist = []
2993       for inst in self.op.instances:
2994         inst.pnode = _ExpandNodeName(self.cfg, inst.pnode)
2995         nodeslist.append(inst.pnode)
2996         if inst.snode is not None:
2997           inst.snode = _ExpandNodeName(self.cfg, inst.snode)
2998           nodeslist.append(inst.snode)
2999
3000       self.needed_locks[locking.LEVEL_NODE] = nodeslist
3001       # Lock resources of instance's primary and secondary nodes (copy to
3002       # prevent accidential modification)
3003       self.needed_locks[locking.LEVEL_NODE_RES] = list(nodeslist)
3004
3005   def CheckPrereq(self):
3006     """Check prerequisite.
3007
3008     """
3009     cluster = self.cfg.GetClusterInfo()
3010     default_vg = self.cfg.GetVGName()
3011     ec_id = self.proc.GetECId()
3012
3013     if self.op.opportunistic_locking:
3014       # Only consider nodes for which a lock is held
3015       node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
3016     else:
3017       node_whitelist = None
3018
3019     insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg),
3020                                          _ComputeNics(op, cluster, None,
3021                                                       self.cfg, ec_id),
3022                                          _ComputeFullBeParams(op, cluster),
3023                                          node_whitelist)
3024              for op in self.op.instances]
3025
3026     req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
3027     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
3028
3029     ial.Run(self.op.iallocator)
3030
3031     if not ial.success:
3032       raise errors.OpPrereqError("Can't compute nodes using"
3033                                  " iallocator '%s': %s" %
3034                                  (self.op.iallocator, ial.info),
3035                                  errors.ECODE_NORES)
3036
3037     self.ia_result = ial.result
3038
3039     if self.op.dry_run:
3040       self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), {
3041         constants.JOB_IDS_KEY: [],
3042         })
3043
3044   def _ConstructPartialResult(self):
3045     """Contructs the partial result.
3046
3047     """
3048     (allocatable, failed) = self.ia_result
3049     return {
3050       opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY:
3051         map(compat.fst, allocatable),
3052       opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed,
3053       }
3054
3055   def Exec(self, feedback_fn):
3056     """Executes the opcode.
3057
3058     """
3059     op2inst = dict((op.instance_name, op) for op in self.op.instances)
3060     (allocatable, failed) = self.ia_result
3061
3062     jobs = []
3063     for (name, nodes) in allocatable:
3064       op = op2inst.pop(name)
3065
3066       if len(nodes) > 1:
3067         (op.pnode, op.snode) = nodes
3068       else:
3069         (op.pnode,) = nodes
3070
3071       jobs.append([op])
3072
3073     missing = set(op2inst.keys()) - set(failed)
3074     assert not missing, \
3075       "Iallocator did return incomplete result: %s" % utils.CommaJoin(missing)
3076
3077     return ResultWithJobs(jobs, **self._ConstructPartialResult())
3078
3079
3080 class _InstNicModPrivate:
3081   """Data structure for network interface modifications.
3082
3083   Used by L{LUInstanceSetParams}.
3084
3085   """
3086   def __init__(self):
3087     self.params = None
3088     self.filled = None
3089
3090
3091 def PrepareContainerMods(mods, private_fn):
3092   """Prepares a list of container modifications by adding a private data field.
3093
3094   @type mods: list of tuples; (operation, index, parameters)
3095   @param mods: List of modifications
3096   @type private_fn: callable or None
3097   @param private_fn: Callable for constructing a private data field for a
3098     modification
3099   @rtype: list
3100
3101   """
3102   if private_fn is None:
3103     fn = lambda: None
3104   else:
3105     fn = private_fn
3106
3107   return [(op, idx, params, fn()) for (op, idx, params) in mods]
3108
3109
3110 def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
3111   """Checks if nodes have enough physical CPUs
3112
3113   This function checks if all given nodes have the needed number of
3114   physical CPUs. In case any node has less CPUs or we cannot get the
3115   information from the node, this function raises an OpPrereqError
3116   exception.
3117
3118   @type lu: C{LogicalUnit}
3119   @param lu: a logical unit from which we get configuration data
3120   @type nodenames: C{list}
3121   @param nodenames: the list of node names to check
3122   @type requested: C{int}
3123   @param requested: the minimum acceptable number of physical CPUs
3124   @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
3125       or we cannot check the node
3126
3127   """
3128   nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name], None)
3129   for node in nodenames:
3130     info = nodeinfo[node]
3131     info.Raise("Cannot get current information from node %s" % node,
3132                prereq=True, ecode=errors.ECODE_ENVIRON)
3133     (_, _, (hv_info, )) = info.payload
3134     num_cpus = hv_info.get("cpu_total", None)
3135     if not isinstance(num_cpus, int):
3136       raise errors.OpPrereqError("Can't compute the number of physical CPUs"
3137                                  " on node %s, result was '%s'" %
3138                                  (node, num_cpus), errors.ECODE_ENVIRON)
3139     if requested > num_cpus:
3140       raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
3141                                  "required" % (node, num_cpus, requested),
3142                                  errors.ECODE_NORES)
3143
3144
3145 def GetItemFromContainer(identifier, kind, container):
3146   """Return the item refered by the identifier.
3147
3148   @type identifier: string
3149   @param identifier: Item index or name or UUID
3150   @type kind: string
3151   @param kind: One-word item description
3152   @type container: list
3153   @param container: Container to get the item from
3154
3155   """
3156   # Index
3157   try:
3158     idx = int(identifier)
3159     if idx == -1:
3160       # Append
3161       absidx = len(container) - 1
3162     elif idx < 0:
3163       raise IndexError("Not accepting negative indices other than -1")
3164     elif idx > len(container):
3165       raise IndexError("Got %s index %s, but there are only %s" %
3166                        (kind, idx, len(container)))
3167     else:
3168       absidx = idx
3169     return (absidx, container[idx])
3170   except ValueError:
3171     pass
3172
3173   for idx, item in enumerate(container):
3174     if item.uuid == identifier or item.name == identifier:
3175       return (idx, item)
3176
3177   raise errors.OpPrereqError("Cannot find %s with identifier %s" %
3178                              (kind, identifier), errors.ECODE_NOENT)
3179
3180
3181 def ApplyContainerMods(kind, container, chgdesc, mods,
3182                        create_fn, modify_fn, remove_fn):
3183   """Applies descriptions in C{mods} to C{container}.
3184
3185   @type kind: string
3186   @param kind: One-word item description
3187   @type container: list
3188   @param container: Container to modify
3189   @type chgdesc: None or list
3190   @param chgdesc: List of applied changes
3191   @type mods: list
3192   @param mods: Modifications as returned by L{PrepareContainerMods}
3193   @type create_fn: callable
3194   @param create_fn: Callback for creating a new item (L{constants.DDM_ADD});
3195     receives absolute item index, parameters and private data object as added
3196     by L{PrepareContainerMods}, returns tuple containing new item and changes
3197     as list
3198   @type modify_fn: callable
3199   @param modify_fn: Callback for modifying an existing item
3200     (L{constants.DDM_MODIFY}); receives absolute item index, item, parameters
3201     and private data object as added by L{PrepareContainerMods}, returns
3202     changes as list
3203   @type remove_fn: callable
3204   @param remove_fn: Callback on removing item; receives absolute item index,
3205     item and private data object as added by L{PrepareContainerMods}
3206
3207   """
3208   for (op, identifier, params, private) in mods:
3209     changes = None
3210
3211     if op == constants.DDM_ADD:
3212       # Calculate where item will be added
3213       # When adding an item, identifier can only be an index
3214       try:
3215         idx = int(identifier)
3216       except ValueError:
3217         raise errors.OpPrereqError("Only possitive integer or -1 is accepted as"
3218                                    " identifier for %s" % constants.DDM_ADD,
3219                                    errors.ECODE_INVAL)
3220       if idx == -1:
3221         addidx = len(container)
3222       else:
3223         if idx < 0:
3224           raise IndexError("Not accepting negative indices other than -1")
3225         elif idx > len(container):
3226           raise IndexError("Got %s index %s, but there are only %s" %
3227                            (kind, idx, len(container)))
3228         addidx = idx
3229
3230       if create_fn is None:
3231         item = params
3232       else:
3233         (item, changes) = create_fn(addidx, params, private)
3234
3235       if idx == -1:
3236         container.append(item)
3237       else:
3238         assert idx >= 0
3239         assert idx <= len(container)
3240         # list.insert does so before the specified index
3241         container.insert(idx, item)
3242     else:
3243       # Retrieve existing item
3244       (absidx, item) = GetItemFromContainer(identifier, kind, container)
3245
3246       if op == constants.DDM_REMOVE:
3247         assert not params
3248
3249         if remove_fn is not None:
3250           remove_fn(absidx, item, private)
3251
3252         changes = [("%s/%s" % (kind, absidx), "remove")]
3253
3254         assert container[absidx] == item
3255         del container[absidx]
3256       elif op == constants.DDM_MODIFY:
3257         if modify_fn is not None:
3258           changes = modify_fn(absidx, item, params, private)
3259       else:
3260         raise errors.ProgrammerError("Unhandled operation '%s'" % op)
3261
3262     assert _TApplyContModsCbChanges(changes)
3263
3264     if not (chgdesc is None or changes is None):
3265       chgdesc.extend(changes)
3266
3267
3268 def _UpdateIvNames(base_index, disks):
3269   """Updates the C{iv_name} attribute of disks.
3270
3271   @type disks: list of L{objects.Disk}
3272
3273   """
3274   for (idx, disk) in enumerate(disks):
3275     disk.iv_name = "disk/%s" % (base_index + idx, )
3276
3277
3278 class LUInstanceSetParams(LogicalUnit):
3279   """Modifies an instances's parameters.
3280
3281   """
3282   HPATH = "instance-modify"
3283   HTYPE = constants.HTYPE_INSTANCE
3284   REQ_BGL = False
3285
3286   @staticmethod
3287   def _UpgradeDiskNicMods(kind, mods, verify_fn):
3288     assert ht.TList(mods)
3289     assert not mods or len(mods[0]) in (2, 3)
3290
3291     if mods and len(mods[0]) == 2:
3292       result = []
3293
3294       addremove = 0
3295       for op, params in mods:
3296         if op in (constants.DDM_ADD, constants.DDM_REMOVE):
3297           result.append((op, -1, params))
3298           addremove += 1
3299
3300           if addremove > 1:
3301             raise errors.OpPrereqError("Only one %s add or remove operation is"
3302                                        " supported at a time" % kind,
3303                                        errors.ECODE_INVAL)
3304         else:
3305           result.append((constants.DDM_MODIFY, op, params))
3306
3307       assert verify_fn(result)
3308     else:
3309       result = mods
3310
3311     return result
3312
3313   @staticmethod
3314   def _CheckMods(kind, mods, key_types, item_fn):
3315     """Ensures requested disk/NIC modifications are valid.
3316
3317     """
3318     for (op, _, params) in mods:
3319       assert ht.TDict(params)
3320
3321       # If 'key_types' is an empty dict, we assume we have an
3322       # 'ext' template and thus do not ForceDictType
3323       if key_types:
3324         utils.ForceDictType(params, key_types)
3325
3326       if op == constants.DDM_REMOVE:
3327         if params:
3328           raise errors.OpPrereqError("No settings should be passed when"
3329                                      " removing a %s" % kind,
3330                                      errors.ECODE_INVAL)
3331       elif op in (constants.DDM_ADD, constants.DDM_MODIFY):
3332         item_fn(op, params)
3333       else:
3334         raise errors.ProgrammerError("Unhandled operation '%s'" % op)
3335
3336   @staticmethod
3337   def _VerifyDiskModification(op, params):
3338     """Verifies a disk modification.
3339
3340     """
3341     if op == constants.DDM_ADD:
3342       mode = params.setdefault(constants.IDISK_MODE, constants.DISK_RDWR)
3343       if mode not in constants.DISK_ACCESS_SET:
3344         raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
3345                                    errors.ECODE_INVAL)
3346
3347       size = params.get(constants.IDISK_SIZE, None)
3348       if size is None:
3349         raise errors.OpPrereqError("Required disk parameter '%s' missing" %
3350                                    constants.IDISK_SIZE, errors.ECODE_INVAL)
3351
3352       try:
3353         size = int(size)
3354       except (TypeError, ValueError), err:
3355         raise errors.OpPrereqError("Invalid disk size parameter: %s" % err,
3356                                    errors.ECODE_INVAL)
3357
3358       params[constants.IDISK_SIZE] = size
3359       name = params.get(constants.IDISK_NAME, None)
3360       if name is not None and name.lower() == constants.VALUE_NONE:
3361         params[constants.IDISK_NAME] = None
3362
3363     elif op == constants.DDM_MODIFY:
3364       if constants.IDISK_SIZE in params:
3365         raise errors.OpPrereqError("Disk size change not possible, use"
3366                                    " grow-disk", errors.ECODE_INVAL)
3367       if len(params) > 2:
3368         raise errors.OpPrereqError("Disk modification doesn't support"
3369                                    " additional arbitrary parameters",
3370                                    errors.ECODE_INVAL)
3371       name = params.get(constants.IDISK_NAME, None)
3372       if name is not None and name.lower() == constants.VALUE_NONE:
3373         params[constants.IDISK_NAME] = None
3374
3375   @staticmethod
3376   def _VerifyNicModification(op, params):
3377     """Verifies a network interface modification.
3378
3379     """
3380     if op in (constants.DDM_ADD, constants.DDM_MODIFY):
3381       ip = params.get(constants.INIC_IP, None)
3382       name = params.get(constants.INIC_NAME, None)
3383       req_net = params.get(constants.INIC_NETWORK, None)
3384       link = params.get(constants.NIC_LINK, None)
3385       mode = params.get(constants.NIC_MODE, None)
3386       if name is not None and name.lower() == constants.VALUE_NONE:
3387         params[constants.INIC_NAME] = None
3388       if req_net is not None:
3389         if req_net.lower() == constants.VALUE_NONE:
3390           params[constants.INIC_NETWORK] = None
3391           req_net = None
3392         elif link is not None or mode is not None:
3393           raise errors.OpPrereqError("If network is given"
3394                                      " mode or link should not",
3395                                      errors.ECODE_INVAL)
3396
3397       if op == constants.DDM_ADD:
3398         macaddr = params.get(constants.INIC_MAC, None)
3399         if macaddr is None:
3400           params[constants.INIC_MAC] = constants.VALUE_AUTO
3401
3402       if ip is not None:
3403         if ip.lower() == constants.VALUE_NONE:
3404           params[constants.INIC_IP] = None
3405         else:
3406           if ip.lower() == constants.NIC_IP_POOL:
3407             if op == constants.DDM_ADD and req_net is None:
3408               raise errors.OpPrereqError("If ip=pool, parameter network"
3409                                          " cannot be none",
3410                                          errors.ECODE_INVAL)
3411           else:
3412             if not netutils.IPAddress.IsValid(ip):
3413               raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
3414                                          errors.ECODE_INVAL)
3415
3416       if constants.INIC_MAC in params:
3417         macaddr = params[constants.INIC_MAC]
3418         if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3419           macaddr = utils.NormalizeAndValidateMac(macaddr)
3420
3421         if op == constants.DDM_MODIFY and macaddr == constants.VALUE_AUTO:
3422           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
3423                                      " modifying an existing NIC",
3424                                      errors.ECODE_INVAL)
3425
3426   def CheckArguments(self):
3427     if not (self.op.nics or self.op.disks or self.op.disk_template or
3428             self.op.hvparams or self.op.beparams or self.op.os_name or
3429             self.op.offline is not None or self.op.runtime_mem or
3430             self.op.pnode):
3431       raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
3432
3433     if self.op.hvparams:
3434       _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS,
3435                             "hypervisor", "instance", "cluster")
3436
3437     self.op.disks = self._UpgradeDiskNicMods(
3438       "disk", self.op.disks, opcodes.OpInstanceSetParams.TestDiskModifications)
3439     self.op.nics = self._UpgradeDiskNicMods(
3440       "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications)
3441
3442     if self.op.disks and self.op.disk_template is not None:
3443       raise errors.OpPrereqError("Disk template conversion and other disk"
3444                                  " changes not supported at the same time",
3445                                  errors.ECODE_INVAL)
3446
3447     if (self.op.disk_template and
3448         self.op.disk_template in constants.DTS_INT_MIRROR and
3449         self.op.remote_node is None):
3450       raise errors.OpPrereqError("Changing the disk template to a mirrored"
3451                                  " one requires specifying a secondary node",
3452                                  errors.ECODE_INVAL)
3453
3454     # Check NIC modifications
3455     self._CheckMods("NIC", self.op.nics, constants.INIC_PARAMS_TYPES,
3456                     self._VerifyNicModification)
3457
3458     if self.op.pnode:
3459       self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
3460
3461   def ExpandNames(self):
3462     self._ExpandAndLockInstance()
3463     self.needed_locks[locking.LEVEL_NODEGROUP] = []
3464     # Can't even acquire node locks in shared mode as upcoming changes in
3465     # Ganeti 2.6 will start to modify the node object on disk conversion
3466     self.needed_locks[locking.LEVEL_NODE] = []
3467     self.needed_locks[locking.LEVEL_NODE_RES] = []
3468     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3469     # Look node group to look up the ipolicy
3470     self.share_locks[locking.LEVEL_NODEGROUP] = 1
3471
3472   def DeclareLocks(self, level):
3473     if level == locking.LEVEL_NODEGROUP:
3474       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3475       # Acquire locks for the instance's nodegroups optimistically. Needs
3476       # to be verified in CheckPrereq
3477       self.needed_locks[locking.LEVEL_NODEGROUP] = \
3478         self.cfg.GetInstanceNodeGroups(self.op.instance_name)
3479     elif level == locking.LEVEL_NODE:
3480       self._LockInstancesNodes()
3481       if self.op.disk_template and self.op.remote_node:
3482         self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
3483         self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node)
3484     elif level == locking.LEVEL_NODE_RES and self.op.disk_template:
3485       # Copy node locks
3486       self.needed_locks[locking.LEVEL_NODE_RES] = \
3487         _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
3488
3489   def BuildHooksEnv(self):
3490     """Build hooks env.
3491
3492     This runs on the master, primary and secondaries.
3493
3494     """
3495     args = {}
3496     if constants.BE_MINMEM in self.be_new:
3497       args["minmem"] = self.be_new[constants.BE_MINMEM]
3498     if constants.BE_MAXMEM in self.be_new:
3499       args["maxmem"] = self.be_new[constants.BE_MAXMEM]
3500     if constants.BE_VCPUS in self.be_new:
3501       args["vcpus"] = self.be_new[constants.BE_VCPUS]
3502     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
3503     # information at all.
3504
3505     if self._new_nics is not None:
3506       nics = []
3507
3508       for nic in self._new_nics:
3509         n = copy.deepcopy(nic)
3510         nicparams = self.cluster.SimpleFillNIC(n.nicparams)
3511         n.nicparams = nicparams
3512         nics.append(_NICToTuple(self, n))
3513
3514       args["nics"] = nics
3515
3516     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
3517     if self.op.disk_template:
3518       env["NEW_DISK_TEMPLATE"] = self.op.disk_template
3519     if self.op.runtime_mem:
3520       env["RUNTIME_MEMORY"] = self.op.runtime_mem
3521
3522     return env
3523
3524   def BuildHooksNodes(self):
3525     """Build hooks nodes.
3526
3527     """
3528     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3529     return (nl, nl)
3530
3531   def _PrepareNicModification(self, params, private, old_ip, old_net_uuid,
3532                               old_params, cluster, pnode):
3533
3534     update_params_dict = dict([(key, params[key])
3535                                for key in constants.NICS_PARAMETERS
3536                                if key in params])
3537
3538     req_link = update_params_dict.get(constants.NIC_LINK, None)
3539     req_mode = update_params_dict.get(constants.NIC_MODE, None)
3540
3541     new_net_uuid = None
3542     new_net_uuid_or_name = params.get(constants.INIC_NETWORK, old_net_uuid)
3543     if new_net_uuid_or_name:
3544       new_net_uuid = self.cfg.LookupNetwork(new_net_uuid_or_name)
3545       new_net_obj = self.cfg.GetNetwork(new_net_uuid)
3546
3547     if old_net_uuid:
3548       old_net_obj = self.cfg.GetNetwork(old_net_uuid)
3549
3550     if new_net_uuid:
3551       netparams = self.cfg.GetGroupNetParams(new_net_uuid, pnode)
3552       if not netparams:
3553         raise errors.OpPrereqError("No netparams found for the network"
3554                                    " %s, probably not connected" %
3555                                    new_net_obj.name, errors.ECODE_INVAL)
3556       new_params = dict(netparams)
3557     else:
3558       new_params = _GetUpdatedParams(old_params, update_params_dict)
3559
3560     utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES)
3561
3562     new_filled_params = cluster.SimpleFillNIC(new_params)
3563     objects.NIC.CheckParameterSyntax(new_filled_params)
3564
3565     new_mode = new_filled_params[constants.NIC_MODE]
3566     if new_mode == constants.NIC_MODE_BRIDGED:
3567       bridge = new_filled_params[constants.NIC_LINK]
3568       msg = self.rpc.call_bridges_exist(pnode, [bridge]).fail_msg
3569       if msg:
3570         msg = "Error checking bridges on node '%s': %s" % (pnode, msg)
3571         if self.op.force:
3572           self.warn.append(msg)
3573         else:
3574           raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
3575
3576     elif new_mode == constants.NIC_MODE_ROUTED:
3577       ip = params.get(constants.INIC_IP, old_ip)
3578       if ip is None:
3579         raise errors.OpPrereqError("Cannot set the NIC IP address to None"
3580                                    " on a routed NIC", errors.ECODE_INVAL)
3581
3582     elif new_mode == constants.NIC_MODE_OVS:
3583       # TODO: check OVS link
3584       self.LogInfo("OVS links are currently not checked for correctness")
3585
3586     if constants.INIC_MAC in params:
3587       mac = params[constants.INIC_MAC]
3588       if mac is None:
3589         raise errors.OpPrereqError("Cannot unset the NIC MAC address",
3590                                    errors.ECODE_INVAL)
3591       elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3592         # otherwise generate the MAC address
3593         params[constants.INIC_MAC] = \
3594           self.cfg.GenerateMAC(new_net_uuid, self.proc.GetECId())
3595       else:
3596         # or validate/reserve the current one
3597         try:
3598           self.cfg.ReserveMAC(mac, self.proc.GetECId())
3599         except errors.ReservationError:
3600           raise errors.OpPrereqError("MAC address '%s' already in use"
3601                                      " in cluster" % mac,
3602                                      errors.ECODE_NOTUNIQUE)
3603     elif new_net_uuid != old_net_uuid:
3604
3605       def get_net_prefix(net_uuid):
3606         mac_prefix = None
3607         if net_uuid:
3608           nobj = self.cfg.GetNetwork(net_uuid)
3609           mac_prefix = nobj.mac_prefix
3610
3611         return mac_prefix
3612
3613       new_prefix = get_net_prefix(new_net_uuid)
3614       old_prefix = get_net_prefix(old_net_uuid)
3615       if old_prefix != new_prefix:
3616         params[constants.INIC_MAC] = \
3617           self.cfg.GenerateMAC(new_net_uuid, self.proc.GetECId())
3618
3619     # if there is a change in (ip, network) tuple
3620     new_ip = params.get(constants.INIC_IP, old_ip)
3621     if (new_ip, new_net_uuid) != (old_ip, old_net_uuid):
3622       if new_ip:
3623         # if IP is pool then require a network and generate one IP
3624         if new_ip.lower() == constants.NIC_IP_POOL:
3625           if new_net_uuid:
3626             try:
3627               new_ip = self.cfg.GenerateIp(new_net_uuid, self.proc.GetECId())
3628             except errors.ReservationError:
3629               raise errors.OpPrereqError("Unable to get a free IP"
3630                                          " from the address pool",
3631                                          errors.ECODE_STATE)
3632             self.LogInfo("Chose IP %s from network %s",
3633                          new_ip,
3634                          new_net_obj.name)
3635             params[constants.INIC_IP] = new_ip
3636           else:
3637             raise errors.OpPrereqError("ip=pool, but no network found",
3638                                        errors.ECODE_INVAL)
3639         # Reserve new IP if in the new network if any
3640         elif new_net_uuid:
3641           try:
3642             self.cfg.ReserveIp(new_net_uuid, new_ip, self.proc.GetECId())
3643             self.LogInfo("Reserving IP %s in network %s",
3644                          new_ip, new_net_obj.name)
3645           except errors.ReservationError:
3646             raise errors.OpPrereqError("IP %s not available in network %s" %
3647                                        (new_ip, new_net_obj.name),
3648                                        errors.ECODE_NOTUNIQUE)
3649         # new network is None so check if new IP is a conflicting IP
3650         elif self.op.conflicts_check:
3651           _CheckForConflictingIp(self, new_ip, pnode)
3652
3653       # release old IP if old network is not None
3654       if old_ip and old_net_uuid:
3655         try:
3656           self.cfg.ReleaseIp(old_net_uuid, old_ip, self.proc.GetECId())
3657         except errors.AddressPoolError:
3658           logging.warning("Release IP %s not contained in network %s",
3659                           old_ip, old_net_obj.name)
3660
3661     # there are no changes in (ip, network) tuple and old network is not None
3662     elif (old_net_uuid is not None and
3663           (req_link is not None or req_mode is not None)):
3664       raise errors.OpPrereqError("Not allowed to change link or mode of"
3665                                  " a NIC that is connected to a network",
3666                                  errors.ECODE_INVAL)
3667
3668     private.params = new_params
3669     private.filled = new_filled_params
3670
3671   def _PreCheckDiskTemplate(self, pnode_info):
3672     """CheckPrereq checks related to a new disk template."""
3673     # Arguments are passed to avoid configuration lookups
3674     instance = self.instance
3675     pnode = instance.primary_node
3676     cluster = self.cluster
3677     if instance.disk_template == self.op.disk_template:
3678       raise errors.OpPrereqError("Instance already has disk template %s" %
3679                                  instance.disk_template, errors.ECODE_INVAL)
3680
3681     if (instance.disk_template,
3682         self.op.disk_template) not in self._DISK_CONVERSIONS:
3683       raise errors.OpPrereqError("Unsupported disk template conversion from"
3684                                  " %s to %s" % (instance.disk_template,
3685                                                 self.op.disk_template),
3686                                  errors.ECODE_INVAL)
3687     _CheckInstanceState(self, instance, INSTANCE_DOWN,
3688                         msg="cannot change disk template")
3689     if self.op.disk_template in constants.DTS_INT_MIRROR:
3690       if self.op.remote_node == pnode:
3691         raise errors.OpPrereqError("Given new secondary node %s is the same"
3692                                    " as the primary node of the instance" %
3693                                    self.op.remote_node, errors.ECODE_STATE)
3694       _CheckNodeOnline(self, self.op.remote_node)
3695       _CheckNodeNotDrained(self, self.op.remote_node)
3696       # FIXME: here we assume that the old instance type is DT_PLAIN
3697       assert instance.disk_template == constants.DT_PLAIN
3698       disks = [{constants.IDISK_SIZE: d.size,
3699                 constants.IDISK_VG: d.logical_id[0]}
3700                for d in instance.disks]
3701       required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
3702       _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
3703
3704       snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
3705       snode_group = self.cfg.GetNodeGroup(snode_info.group)
3706       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
3707                                                               snode_group)
3708       _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, self.cfg,
3709                               ignore=self.op.ignore_ipolicy)
3710       if pnode_info.group != snode_info.group:
3711         self.LogWarning("The primary and secondary nodes are in two"
3712                         " different node groups; the disk parameters"
3713                         " from the first disk's node group will be"
3714                         " used")
3715
3716     if not self.op.disk_template in constants.DTS_EXCL_STORAGE:
3717       # Make sure none of the nodes require exclusive storage
3718       nodes = [pnode_info]
3719       if self.op.disk_template in constants.DTS_INT_MIRROR:
3720         assert snode_info
3721         nodes.append(snode_info)
3722       has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
3723       if compat.any(map(has_es, nodes)):
3724         errmsg = ("Cannot convert disk template from %s to %s when exclusive"
3725                   " storage is enabled" % (instance.disk_template,
3726                                            self.op.disk_template))
3727         raise errors.OpPrereqError(errmsg, errors.ECODE_STATE)
3728
3729   def CheckPrereq(self):
3730     """Check prerequisites.
3731
3732     This only checks the instance list against the existing names.
3733
3734     """
3735     assert self.op.instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3736     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3737
3738     cluster = self.cluster = self.cfg.GetClusterInfo()
3739     assert self.instance is not None, \
3740       "Cannot retrieve locked instance %s" % self.op.instance_name
3741
3742     pnode = instance.primary_node
3743
3744     self.warn = []
3745
3746     if (self.op.pnode is not None and self.op.pnode != pnode and
3747         not self.op.force):
3748       # verify that the instance is not up
3749       instance_info = self.rpc.call_instance_info(pnode, instance.name,
3750                                                   instance.hypervisor)
3751       if instance_info.fail_msg:
3752         self.warn.append("Can't get instance runtime information: %s" %
3753                          instance_info.fail_msg)
3754       elif instance_info.payload:
3755         raise errors.OpPrereqError("Instance is still running on %s" % pnode,
3756                                    errors.ECODE_STATE)
3757
3758     assert pnode in self.owned_locks(locking.LEVEL_NODE)
3759     nodelist = list(instance.all_nodes)
3760     pnode_info = self.cfg.GetNodeInfo(pnode)
3761     self.diskparams = self.cfg.GetInstanceDiskParams(instance)
3762
3763     #_CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
3764     assert pnode_info.group in self.owned_locks(locking.LEVEL_NODEGROUP)
3765     group_info = self.cfg.GetNodeGroup(pnode_info.group)
3766
3767     # dictionary with instance information after the modification
3768     ispec = {}
3769
3770     # Check disk modifications. This is done here and not in CheckArguments
3771     # (as with NICs), because we need to know the instance's disk template
3772     if instance.disk_template == constants.DT_EXT:
3773       self._CheckMods("disk", self.op.disks, {},
3774                       self._VerifyDiskModification)
3775     else:
3776       self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
3777                       self._VerifyDiskModification)
3778
3779     # Prepare disk/NIC modifications
3780     self.diskmod = PrepareContainerMods(self.op.disks, None)
3781     self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate)
3782
3783     # Check the validity of the `provider' parameter
3784     if instance.disk_template in constants.DT_EXT:
3785       for mod in self.diskmod:
3786         ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
3787         if mod[0] == constants.DDM_ADD:
3788           if ext_provider is None:
3789             raise errors.OpPrereqError("Instance template is '%s' and parameter"
3790                                        " '%s' missing, during disk add" %
3791                                        (constants.DT_EXT,
3792                                         constants.IDISK_PROVIDER),
3793                                        errors.ECODE_NOENT)
3794         elif mod[0] == constants.DDM_MODIFY:
3795           if ext_provider:
3796             raise errors.OpPrereqError("Parameter '%s' is invalid during disk"
3797                                        " modification" %
3798                                        constants.IDISK_PROVIDER,
3799                                        errors.ECODE_INVAL)
3800     else:
3801       for mod in self.diskmod:
3802         ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
3803         if ext_provider is not None:
3804           raise errors.OpPrereqError("Parameter '%s' is only valid for"
3805                                      " instances of type '%s'" %
3806                                      (constants.IDISK_PROVIDER,
3807                                       constants.DT_EXT),
3808                                      errors.ECODE_INVAL)
3809
3810     # OS change
3811     if self.op.os_name and not self.op.force:
3812       _CheckNodeHasOS(self, instance.primary_node, self.op.os_name,
3813                       self.op.force_variant)
3814       instance_os = self.op.os_name
3815     else:
3816       instance_os = instance.os
3817
3818     assert not (self.op.disk_template and self.op.disks), \
3819       "Can't modify disk template and apply disk changes at the same time"
3820
3821     if self.op.disk_template:
3822       self._PreCheckDiskTemplate(pnode_info)
3823
3824     # hvparams processing
3825     if self.op.hvparams:
3826       hv_type = instance.hypervisor
3827       i_hvdict = _GetUpdatedParams(instance.hvparams, self.op.hvparams)
3828       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
3829       hv_new = cluster.SimpleFillHV(hv_type, instance.os, i_hvdict)
3830
3831       # local check
3832       hypervisor.GetHypervisorClass(hv_type).CheckParameterSyntax(hv_new)
3833       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
3834       self.hv_proposed = self.hv_new = hv_new # the new actual values
3835       self.hv_inst = i_hvdict # the new dict (without defaults)
3836     else:
3837       self.hv_proposed = cluster.SimpleFillHV(instance.hypervisor, instance.os,
3838                                               instance.hvparams)
3839       self.hv_new = self.hv_inst = {}
3840
3841     # beparams processing
3842     if self.op.beparams:
3843       i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams,
3844                                    use_none=True)
3845       objects.UpgradeBeParams(i_bedict)
3846       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
3847       be_new = cluster.SimpleFillBE(i_bedict)
3848       self.be_proposed = self.be_new = be_new # the new actual values
3849       self.be_inst = i_bedict # the new dict (without defaults)
3850     else:
3851       self.be_new = self.be_inst = {}
3852       self.be_proposed = cluster.SimpleFillBE(instance.beparams)
3853     be_old = cluster.FillBE(instance)
3854
3855     # CPU param validation -- checking every time a parameter is
3856     # changed to cover all cases where either CPU mask or vcpus have
3857     # changed
3858     if (constants.BE_VCPUS in self.be_proposed and
3859         constants.HV_CPU_MASK in self.hv_proposed):
3860       cpu_list = \
3861         utils.ParseMultiCpuMask(self.hv_proposed[constants.HV_CPU_MASK])
3862       # Verify mask is consistent with number of vCPUs. Can skip this
3863       # test if only 1 entry in the CPU mask, which means same mask
3864       # is applied to all vCPUs.
3865       if (len(cpu_list) > 1 and
3866           len(cpu_list) != self.be_proposed[constants.BE_VCPUS]):
3867         raise errors.OpPrereqError("Number of vCPUs [%d] does not match the"
3868                                    " CPU mask [%s]" %
3869                                    (self.be_proposed[constants.BE_VCPUS],
3870                                     self.hv_proposed[constants.HV_CPU_MASK]),
3871                                    errors.ECODE_INVAL)
3872
3873       # Only perform this test if a new CPU mask is given
3874       if constants.HV_CPU_MASK in self.hv_new:
3875         # Calculate the largest CPU number requested
3876         max_requested_cpu = max(map(max, cpu_list))
3877         # Check that all of the instance's nodes have enough physical CPUs to
3878         # satisfy the requested CPU mask
3879         _CheckNodesPhysicalCPUs(self, instance.all_nodes,
3880                                 max_requested_cpu + 1, instance.hypervisor)
3881
3882     # osparams processing
3883     if self.op.osparams:
3884       i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
3885       _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
3886       self.os_inst = i_osdict # the new dict (without defaults)
3887     else:
3888       self.os_inst = {}
3889
3890     #TODO(dynmem): do the appropriate check involving MINMEM
3891     if (constants.BE_MAXMEM in self.op.beparams and not self.op.force and
3892         be_new[constants.BE_MAXMEM] > be_old[constants.BE_MAXMEM]):
3893       mem_check_list = [pnode]
3894       if be_new[constants.BE_AUTO_BALANCE]:
3895         # either we changed auto_balance to yes or it was from before
3896         mem_check_list.extend(instance.secondary_nodes)
3897       instance_info = self.rpc.call_instance_info(pnode, instance.name,
3898                                                   instance.hypervisor)
3899       nodeinfo = self.rpc.call_node_info(mem_check_list, None,
3900                                          [instance.hypervisor], False)
3901       pninfo = nodeinfo[pnode]
3902       msg = pninfo.fail_msg
3903       if msg:
3904         # Assume the primary node is unreachable and go ahead
3905         self.warn.append("Can't get info from primary node %s: %s" %
3906                          (pnode, msg))
3907       else:
3908         (_, _, (pnhvinfo, )) = pninfo.payload
3909         if not isinstance(pnhvinfo.get("memory_free", None), int):
3910           self.warn.append("Node data from primary node %s doesn't contain"
3911                            " free memory information" % pnode)
3912         elif instance_info.fail_msg:
3913           self.warn.append("Can't get instance runtime information: %s" %
3914                            instance_info.fail_msg)
3915         else:
3916           if instance_info.payload:
3917             current_mem = int(instance_info.payload["memory"])
3918           else:
3919             # Assume instance not running
3920             # (there is a slight race condition here, but it's not very
3921             # probable, and we have no other way to check)
3922             # TODO: Describe race condition
3923             current_mem = 0
3924           #TODO(dynmem): do the appropriate check involving MINMEM
3925           miss_mem = (be_new[constants.BE_MAXMEM] - current_mem -
3926                       pnhvinfo["memory_free"])
3927           if miss_mem > 0:
3928             raise errors.OpPrereqError("This change will prevent the instance"
3929                                        " from starting, due to %d MB of memory"
3930                                        " missing on its primary node" %
3931                                        miss_mem, errors.ECODE_NORES)
3932
3933       if be_new[constants.BE_AUTO_BALANCE]:
3934         for node, nres in nodeinfo.items():
3935           if node not in instance.secondary_nodes:
3936             continue
3937           nres.Raise("Can't get info from secondary node %s" % node,
3938                      prereq=True, ecode=errors.ECODE_STATE)
3939           (_, _, (nhvinfo, )) = nres.payload
3940           if not isinstance(nhvinfo.get("memory_free", None), int):
3941             raise errors.OpPrereqError("Secondary node %s didn't return free"
3942                                        " memory information" % node,
3943                                        errors.ECODE_STATE)
3944           #TODO(dynmem): do the appropriate check involving MINMEM
3945           elif be_new[constants.BE_MAXMEM] > nhvinfo["memory_free"]:
3946             raise errors.OpPrereqError("This change will prevent the instance"
3947                                        " from failover to its secondary node"
3948                                        " %s, due to not enough memory" % node,
3949                                        errors.ECODE_STATE)
3950
3951     if self.op.runtime_mem:
3952       remote_info = self.rpc.call_instance_info(instance.primary_node,
3953                                                 instance.name,
3954                                                 instance.hypervisor)
3955       remote_info.Raise("Error checking node %s" % instance.primary_node)
3956       if not remote_info.payload: # not running already
3957         raise errors.OpPrereqError("Instance %s is not running" %
3958                                    instance.name, errors.ECODE_STATE)
3959
3960       current_memory = remote_info.payload["memory"]
3961       if (not self.op.force and
3962            (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or
3963             self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
3964         raise errors.OpPrereqError("Instance %s must have memory between %d"
3965                                    " and %d MB of memory unless --force is"
3966                                    " given" %
3967                                    (instance.name,
3968                                     self.be_proposed[constants.BE_MINMEM],
3969                                     self.be_proposed[constants.BE_MAXMEM]),
3970                                    errors.ECODE_INVAL)
3971
3972       delta = self.op.runtime_mem - current_memory
3973       if delta > 0:
3974         _CheckNodeFreeMemory(self, instance.primary_node,
3975                              "ballooning memory for instance %s" %
3976                              instance.name, delta, instance.hypervisor)
3977
3978     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
3979       raise errors.OpPrereqError("Disk operations not supported for"
3980                                  " diskless instances", errors.ECODE_INVAL)
3981
3982     def _PrepareNicCreate(_, params, private):
3983       self._PrepareNicModification(params, private, None, None,
3984                                    {}, cluster, pnode)
3985       return (None, None)
3986
3987     def _PrepareNicMod(_, nic, params, private):
3988       self._PrepareNicModification(params, private, nic.ip, nic.network,
3989                                    nic.nicparams, cluster, pnode)
3990       return None
3991
3992     def _PrepareNicRemove(_, params, __):
3993       ip = params.ip
3994       net = params.network
3995       if net is not None and ip is not None:
3996         self.cfg.ReleaseIp(net, ip, self.proc.GetECId())
3997
3998     # Verify NIC changes (operating on copy)
3999     nics = instance.nics[:]
4000     ApplyContainerMods("NIC", nics, None, self.nicmod,
4001                        _PrepareNicCreate, _PrepareNicMod, _PrepareNicRemove)
4002     if len(nics) > constants.MAX_NICS:
4003       raise errors.OpPrereqError("Instance has too many network interfaces"
4004                                  " (%d), cannot add more" % constants.MAX_NICS,
4005                                  errors.ECODE_STATE)
4006
4007     def _PrepareDiskMod(_, disk, params, __):
4008       disk.name = params.get(constants.IDISK_NAME, None)
4009
4010     # Verify disk changes (operating on a copy)
4011     disks = copy.deepcopy(instance.disks)
4012     ApplyContainerMods("disk", disks, None, self.diskmod, None, _PrepareDiskMod,
4013                        None)
4014     utils.ValidateDeviceNames("disk", disks)
4015     if len(disks) > constants.MAX_DISKS:
4016       raise errors.OpPrereqError("Instance has too many disks (%d), cannot add"
4017                                  " more" % constants.MAX_DISKS,
4018                                  errors.ECODE_STATE)
4019     disk_sizes = [disk.size for disk in instance.disks]
4020     disk_sizes.extend(params["size"] for (op, idx, params, private) in
4021                       self.diskmod if op == constants.DDM_ADD)
4022     ispec[constants.ISPEC_DISK_COUNT] = len(disk_sizes)
4023     ispec[constants.ISPEC_DISK_SIZE] = disk_sizes
4024
4025     if self.op.offline is not None and self.op.offline:
4026       _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE,
4027                           msg="can't change to offline")
4028
4029     # Pre-compute NIC changes (necessary to use result in hooks)
4030     self._nic_chgdesc = []
4031     if self.nicmod:
4032       # Operate on copies as this is still in prereq
4033       nics = [nic.Copy() for nic in instance.nics]
4034       ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod,
4035                          self._CreateNewNic, self._ApplyNicMods, None)
4036       # Verify that NIC names are unique and valid
4037       utils.ValidateDeviceNames("NIC", nics)
4038       self._new_nics = nics
4039       ispec[constants.ISPEC_NIC_COUNT] = len(self._new_nics)
4040     else:
4041       self._new_nics = None
4042       ispec[constants.ISPEC_NIC_COUNT] = len(instance.nics)
4043
4044     if not self.op.ignore_ipolicy:
4045       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4046                                                               group_info)
4047
4048       # Fill ispec with backend parameters
4049       ispec[constants.ISPEC_SPINDLE_USE] = \
4050         self.be_new.get(constants.BE_SPINDLE_USE, None)
4051       ispec[constants.ISPEC_CPU_COUNT] = self.be_new.get(constants.BE_VCPUS,
4052                                                          None)
4053
4054       # Copy ispec to verify parameters with min/max values separately
4055       if self.op.disk_template:
4056         new_disk_template = self.op.disk_template
4057       else:
4058         new_disk_template = instance.disk_template
4059       ispec_max = ispec.copy()
4060       ispec_max[constants.ISPEC_MEM_SIZE] = \
4061         self.be_new.get(constants.BE_MAXMEM, None)
4062       res_max = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_max,
4063                                                      new_disk_template)
4064       ispec_min = ispec.copy()
4065       ispec_min[constants.ISPEC_MEM_SIZE] = \
4066         self.be_new.get(constants.BE_MINMEM, None)
4067       res_min = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec_min,
4068                                                      new_disk_template)
4069
4070       if (res_max or res_min):
4071         # FIXME: Improve error message by including information about whether
4072         # the upper or lower limit of the parameter fails the ipolicy.
4073         msg = ("Instance allocation to group %s (%s) violates policy: %s" %
4074                (group_info, group_info.name,
4075                 utils.CommaJoin(set(res_max + res_min))))
4076         raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
4077
4078   def _ConvertPlainToDrbd(self, feedback_fn):
4079     """Converts an instance from plain to drbd.
4080
4081     """
4082     feedback_fn("Converting template to drbd")
4083     instance = self.instance
4084     pnode = instance.primary_node
4085     snode = self.op.remote_node
4086
4087     assert instance.disk_template == constants.DT_PLAIN
4088
4089     # create a fake disk info for _GenerateDiskTemplate
4090     disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
4091                   constants.IDISK_VG: d.logical_id[0],
4092                   constants.IDISK_NAME: d.name}
4093                  for d in instance.disks]
4094     new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
4095                                       instance.name, pnode, [snode],
4096                                       disk_info, None, None, 0, feedback_fn,
4097                                       self.diskparams)
4098     anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
4099                                         self.diskparams)
4100     p_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, pnode)
4101     s_excl_stor = _IsExclusiveStorageEnabledNodeName(self.cfg, snode)
4102     info = _GetInstanceInfoText(instance)
4103     feedback_fn("Creating additional volumes...")
4104     # first, create the missing data and meta devices
4105     for disk in anno_disks:
4106       # unfortunately this is... not too nice
4107       _CreateSingleBlockDev(self, pnode, instance, disk.children[1],
4108                             info, True, p_excl_stor)
4109       for child in disk.children:
4110         _CreateSingleBlockDev(self, snode, instance, child, info, True,
4111                               s_excl_stor)
4112     # at this stage, all new LVs have been created, we can rename the
4113     # old ones
4114     feedback_fn("Renaming original volumes...")
4115     rename_list = [(o, n.children[0].logical_id)
4116                    for (o, n) in zip(instance.disks, new_disks)]
4117     result = self.rpc.call_blockdev_rename(pnode, rename_list)
4118     result.Raise("Failed to rename original LVs")
4119
4120     feedback_fn("Initializing DRBD devices...")
4121     # all child devices are in place, we can now create the DRBD devices
4122     try:
4123       for disk in anno_disks:
4124         for (node, excl_stor) in [(pnode, p_excl_stor), (snode, s_excl_stor)]:
4125           f_create = node == pnode
4126           _CreateSingleBlockDev(self, node, instance, disk, info, f_create,
4127                                 excl_stor)
4128     except errors.GenericError, e:
4129       feedback_fn("Initializing of DRBD devices failed;"
4130                   " renaming back original volumes...")
4131       for disk in new_disks:
4132         self.cfg.SetDiskID(disk, pnode)
4133       rename_back_list = [(n.children[0], o.logical_id)
4134                           for (n, o) in zip(new_disks, instance.disks)]
4135       result = self.rpc.call_blockdev_rename(pnode, rename_back_list)
4136       result.Raise("Failed to rename LVs back after error %s" % str(e))
4137       raise
4138
4139     # at this point, the instance has been modified
4140     instance.disk_template = constants.DT_DRBD8
4141     instance.disks = new_disks
4142     self.cfg.Update(instance, feedback_fn)
4143
4144     # Release node locks while waiting for sync
4145     _ReleaseLocks(self, locking.LEVEL_NODE)
4146
4147     # disks are created, waiting for sync
4148     disk_abort = not _WaitForSync(self, instance,
4149                                   oneshot=not self.op.wait_for_sync)
4150     if disk_abort:
4151       raise errors.OpExecError("There are some degraded disks for"
4152                                " this instance, please cleanup manually")
4153
4154     # Node resource locks will be released by caller
4155
4156   def _ConvertDrbdToPlain(self, feedback_fn):
4157     """Converts an instance from drbd to plain.
4158
4159     """
4160     instance = self.instance
4161
4162     assert len(instance.secondary_nodes) == 1
4163     assert instance.disk_template == constants.DT_DRBD8
4164
4165     pnode = instance.primary_node
4166     snode = instance.secondary_nodes[0]
4167     feedback_fn("Converting template to plain")
4168
4169     old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
4170     new_disks = [d.children[0] for d in instance.disks]
4171
4172     # copy over size, mode and name
4173     for parent, child in zip(old_disks, new_disks):
4174       child.size = parent.size
4175       child.mode = parent.mode
4176       child.name = parent.name
4177
4178     # this is a DRBD disk, return its port to the pool
4179     # NOTE: this must be done right before the call to cfg.Update!
4180     for disk in old_disks:
4181       tcp_port = disk.logical_id[2]
4182       self.cfg.AddTcpUdpPort(tcp_port)
4183
4184     # update instance structure
4185     instance.disks = new_disks
4186     instance.disk_template = constants.DT_PLAIN
4187     _UpdateIvNames(0, instance.disks)
4188     self.cfg.Update(instance, feedback_fn)
4189
4190     # Release locks in case removing disks takes a while
4191     _ReleaseLocks(self, locking.LEVEL_NODE)
4192
4193     feedback_fn("Removing volumes on the secondary node...")
4194     for disk in old_disks:
4195       self.cfg.SetDiskID(disk, snode)
4196       msg = self.rpc.call_blockdev_remove(snode, disk).fail_msg
4197       if msg:
4198         self.LogWarning("Could not remove block device %s on node %s,"
4199                         " continuing anyway: %s", disk.iv_name, snode, msg)
4200
4201     feedback_fn("Removing unneeded volumes on the primary node...")
4202     for idx, disk in enumerate(old_disks):
4203       meta = disk.children[1]
4204       self.cfg.SetDiskID(meta, pnode)
4205       msg = self.rpc.call_blockdev_remove(pnode, meta).fail_msg
4206       if msg:
4207         self.LogWarning("Could not remove metadata for disk %d on node %s,"
4208                         " continuing anyway: %s", idx, pnode, msg)
4209
4210   def _CreateNewDisk(self, idx, params, _):
4211     """Creates a new disk.
4212
4213     """
4214     instance = self.instance
4215
4216     # add a new disk
4217     if instance.disk_template in constants.DTS_FILEBASED:
4218       (file_driver, file_path) = instance.disks[0].logical_id
4219       file_path = os.path.dirname(file_path)
4220     else:
4221       file_driver = file_path = None
4222
4223     disk = \
4224       _GenerateDiskTemplate(self, instance.disk_template, instance.name,
4225                             instance.primary_node, instance.secondary_nodes,
4226                             [params], file_path, file_driver, idx,
4227                             self.Log, self.diskparams)[0]
4228
4229     info = _GetInstanceInfoText(instance)
4230
4231     logging.info("Creating volume %s for instance %s",
4232                  disk.iv_name, instance.name)
4233     # Note: this needs to be kept in sync with _CreateDisks
4234     #HARDCODE
4235     for node in instance.all_nodes:
4236       f_create = (node == instance.primary_node)
4237       try:
4238         _CreateBlockDev(self, node, instance, disk, f_create, info, f_create)
4239       except errors.OpExecError, err:
4240         self.LogWarning("Failed to create volume %s (%s) on node '%s': %s",
4241                         disk.iv_name, disk, node, err)
4242
4243     if self.cluster.prealloc_wipe_disks:
4244       # Wipe new disk
4245       _WipeDisks(self, instance,
4246                  disks=[(idx, disk, 0)])
4247
4248     return (disk, [
4249       ("disk/%d" % idx, "add:size=%s,mode=%s" % (disk.size, disk.mode)),
4250       ])
4251
4252   @staticmethod
4253   def _ModifyDisk(idx, disk, params, _):
4254     """Modifies a disk.
4255
4256     """
4257     changes = []
4258     mode = params.get(constants.IDISK_MODE, None)
4259     if mode:
4260       disk.mode = mode
4261       changes.append(("disk.mode/%d" % idx, disk.mode))
4262
4263     name = params.get(constants.IDISK_NAME, None)
4264     disk.name = name
4265     changes.append(("disk.name/%d" % idx, disk.name))
4266
4267     return changes
4268
4269   def _RemoveDisk(self, idx, root, _):
4270     """Removes a disk.
4271
4272     """
4273     (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
4274     for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
4275       self.cfg.SetDiskID(disk, node)
4276       msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
4277       if msg:
4278         self.LogWarning("Could not remove disk/%d on node '%s': %s,"
4279                         " continuing anyway", idx, node, msg)
4280
4281     # if this is a DRBD disk, return its port to the pool
4282     if root.dev_type in constants.LDS_DRBD:
4283       self.cfg.AddTcpUdpPort(root.logical_id[2])
4284
4285   def _CreateNewNic(self, idx, params, private):
4286     """Creates data structure for a new network interface.
4287
4288     """
4289     mac = params[constants.INIC_MAC]
4290     ip = params.get(constants.INIC_IP, None)
4291     net = params.get(constants.INIC_NETWORK, None)
4292     name = params.get(constants.INIC_NAME, None)
4293     net_uuid = self.cfg.LookupNetwork(net)
4294     #TODO: not private.filled?? can a nic have no nicparams??
4295     nicparams = private.filled
4296     nobj = objects.NIC(mac=mac, ip=ip, network=net_uuid, name=name,
4297                        nicparams=nicparams)
4298     nobj.uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
4299
4300     return (nobj, [
4301       ("nic.%d" % idx,
4302        "add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" %
4303        (mac, ip, private.filled[constants.NIC_MODE],
4304        private.filled[constants.NIC_LINK],
4305        net)),
4306       ])
4307
4308   def _ApplyNicMods(self, idx, nic, params, private):
4309     """Modifies a network interface.
4310
4311     """
4312     changes = []
4313
4314     for key in [constants.INIC_MAC, constants.INIC_IP, constants.INIC_NAME]:
4315       if key in params:
4316         changes.append(("nic.%s/%d" % (key, idx), params[key]))
4317         setattr(nic, key, params[key])
4318
4319     new_net = params.get(constants.INIC_NETWORK, nic.network)
4320     new_net_uuid = self.cfg.LookupNetwork(new_net)
4321     if new_net_uuid != nic.network:
4322       changes.append(("nic.network/%d" % idx, new_net))
4323       nic.network = new_net_uuid
4324
4325     if private.filled:
4326       nic.nicparams = private.filled
4327
4328       for (key, val) in nic.nicparams.items():
4329         changes.append(("nic.%s/%d" % (key, idx), val))
4330
4331     return changes
4332
4333   def Exec(self, feedback_fn):
4334     """Modifies an instance.
4335
4336     All parameters take effect only at the next restart of the instance.
4337
4338     """
4339     # Process here the warnings from CheckPrereq, as we don't have a
4340     # feedback_fn there.
4341     # TODO: Replace with self.LogWarning
4342     for warn in self.warn:
4343       feedback_fn("WARNING: %s" % warn)
4344
4345     assert ((self.op.disk_template is None) ^
4346             bool(self.owned_locks(locking.LEVEL_NODE_RES))), \
4347       "Not owning any node resource locks"
4348
4349     result = []
4350     instance = self.instance
4351
4352     # New primary node
4353     if self.op.pnode:
4354       instance.primary_node = self.op.pnode
4355
4356     # runtime memory
4357     if self.op.runtime_mem:
4358       rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
4359                                                      instance,
4360                                                      self.op.runtime_mem)
4361       rpcres.Raise("Cannot modify instance runtime memory")
4362       result.append(("runtime_memory", self.op.runtime_mem))
4363
4364     # Apply disk changes
4365     ApplyContainerMods("disk", instance.disks, result, self.diskmod,
4366                        self._CreateNewDisk, self._ModifyDisk, self._RemoveDisk)
4367     _UpdateIvNames(0, instance.disks)
4368
4369     if self.op.disk_template:
4370       if __debug__:
4371         check_nodes = set(instance.all_nodes)
4372         if self.op.remote_node:
4373           check_nodes.add(self.op.remote_node)
4374         for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
4375           owned = self.owned_locks(level)
4376           assert not (check_nodes - owned), \
4377             ("Not owning the correct locks, owning %r, expected at least %r" %
4378              (owned, check_nodes))
4379
4380       r_shut = _ShutdownInstanceDisks(self, instance)
4381       if not r_shut:
4382         raise errors.OpExecError("Cannot shutdown instance disks, unable to"
4383                                  " proceed with disk template conversion")
4384       mode = (instance.disk_template, self.op.disk_template)
4385       try:
4386         self._DISK_CONVERSIONS[mode](self, feedback_fn)
4387       except:
4388         self.cfg.ReleaseDRBDMinors(instance.name)
4389         raise
4390       result.append(("disk_template", self.op.disk_template))
4391
4392       assert instance.disk_template == self.op.disk_template, \
4393         ("Expected disk template '%s', found '%s'" %
4394          (self.op.disk_template, instance.disk_template))
4395
4396     # Release node and resource locks if there are any (they might already have
4397     # been released during disk conversion)
4398     _ReleaseLocks(self, locking.LEVEL_NODE)
4399     _ReleaseLocks(self, locking.LEVEL_NODE_RES)
4400
4401     # Apply NIC changes
4402     if self._new_nics is not None:
4403       instance.nics = self._new_nics
4404       result.extend(self._nic_chgdesc)
4405
4406     # hvparams changes
4407     if self.op.hvparams:
4408       instance.hvparams = self.hv_inst
4409       for key, val in self.op.hvparams.iteritems():
4410         result.append(("hv/%s" % key, val))
4411
4412     # beparams changes
4413     if self.op.beparams:
4414       instance.beparams = self.be_inst
4415       for key, val in self.op.beparams.iteritems():
4416         result.append(("be/%s" % key, val))
4417
4418     # OS change
4419     if self.op.os_name:
4420       instance.os = self.op.os_name
4421
4422     # osparams changes
4423     if self.op.osparams:
4424       instance.osparams = self.os_inst
4425       for key, val in self.op.osparams.iteritems():
4426         result.append(("os/%s" % key, val))
4427
4428     if self.op.offline is None:
4429       # Ignore
4430       pass
4431     elif self.op.offline:
4432       # Mark instance as offline
4433       self.cfg.MarkInstanceOffline(instance.name)
4434       result.append(("admin_state", constants.ADMINST_OFFLINE))
4435     else:
4436       # Mark instance as online, but stopped
4437       self.cfg.MarkInstanceDown(instance.name)
4438       result.append(("admin_state", constants.ADMINST_DOWN))
4439
4440     self.cfg.Update(instance, feedback_fn, self.proc.GetECId())
4441
4442     assert not (self.owned_locks(locking.LEVEL_NODE_RES) or
4443                 self.owned_locks(locking.LEVEL_NODE)), \
4444       "All node locks should have been released by now"
4445
4446     return result
4447
4448   _DISK_CONVERSIONS = {
4449     (constants.DT_PLAIN, constants.DT_DRBD8): _ConvertPlainToDrbd,
4450     (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
4451     }
4452
4453
4454 class LUInstanceChangeGroup(LogicalUnit):
4455   HPATH = "instance-change-group"
4456   HTYPE = constants.HTYPE_INSTANCE
4457   REQ_BGL = False
4458
4459   def ExpandNames(self):
4460     self.share_locks = _ShareAll()
4461
4462     self.needed_locks = {
4463       locking.LEVEL_NODEGROUP: [],
4464       locking.LEVEL_NODE: [],
4465       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
4466       }
4467
4468     self._ExpandAndLockInstance()
4469
4470     if self.op.target_groups:
4471       self.req_target_uuids = map(self.cfg.LookupNodeGroup,
4472                                   self.op.target_groups)
4473     else:
4474       self.req_target_uuids = None
4475
4476     self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator)
4477
4478   def DeclareLocks(self, level):
4479     if level == locking.LEVEL_NODEGROUP:
4480       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
4481
4482       if self.req_target_uuids:
4483         lock_groups = set(self.req_target_uuids)
4484
4485         # Lock all groups used by instance optimistically; this requires going
4486         # via the node before it's locked, requiring verification later on
4487         instance_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
4488         lock_groups.update(instance_groups)
4489       else:
4490         # No target groups, need to lock all of them
4491         lock_groups = locking.ALL_SET
4492
4493       self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups
4494
4495     elif level == locking.LEVEL_NODE:
4496       if self.req_target_uuids:
4497         # Lock all nodes used by instances
4498         self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4499         self._LockInstancesNodes()
4500
4501         # Lock all nodes in all potential target groups
4502         lock_groups = (frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) -
4503                        self.cfg.GetInstanceNodeGroups(self.op.instance_name))
4504         member_nodes = [node_name
4505                         for group in lock_groups
4506                         for node_name in self.cfg.GetNodeGroup(group).members]
4507         self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
4508       else:
4509         # Lock all nodes as all groups are potential targets
4510         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4511
4512   def CheckPrereq(self):
4513     owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
4514     owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
4515     owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
4516
4517     assert (self.req_target_uuids is None or
4518             owned_groups.issuperset(self.req_target_uuids))
4519     assert owned_instances == set([self.op.instance_name])
4520
4521     # Get instance information
4522     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4523
4524     # Check if node groups for locked instance are still correct
4525     assert owned_nodes.issuperset(self.instance.all_nodes), \
4526       ("Instance %s's nodes changed while we kept the lock" %
4527        self.op.instance_name)
4528
4529     inst_groups = _CheckInstanceNodeGroups(self.cfg, self.op.instance_name,
4530                                            owned_groups)
4531
4532     if self.req_target_uuids:
4533       # User requested specific target groups
4534       self.target_uuids = frozenset(self.req_target_uuids)
4535     else:
4536       # All groups except those used by the instance are potential targets
4537       self.target_uuids = owned_groups - inst_groups
4538
4539     conflicting_groups = self.target_uuids & inst_groups
4540     if conflicting_groups:
4541       raise errors.OpPrereqError("Can't use group(s) '%s' as targets, they are"
4542                                  " used by the instance '%s'" %
4543                                  (utils.CommaJoin(conflicting_groups),
4544                                   self.op.instance_name),
4545                                  errors.ECODE_INVAL)
4546
4547     if not self.target_uuids:
4548       raise errors.OpPrereqError("There are no possible target groups",
4549                                  errors.ECODE_INVAL)
4550
4551   def BuildHooksEnv(self):
4552     """Build hooks env.
4553
4554     """
4555     assert self.target_uuids
4556
4557     env = {
4558       "TARGET_GROUPS": " ".join(self.target_uuids),
4559       }
4560
4561     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4562
4563     return env
4564
4565   def BuildHooksNodes(self):
4566     """Build hooks nodes.
4567
4568     """
4569     mn = self.cfg.GetMasterNode()
4570     return ([mn], [mn])
4571
4572   def Exec(self, feedback_fn):
4573     instances = list(self.owned_locks(locking.LEVEL_INSTANCE))
4574
4575     assert instances == [self.op.instance_name], "Instance not locked"
4576
4577     req = iallocator.IAReqGroupChange(instances=instances,
4578                                       target_groups=list(self.target_uuids))
4579     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
4580
4581     ial.Run(self.op.iallocator)
4582
4583     if not ial.success:
4584       raise errors.OpPrereqError("Can't compute solution for changing group of"
4585                                  " instance '%s' using iallocator '%s': %s" %
4586                                  (self.op.instance_name, self.op.iallocator,
4587                                   ial.info), errors.ECODE_NORES)
4588
4589     jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
4590
4591     self.LogInfo("Iallocator returned %s job(s) for changing group of"
4592                  " instance '%s'", len(jobs), self.op.instance_name)
4593
4594     return ResultWithJobs(jobs)
4595
4596
4597 class TLMigrateInstance(Tasklet):
4598   """Tasklet class for instance migration.
4599
4600   @type live: boolean
4601   @ivar live: whether the migration will be done live or non-live;
4602       this variable is initalized only after CheckPrereq has run
4603   @type cleanup: boolean
4604   @ivar cleanup: Wheater we cleanup from a failed migration
4605   @type iallocator: string
4606   @ivar iallocator: The iallocator used to determine target_node
4607   @type target_node: string
4608   @ivar target_node: If given, the target_node to reallocate the instance to
4609   @type failover: boolean
4610   @ivar failover: Whether operation results in failover or migration
4611   @type fallback: boolean
4612   @ivar fallback: Whether fallback to failover is allowed if migration not
4613                   possible
4614   @type ignore_consistency: boolean
4615   @ivar ignore_consistency: Wheter we should ignore consistency between source
4616                             and target node
4617   @type shutdown_timeout: int
4618   @ivar shutdown_timeout: In case of failover timeout of the shutdown
4619   @type ignore_ipolicy: bool
4620   @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
4621
4622   """
4623
4624   # Constants
4625   _MIGRATION_POLL_INTERVAL = 1      # seconds
4626   _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
4627
4628   def __init__(self, lu, instance_name, cleanup, failover, fallback,
4629                ignore_consistency, allow_runtime_changes, shutdown_timeout,
4630                ignore_ipolicy):
4631     """Initializes this class.
4632
4633     """
4634     Tasklet.__init__(self, lu)
4635
4636     # Parameters
4637     self.instance_name = instance_name
4638     self.cleanup = cleanup
4639     self.live = False # will be overridden later
4640     self.failover = failover
4641     self.fallback = fallback
4642     self.ignore_consistency = ignore_consistency
4643     self.shutdown_timeout = shutdown_timeout
4644     self.ignore_ipolicy = ignore_ipolicy
4645     self.allow_runtime_changes = allow_runtime_changes
4646
4647   def CheckPrereq(self):
4648     """Check prerequisites.
4649
4650     This checks that the instance is in the cluster.
4651
4652     """
4653     instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
4654     instance = self.cfg.GetInstanceInfo(instance_name)
4655     assert instance is not None
4656     self.instance = instance
4657     cluster = self.cfg.GetClusterInfo()
4658
4659     if (not self.cleanup and
4660         not instance.admin_state == constants.ADMINST_UP and
4661         not self.failover and self.fallback):
4662       self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
4663                       " switching to failover")
4664       self.failover = True
4665
4666     if instance.disk_template not in constants.DTS_MIRRORED:
4667       if self.failover:
4668         text = "failovers"
4669       else:
4670         text = "migrations"
4671       raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
4672                                  " %s" % (instance.disk_template, text),
4673                                  errors.ECODE_STATE)
4674
4675     if instance.disk_template in constants.DTS_EXT_MIRROR:
4676       _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
4677
4678       if self.lu.op.iallocator:
4679         assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
4680         self._RunAllocator()
4681       else:
4682         # We set set self.target_node as it is required by
4683         # BuildHooksEnv
4684         self.target_node = self.lu.op.target_node
4685
4686       # Check that the target node is correct in terms of instance policy
4687       nodeinfo = self.cfg.GetNodeInfo(self.target_node)
4688       group_info = self.cfg.GetNodeGroup(nodeinfo.group)
4689       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4690                                                               group_info)
4691       _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
4692                               ignore=self.ignore_ipolicy)
4693
4694       # self.target_node is already populated, either directly or by the
4695       # iallocator run
4696       target_node = self.target_node
4697       if self.target_node == instance.primary_node:
4698         raise errors.OpPrereqError("Cannot migrate instance %s"
4699                                    " to its primary (%s)" %
4700                                    (instance.name, instance.primary_node),
4701                                    errors.ECODE_STATE)
4702
4703       if len(self.lu.tasklets) == 1:
4704         # It is safe to release locks only when we're the only tasklet
4705         # in the LU
4706         _ReleaseLocks(self.lu, locking.LEVEL_NODE,
4707                       keep=[instance.primary_node, self.target_node])
4708         _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
4709
4710     else:
4711       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
4712
4713       secondary_nodes = instance.secondary_nodes
4714       if not secondary_nodes:
4715         raise errors.ConfigurationError("No secondary node but using"
4716                                         " %s disk template" %
4717                                         instance.disk_template)
4718       target_node = secondary_nodes[0]
4719       if self.lu.op.iallocator or (self.lu.op.target_node and
4720                                    self.lu.op.target_node != target_node):
4721         if self.failover:
4722           text = "failed over"
4723         else:
4724           text = "migrated"
4725         raise errors.OpPrereqError("Instances with disk template %s cannot"
4726                                    " be %s to arbitrary nodes"
4727                                    " (neither an iallocator nor a target"
4728                                    " node can be passed)" %
4729                                    (instance.disk_template, text),
4730                                    errors.ECODE_INVAL)
4731       nodeinfo = self.cfg.GetNodeInfo(target_node)
4732       group_info = self.cfg.GetNodeGroup(nodeinfo.group)
4733       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
4734                                                               group_info)
4735       _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
4736                               ignore=self.ignore_ipolicy)
4737
4738     i_be = cluster.FillBE(instance)
4739
4740     # check memory requirements on the secondary node
4741     if (not self.cleanup and
4742          (not self.failover or instance.admin_state == constants.ADMINST_UP)):
4743       self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
4744                                                "migrating instance %s" %
4745                                                instance.name,
4746                                                i_be[constants.BE_MINMEM],
4747                                                instance.hypervisor)
4748     else:
4749       self.lu.LogInfo("Not checking memory on the secondary node as"
4750                       " instance will not be started")
4751
4752     # check if failover must be forced instead of migration
4753     if (not self.cleanup and not self.failover and
4754         i_be[constants.BE_ALWAYS_FAILOVER]):
4755       self.lu.LogInfo("Instance configured to always failover; fallback"
4756                       " to failover")
4757       self.failover = True
4758
4759     # check bridge existance
4760     _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
4761
4762     if not self.cleanup:
4763       _CheckNodeNotDrained(self.lu, target_node)
4764       if not self.failover:
4765         result = self.rpc.call_instance_migratable(instance.primary_node,
4766                                                    instance)
4767         if result.fail_msg and self.fallback:
4768           self.lu.LogInfo("Can't migrate, instance offline, fallback to"
4769                           " failover")
4770           self.failover = True
4771         else:
4772           result.Raise("Can't migrate, please use failover",
4773                        prereq=True, ecode=errors.ECODE_STATE)
4774
4775     assert not (self.failover and self.cleanup)
4776
4777     if not self.failover:
4778       if self.lu.op.live is not None and self.lu.op.mode is not None:
4779         raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
4780                                    " parameters are accepted",
4781                                    errors.ECODE_INVAL)
4782       if self.lu.op.live is not None:
4783         if self.lu.op.live:
4784           self.lu.op.mode = constants.HT_MIGRATION_LIVE
4785         else:
4786           self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
4787         # reset the 'live' parameter to None so that repeated
4788         # invocations of CheckPrereq do not raise an exception
4789         self.lu.op.live = None
4790       elif self.lu.op.mode is None:
4791         # read the default value from the hypervisor
4792         i_hv = cluster.FillHV(self.instance, skip_globals=False)
4793         self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
4794
4795       self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
4796     else:
4797       # Failover is never live
4798       self.live = False
4799
4800     if not (self.failover or self.cleanup):
4801       remote_info = self.rpc.call_instance_info(instance.primary_node,
4802                                                 instance.name,
4803                                                 instance.hypervisor)
4804       remote_info.Raise("Error checking instance on node %s" %
4805                         instance.primary_node)
4806       instance_running = bool(remote_info.payload)
4807       if instance_running:
4808         self.current_mem = int(remote_info.payload["memory"])
4809
4810   def _RunAllocator(self):
4811     """Run the allocator based on input opcode.
4812
4813     """
4814     assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
4815
4816     # FIXME: add a self.ignore_ipolicy option
4817     req = iallocator.IAReqRelocate(name=self.instance_name,
4818                                    relocate_from=[self.instance.primary_node])
4819     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
4820
4821     ial.Run(self.lu.op.iallocator)
4822
4823     if not ial.success:
4824       raise errors.OpPrereqError("Can't compute nodes using"
4825                                  " iallocator '%s': %s" %
4826                                  (self.lu.op.iallocator, ial.info),
4827                                  errors.ECODE_NORES)
4828     self.target_node = ial.result[0]
4829     self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4830                     self.instance_name, self.lu.op.iallocator,
4831                     utils.CommaJoin(ial.result))
4832
4833   def _WaitUntilSync(self):
4834     """Poll with custom rpc for disk sync.
4835
4836     This uses our own step-based rpc call.
4837
4838     """
4839     self.feedback_fn("* wait until resync is done")
4840     all_done = False
4841     while not all_done:
4842       all_done = True
4843       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4844                                             self.nodes_ip,
4845                                             (self.instance.disks,
4846                                              self.instance))
4847       min_percent = 100
4848       for node, nres in result.items():
4849         nres.Raise("Cannot resync disks on node %s" % node)
4850         node_done, node_percent = nres.payload
4851         all_done = all_done and node_done
4852         if node_percent is not None:
4853           min_percent = min(min_percent, node_percent)
4854       if not all_done:
4855         if min_percent < 100:
4856           self.feedback_fn("   - progress: %.1f%%" % min_percent)
4857         time.sleep(2)
4858
4859   def _EnsureSecondary(self, node):
4860     """Demote a node to secondary.
4861
4862     """
4863     self.feedback_fn("* switching node %s to secondary mode" % node)
4864
4865     for dev in self.instance.disks:
4866       self.cfg.SetDiskID(dev, node)
4867
4868     result = self.rpc.call_blockdev_close(node, self.instance.name,
4869                                           self.instance.disks)
4870     result.Raise("Cannot change disk to secondary on node %s" % node)
4871
4872   def _GoStandalone(self):
4873     """Disconnect from the network.
4874
4875     """
4876     self.feedback_fn("* changing into standalone mode")
4877     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4878                                                self.instance.disks)
4879     for node, nres in result.items():
4880       nres.Raise("Cannot disconnect disks node %s" % node)
4881
4882   def _GoReconnect(self, multimaster):
4883     """Reconnect to the network.
4884
4885     """
4886     if multimaster:
4887       msg = "dual-master"
4888     else:
4889       msg = "single-master"
4890     self.feedback_fn("* changing disks into %s mode" % msg)
4891     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4892                                            (self.instance.disks, self.instance),
4893                                            self.instance.name, multimaster)
4894     for node, nres in result.items():
4895       nres.Raise("Cannot change disks config on node %s" % node)
4896
4897   def _ExecCleanup(self):
4898     """Try to cleanup after a failed migration.
4899
4900     The cleanup is done by:
4901       - check that the instance is running only on one node
4902         (and update the config if needed)
4903       - change disks on its secondary node to secondary
4904       - wait until disks are fully synchronized
4905       - disconnect from the network
4906       - change disks into single-master mode
4907       - wait again until disks are fully synchronized
4908
4909     """
4910     instance = self.instance
4911     target_node = self.target_node
4912     source_node = self.source_node
4913
4914     # check running on only one node
4915     self.feedback_fn("* checking where the instance actually runs"
4916                      " (if this hangs, the hypervisor might be in"
4917                      " a bad state)")
4918     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4919     for node, result in ins_l.items():
4920       result.Raise("Can't contact node %s" % node)
4921
4922     runningon_source = instance.name in ins_l[source_node].payload
4923     runningon_target = instance.name in ins_l[target_node].payload
4924
4925     if runningon_source and runningon_target:
4926       raise errors.OpExecError("Instance seems to be running on two nodes,"
4927                                " or the hypervisor is confused; you will have"
4928                                " to ensure manually that it runs only on one"
4929                                " and restart this operation")
4930
4931     if not (runningon_source or runningon_target):
4932       raise errors.OpExecError("Instance does not seem to be running at all;"
4933                                " in this case it's safer to repair by"
4934                                " running 'gnt-instance stop' to ensure disk"
4935                                " shutdown, and then restarting it")
4936
4937     if runningon_target:
4938       # the migration has actually succeeded, we need to update the config
4939       self.feedback_fn("* instance running on secondary node (%s),"
4940                        " updating config" % target_node)
4941       instance.primary_node = target_node
4942       self.cfg.Update(instance, self.feedback_fn)
4943       demoted_node = source_node
4944     else:
4945       self.feedback_fn("* instance confirmed to be running on its"
4946                        " primary node (%s)" % source_node)
4947       demoted_node = target_node
4948
4949     if instance.disk_template in constants.DTS_INT_MIRROR:
4950       self._EnsureSecondary(demoted_node)
4951       try:
4952         self._WaitUntilSync()
4953       except errors.OpExecError:
4954         # we ignore here errors, since if the device is standalone, it
4955         # won't be able to sync
4956         pass
4957       self._GoStandalone()
4958       self._GoReconnect(False)
4959       self._WaitUntilSync()
4960
4961     self.feedback_fn("* done")
4962
4963   def _RevertDiskStatus(self):
4964     """Try to revert the disk status after a failed migration.
4965
4966     """
4967     target_node = self.target_node
4968     if self.instance.disk_template in constants.DTS_EXT_MIRROR:
4969       return
4970
4971     try:
4972       self._EnsureSecondary(target_node)
4973       self._GoStandalone()
4974       self._GoReconnect(False)
4975       self._WaitUntilSync()
4976     except errors.OpExecError, err:
4977       self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
4978                          " please try to recover the instance manually;"
4979                          " error '%s'" % str(err))
4980
4981   def _AbortMigration(self):
4982     """Call the hypervisor code to abort a started migration.
4983
4984     """
4985     instance = self.instance
4986     target_node = self.target_node
4987     source_node = self.source_node
4988     migration_info = self.migration_info
4989
4990     abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
4991                                                                  instance,
4992                                                                  migration_info,
4993                                                                  False)
4994     abort_msg = abort_result.fail_msg
4995     if abort_msg:
4996       logging.error("Aborting migration failed on target node %s: %s",
4997                     target_node, abort_msg)
4998       # Don't raise an exception here, as we stil have to try to revert the
4999       # disk status, even if this step failed.
5000
5001     abort_result = self.rpc.call_instance_finalize_migration_src(
5002       source_node, instance, False, self.live)
5003     abort_msg = abort_result.fail_msg
5004     if abort_msg:
5005       logging.error("Aborting migration failed on source node %s: %s",
5006                     source_node, abort_msg)
5007
5008   def _ExecMigration(self):
5009     """Migrate an instance.
5010
5011     The migrate is done by:
5012       - change the disks into dual-master mode
5013       - wait until disks are fully synchronized again
5014       - migrate the instance
5015       - change disks on the new secondary node (the old primary) to secondary
5016       - wait until disks are fully synchronized
5017       - change disks into single-master mode
5018
5019     """
5020     instance = self.instance
5021     target_node = self.target_node
5022     source_node = self.source_node
5023
5024     # Check for hypervisor version mismatch and warn the user.
5025     nodeinfo = self.rpc.call_node_info([source_node, target_node],
5026                                        None, [self.instance.hypervisor], False)
5027     for ninfo in nodeinfo.values():
5028       ninfo.Raise("Unable to retrieve node information from node '%s'" %
5029                   ninfo.node)
5030     (_, _, (src_info, )) = nodeinfo[source_node].payload
5031     (_, _, (dst_info, )) = nodeinfo[target_node].payload
5032
5033     if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
5034         (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
5035       src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
5036       dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
5037       if src_version != dst_version:
5038         self.feedback_fn("* warning: hypervisor version mismatch between"
5039                          " source (%s) and target (%s) node" %
5040                          (src_version, dst_version))
5041
5042     self.feedback_fn("* checking disk consistency between source and target")
5043     for (idx, dev) in enumerate(instance.disks):
5044       if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
5045         raise errors.OpExecError("Disk %s is degraded or not fully"
5046                                  " synchronized on target node,"
5047                                  " aborting migration" % idx)
5048
5049     if self.current_mem > self.tgt_free_mem:
5050       if not self.allow_runtime_changes:
5051         raise errors.OpExecError("Memory ballooning not allowed and not enough"
5052                                  " free memory to fit instance %s on target"
5053                                  " node %s (have %dMB, need %dMB)" %
5054                                  (instance.name, target_node,
5055                                   self.tgt_free_mem, self.current_mem))
5056       self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
5057       rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
5058                                                      instance,
5059                                                      self.tgt_free_mem)
5060       rpcres.Raise("Cannot modify instance runtime memory")
5061
5062     # First get the migration information from the remote node
5063     result = self.rpc.call_migration_info(source_node, instance)
5064     msg = result.fail_msg
5065     if msg:
5066       log_err = ("Failed fetching source migration information from %s: %s" %
5067                  (source_node, msg))
5068       logging.error(log_err)
5069       raise errors.OpExecError(log_err)
5070
5071     self.migration_info = migration_info = result.payload
5072
5073     if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
5074       # Then switch the disks to master/master mode
5075       self._EnsureSecondary(target_node)
5076       self._GoStandalone()
5077       self._GoReconnect(True)
5078       self._WaitUntilSync()
5079
5080     self.feedback_fn("* preparing %s to accept the instance" % target_node)
5081     result = self.rpc.call_accept_instance(target_node,
5082                                            instance,
5083                                            migration_info,
5084                                            self.nodes_ip[target_node])
5085
5086     msg = result.fail_msg
5087     if msg:
5088       logging.error("Instance pre-migration failed, trying to revert"
5089                     " disk status: %s", msg)
5090       self.feedback_fn("Pre-migration failed, aborting")
5091       self._AbortMigration()
5092       self._RevertDiskStatus()
5093       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5094                                (instance.name, msg))
5095
5096     self.feedback_fn("* migrating instance to %s" % target_node)
5097     result = self.rpc.call_instance_migrate(source_node, instance,
5098                                             self.nodes_ip[target_node],
5099                                             self.live)
5100     msg = result.fail_msg
5101     if msg:
5102       logging.error("Instance migration failed, trying to revert"
5103                     " disk status: %s", msg)
5104       self.feedback_fn("Migration failed, aborting")
5105       self._AbortMigration()
5106       self._RevertDiskStatus()
5107       raise errors.OpExecError("Could not migrate instance %s: %s" %
5108                                (instance.name, msg))
5109
5110     self.feedback_fn("* starting memory transfer")
5111     last_feedback = time.time()
5112     while True:
5113       result = self.rpc.call_instance_get_migration_status(source_node,
5114                                                            instance)
5115       msg = result.fail_msg
5116       ms = result.payload   # MigrationStatus instance
5117       if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
5118         logging.error("Instance migration failed, trying to revert"
5119                       " disk status: %s", msg)
5120         self.feedback_fn("Migration failed, aborting")
5121         self._AbortMigration()
5122         self._RevertDiskStatus()
5123         if not msg:
5124           msg = "hypervisor returned failure"
5125         raise errors.OpExecError("Could not migrate instance %s: %s" %
5126                                  (instance.name, msg))
5127
5128       if result.payload.status != constants.HV_MIGRATION_ACTIVE:
5129         self.feedback_fn("* memory transfer complete")
5130         break
5131
5132       if (utils.TimeoutExpired(last_feedback,
5133                                self._MIGRATION_FEEDBACK_INTERVAL) and
5134           ms.transferred_ram is not None):
5135         mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
5136         self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
5137         last_feedback = time.time()
5138
5139       time.sleep(self._MIGRATION_POLL_INTERVAL)
5140
5141     result = self.rpc.call_instance_finalize_migration_src(source_node,
5142                                                            instance,
5143                                                            True,
5144                                                            self.live)
5145     msg = result.fail_msg
5146     if msg:
5147       logging.error("Instance migration succeeded, but finalization failed"
5148                     " on the source node: %s", msg)
5149       raise errors.OpExecError("Could not finalize instance migration: %s" %
5150                                msg)
5151
5152     instance.primary_node = target_node
5153
5154     # distribute new instance config to the other nodes
5155     self.cfg.Update(instance, self.feedback_fn)
5156
5157     result = self.rpc.call_instance_finalize_migration_dst(target_node,
5158                                                            instance,
5159                                                            migration_info,
5160                                                            True)
5161     msg = result.fail_msg
5162     if msg:
5163       logging.error("Instance migration succeeded, but finalization failed"
5164                     " on the target node: %s", msg)
5165       raise errors.OpExecError("Could not finalize instance migration: %s" %
5166                                msg)
5167
5168     if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
5169       self._EnsureSecondary(source_node)
5170       self._WaitUntilSync()
5171       self._GoStandalone()
5172       self._GoReconnect(False)
5173       self._WaitUntilSync()
5174
5175     # If the instance's disk template is `rbd' or `ext' and there was a
5176     # successful migration, unmap the device from the source node.
5177     if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
5178       disks = _ExpandCheckDisks(instance, instance.disks)
5179       self.feedback_fn("* unmapping instance's disks from %s" % source_node)
5180       for disk in disks:
5181         result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
5182         msg = result.fail_msg
5183         if msg:
5184           logging.error("Migration was successful, but couldn't unmap the"
5185                         " block device %s on source node %s: %s",
5186                         disk.iv_name, source_node, msg)
5187           logging.error("You need to unmap the device %s manually on %s",
5188                         disk.iv_name, source_node)
5189
5190     self.feedback_fn("* done")
5191
5192   def _ExecFailover(self):
5193     """Failover an instance.
5194
5195     The failover is done by shutting it down on its present node and
5196     starting it on the secondary.
5197
5198     """
5199     instance = self.instance
5200     primary_node = self.cfg.GetNodeInfo(instance.primary_node)
5201
5202     source_node = instance.primary_node
5203     target_node = self.target_node
5204
5205     if instance.admin_state == constants.ADMINST_UP:
5206       self.feedback_fn("* checking disk consistency between source and target")
5207       for (idx, dev) in enumerate(instance.disks):
5208         # for drbd, these are drbd over lvm
5209         if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
5210                                      False):
5211           if primary_node.offline:
5212             self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
5213                              " target node %s" %
5214                              (primary_node.name, idx, target_node))
5215           elif not self.ignore_consistency:
5216             raise errors.OpExecError("Disk %s is degraded on target node,"
5217                                      " aborting failover" % idx)
5218     else:
5219       self.feedback_fn("* not checking disk consistency as instance is not"
5220                        " running")
5221
5222     self.feedback_fn("* shutting down instance on source node")
5223     logging.info("Shutting down instance %s on node %s",
5224                  instance.name, source_node)
5225
5226     result = self.rpc.call_instance_shutdown(source_node, instance,
5227                                              self.shutdown_timeout,
5228                                              self.lu.op.reason)
5229     msg = result.fail_msg
5230     if msg:
5231       if self.ignore_consistency or primary_node.offline:
5232         self.lu.LogWarning("Could not shutdown instance %s on node %s,"
5233                            " proceeding anyway; please make sure node"
5234                            " %s is down; error details: %s",
5235                            instance.name, source_node, source_node, msg)
5236       else:
5237         raise errors.OpExecError("Could not shutdown instance %s on"
5238                                  " node %s: %s" %
5239                                  (instance.name, source_node, msg))
5240
5241     self.feedback_fn("* deactivating the instance's disks on source node")
5242     if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
5243       raise errors.OpExecError("Can't shut down the instance's disks")
5244
5245     instance.primary_node = target_node
5246     # distribute new instance config to the other nodes
5247     self.cfg.Update(instance, self.feedback_fn)
5248
5249     # Only start the instance if it's marked as up
5250     if instance.admin_state == constants.ADMINST_UP:
5251       self.feedback_fn("* activating the instance's disks on target node %s" %
5252                        target_node)
5253       logging.info("Starting instance %s on node %s",
5254                    instance.name, target_node)
5255
5256       disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
5257                                            ignore_secondaries=True)
5258       if not disks_ok:
5259         _ShutdownInstanceDisks(self.lu, instance)
5260         raise errors.OpExecError("Can't activate the instance's disks")
5261
5262       self.feedback_fn("* starting the instance on the target node %s" %
5263                        target_node)
5264       result = self.rpc.call_instance_start(target_node, (instance, None, None),
5265                                             False, self.lu.op.reason)
5266       msg = result.fail_msg
5267       if msg:
5268         _ShutdownInstanceDisks(self.lu, instance)
5269         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
5270                                  (instance.name, target_node, msg))
5271
5272   def Exec(self, feedback_fn):
5273     """Perform the migration.
5274
5275     """
5276     self.feedback_fn = feedback_fn
5277     self.source_node = self.instance.primary_node
5278
5279     # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
5280     if self.instance.disk_template in constants.DTS_INT_MIRROR:
5281       self.target_node = self.instance.secondary_nodes[0]
5282       # Otherwise self.target_node has been populated either
5283       # directly, or through an iallocator.
5284
5285     self.all_nodes = [self.source_node, self.target_node]
5286     self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
5287                          in self.cfg.GetMultiNodeInfo(self.all_nodes))
5288
5289     if self.failover:
5290       feedback_fn("Failover instance %s" % self.instance.name)
5291       self._ExecFailover()
5292     else:
5293       feedback_fn("Migrating instance %s" % self.instance.name)
5294
5295       if self.cleanup:
5296         return self._ExecCleanup()
5297       else:
5298         return self._ExecMigration()