Wait for a while in failed resyncs
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks,
457                           bep, hvp, hypervisor):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @type bep: dict
484   @param bep: the backend parameters for the instance
485   @type hvp: dict
486   @param hvp: the hypervisor parameters for the instance
487   @type hypervisor: string
488   @param hypervisor: the hypervisor for the instance
489   @rtype: dict
490   @return: the hook environment for this instance
491
492   """
493   if status:
494     str_status = "up"
495   else:
496     str_status = "down"
497   env = {
498     "OP_TARGET": name,
499     "INSTANCE_NAME": name,
500     "INSTANCE_PRIMARY": primary_node,
501     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502     "INSTANCE_OS_TYPE": os_type,
503     "INSTANCE_STATUS": str_status,
504     "INSTANCE_MEMORY": memory,
505     "INSTANCE_VCPUS": vcpus,
506     "INSTANCE_DISK_TEMPLATE": disk_template,
507     "INSTANCE_HYPERVISOR": hypervisor,
508   }
509
510   if nics:
511     nic_count = len(nics)
512     for idx, (ip, bridge, mac) in enumerate(nics):
513       if ip is None:
514         ip = ""
515       env["INSTANCE_NIC%d_IP" % idx] = ip
516       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517       env["INSTANCE_NIC%d_MAC" % idx] = mac
518   else:
519     nic_count = 0
520
521   env["INSTANCE_NIC_COUNT"] = nic_count
522
523   if disks:
524     disk_count = len(disks)
525     for idx, (size, mode) in enumerate(disks):
526       env["INSTANCE_DISK%d_SIZE" % idx] = size
527       env["INSTANCE_DISK%d_MODE" % idx] = mode
528   else:
529     disk_count = 0
530
531   env["INSTANCE_DISK_COUNT"] = disk_count
532
533   for source, kind in [(bep, "BE"), (hvp, "HV")]:
534     for key, value in source.items():
535       env["INSTANCE_%s_%s" % (kind, key)] = value
536
537   return env
538
539
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541   """Builds instance related env variables for hooks from an object.
542
543   @type lu: L{LogicalUnit}
544   @param lu: the logical unit on whose behalf we execute
545   @type instance: L{objects.Instance}
546   @param instance: the instance for which we should build the
547       environment
548   @type override: dict
549   @param override: dictionary with key/values that will override
550       our values
551   @rtype: dict
552   @return: the hook environment dictionary
553
554   """
555   cluster = lu.cfg.GetClusterInfo()
556   bep = cluster.FillBE(instance)
557   hvp = cluster.FillHV(instance)
558   args = {
559     'name': instance.name,
560     'primary_node': instance.primary_node,
561     'secondary_nodes': instance.secondary_nodes,
562     'os_type': instance.os,
563     'status': instance.admin_up,
564     'memory': bep[constants.BE_MEMORY],
565     'vcpus': bep[constants.BE_VCPUS],
566     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567     'disk_template': instance.disk_template,
568     'disks': [(disk.size, disk.mode) for disk in instance.disks],
569     'bep': bep,
570     'hvp': hvp,
571     'hypervisor': instance.hypervisor,
572   }
573   if override:
574     args.update(override)
575   return _BuildInstanceHookEnv(**args)
576
577
578 def _AdjustCandidatePool(lu):
579   """Adjust the candidate pool after node operations.
580
581   """
582   mod_list = lu.cfg.MaintainCandidatePool()
583   if mod_list:
584     lu.LogInfo("Promoted nodes to master candidate role: %s",
585                ", ".join(node.name for node in mod_list))
586     for name in mod_list:
587       lu.context.ReaddNode(name)
588   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
589   if mc_now > mc_max:
590     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
591                (mc_now, mc_max))
592
593
594 def _CheckInstanceBridgesExist(lu, instance):
595   """Check that the brigdes needed by an instance exist.
596
597   """
598   # check bridges existance
599   brlist = [nic.bridge for nic in instance.nics]
600   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
601   result.Raise()
602   if not result.data:
603     raise errors.OpPrereqError("One or more target bridges %s does not"
604                                " exist on destination node '%s'" %
605                                (brlist, instance.primary_node))
606
607
608 class LUDestroyCluster(NoHooksLU):
609   """Logical unit for destroying the cluster.
610
611   """
612   _OP_REQP = []
613
614   def CheckPrereq(self):
615     """Check prerequisites.
616
617     This checks whether the cluster is empty.
618
619     Any errors are signalled by raising errors.OpPrereqError.
620
621     """
622     master = self.cfg.GetMasterNode()
623
624     nodelist = self.cfg.GetNodeList()
625     if len(nodelist) != 1 or nodelist[0] != master:
626       raise errors.OpPrereqError("There are still %d node(s) in"
627                                  " this cluster." % (len(nodelist) - 1))
628     instancelist = self.cfg.GetInstanceList()
629     if instancelist:
630       raise errors.OpPrereqError("There are still %d instance(s) in"
631                                  " this cluster." % len(instancelist))
632
633   def Exec(self, feedback_fn):
634     """Destroys the cluster.
635
636     """
637     master = self.cfg.GetMasterNode()
638     result = self.rpc.call_node_stop_master(master, False)
639     result.Raise()
640     if not result.data:
641       raise errors.OpExecError("Could not disable the master role")
642     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643     utils.CreateBackup(priv_key)
644     utils.CreateBackup(pub_key)
645     return master
646
647
648 class LUVerifyCluster(LogicalUnit):
649   """Verifies the cluster status.
650
651   """
652   HPATH = "cluster-verify"
653   HTYPE = constants.HTYPE_CLUSTER
654   _OP_REQP = ["skip_checks"]
655   REQ_BGL = False
656
657   def ExpandNames(self):
658     self.needed_locks = {
659       locking.LEVEL_NODE: locking.ALL_SET,
660       locking.LEVEL_INSTANCE: locking.ALL_SET,
661     }
662     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
663
664   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665                   node_result, feedback_fn, master_files,
666                   drbd_map, vg_name):
667     """Run multiple tests against a node.
668
669     Test list:
670
671       - compares ganeti version
672       - checks vg existance and size > 20G
673       - checks config file checksum
674       - checks ssh to other nodes
675
676     @type nodeinfo: L{objects.Node}
677     @param nodeinfo: the node to check
678     @param file_list: required list of files
679     @param local_cksum: dictionary of local files and their checksums
680     @param node_result: the results from the node
681     @param feedback_fn: function used to accumulate results
682     @param master_files: list of files that only masters should have
683     @param drbd_map: the useddrbd minors for this node, in
684         form of minor: (instance, must_exist) which correspond to instances
685         and their running status
686     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
687
688     """
689     node = nodeinfo.name
690
691     # main result, node_result should be a non-empty dict
692     if not node_result or not isinstance(node_result, dict):
693       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
694       return True
695
696     # compares ganeti version
697     local_version = constants.PROTOCOL_VERSION
698     remote_version = node_result.get('version', None)
699     if not (remote_version and isinstance(remote_version, (list, tuple)) and
700             len(remote_version) == 2):
701       feedback_fn("  - ERROR: connection to %s failed" % (node))
702       return True
703
704     if local_version != remote_version[0]:
705       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
706                   " node %s %s" % (local_version, node, remote_version[0]))
707       return True
708
709     # node seems compatible, we can actually try to look into its results
710
711     bad = False
712
713     # full package version
714     if constants.RELEASE_VERSION != remote_version[1]:
715       feedback_fn("  - WARNING: software version mismatch: master %s,"
716                   " node %s %s" %
717                   (constants.RELEASE_VERSION, node, remote_version[1]))
718
719     # checks vg existence and size > 20G
720     if vg_name is not None:
721       vglist = node_result.get(constants.NV_VGLIST, None)
722       if not vglist:
723         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
724                         (node,))
725         bad = True
726       else:
727         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728                                               constants.MIN_VG_SIZE)
729         if vgstatus:
730           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
731           bad = True
732
733     # checks config file checksum
734
735     remote_cksum = node_result.get(constants.NV_FILELIST, None)
736     if not isinstance(remote_cksum, dict):
737       bad = True
738       feedback_fn("  - ERROR: node hasn't returned file checksum data")
739     else:
740       for file_name in file_list:
741         node_is_mc = nodeinfo.master_candidate
742         must_have_file = file_name not in master_files
743         if file_name not in remote_cksum:
744           if node_is_mc or must_have_file:
745             bad = True
746             feedback_fn("  - ERROR: file '%s' missing" % file_name)
747         elif remote_cksum[file_name] != local_cksum[file_name]:
748           if node_is_mc or must_have_file:
749             bad = True
750             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
751           else:
752             # not candidate and this is not a must-have file
753             bad = True
754             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
755                         " '%s'" % file_name)
756         else:
757           # all good, except non-master/non-must have combination
758           if not node_is_mc and not must_have_file:
759             feedback_fn("  - ERROR: file '%s' should not exist on non master"
760                         " candidates" % file_name)
761
762     # checks ssh to any
763
764     if constants.NV_NODELIST not in node_result:
765       bad = True
766       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
767     else:
768       if node_result[constants.NV_NODELIST]:
769         bad = True
770         for node in node_result[constants.NV_NODELIST]:
771           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
772                           (node, node_result[constants.NV_NODELIST][node]))
773
774     if constants.NV_NODENETTEST not in node_result:
775       bad = True
776       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
777     else:
778       if node_result[constants.NV_NODENETTEST]:
779         bad = True
780         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
781         for node in nlist:
782           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
783                           (node, node_result[constants.NV_NODENETTEST][node]))
784
785     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786     if isinstance(hyp_result, dict):
787       for hv_name, hv_result in hyp_result.iteritems():
788         if hv_result is not None:
789           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
790                       (hv_name, hv_result))
791
792     # check used drbd list
793     if vg_name is not None:
794       used_minors = node_result.get(constants.NV_DRBDLIST, [])
795       if not isinstance(used_minors, (tuple, list)):
796         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
797                     str(used_minors))
798       else:
799         for minor, (iname, must_exist) in drbd_map.items():
800           if minor not in used_minors and must_exist:
801             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
802                         " not active" % (minor, iname))
803             bad = True
804         for minor in used_minors:
805           if minor not in drbd_map:
806             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
807                         minor)
808             bad = True
809
810     return bad
811
812   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813                       node_instance, feedback_fn, n_offline):
814     """Verify an instance.
815
816     This function checks to see if the required block devices are
817     available on the instance's node.
818
819     """
820     bad = False
821
822     node_current = instanceconfig.primary_node
823
824     node_vol_should = {}
825     instanceconfig.MapLVsByNode(node_vol_should)
826
827     for node in node_vol_should:
828       if node in n_offline:
829         # ignore missing volumes on offline nodes
830         continue
831       for volume in node_vol_should[node]:
832         if node not in node_vol_is or volume not in node_vol_is[node]:
833           feedback_fn("  - ERROR: volume %s missing on node %s" %
834                           (volume, node))
835           bad = True
836
837     if instanceconfig.admin_up:
838       if ((node_current not in node_instance or
839           not instance in node_instance[node_current]) and
840           node_current not in n_offline):
841         feedback_fn("  - ERROR: instance %s not running on node %s" %
842                         (instance, node_current))
843         bad = True
844
845     for node in node_instance:
846       if (not node == node_current):
847         if instance in node_instance[node]:
848           feedback_fn("  - ERROR: instance %s should not run on node %s" %
849                           (instance, node))
850           bad = True
851
852     return bad
853
854   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855     """Verify if there are any unknown volumes in the cluster.
856
857     The .os, .swap and backup volumes are ignored. All other volumes are
858     reported as unknown.
859
860     """
861     bad = False
862
863     for node in node_vol_is:
864       for volume in node_vol_is[node]:
865         if node not in node_vol_should or volume not in node_vol_should[node]:
866           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
867                       (volume, node))
868           bad = True
869     return bad
870
871   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872     """Verify the list of running instances.
873
874     This checks what instances are running but unknown to the cluster.
875
876     """
877     bad = False
878     for node in node_instance:
879       for runninginstance in node_instance[node]:
880         if runninginstance not in instancelist:
881           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
882                           (runninginstance, node))
883           bad = True
884     return bad
885
886   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887     """Verify N+1 Memory Resilience.
888
889     Check that if one single node dies we can still start all the instances it
890     was primary for.
891
892     """
893     bad = False
894
895     for node, nodeinfo in node_info.iteritems():
896       # This code checks that every node which is now listed as secondary has
897       # enough memory to host all instances it is supposed to should a single
898       # other node in the cluster fail.
899       # FIXME: not ready for failover to an arbitrary node
900       # FIXME: does not support file-backed instances
901       # WARNING: we currently take into account down instances as well as up
902       # ones, considering that even if they're down someone might want to start
903       # them even in the event of a node failure.
904       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
905         needed_mem = 0
906         for instance in instances:
907           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908           if bep[constants.BE_AUTO_BALANCE]:
909             needed_mem += bep[constants.BE_MEMORY]
910         if nodeinfo['mfree'] < needed_mem:
911           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
912                       " failovers should node %s fail" % (node, prinode))
913           bad = True
914     return bad
915
916   def CheckPrereq(self):
917     """Check prerequisites.
918
919     Transform the list of checks we're going to skip into a set and check that
920     all its members are valid.
921
922     """
923     self.skip_set = frozenset(self.op.skip_checks)
924     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925       raise errors.OpPrereqError("Invalid checks to be skipped specified")
926
927   def BuildHooksEnv(self):
928     """Build hooks env.
929
930     Cluster-Verify hooks just rone in the post phase and their failure makes
931     the output be logged in the verify output and the verification to fail.
932
933     """
934     all_nodes = self.cfg.GetNodeList()
935     env = {
936       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
937       }
938     for node in self.cfg.GetAllNodesInfo().values():
939       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
940
941     return env, [], all_nodes
942
943   def Exec(self, feedback_fn):
944     """Verify integrity of cluster, performing various test on nodes.
945
946     """
947     bad = False
948     feedback_fn("* Verifying global settings")
949     for msg in self.cfg.VerifyConfig():
950       feedback_fn("  - ERROR: %s" % msg)
951
952     vg_name = self.cfg.GetVGName()
953     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954     nodelist = utils.NiceSort(self.cfg.GetNodeList())
955     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958                         for iname in instancelist)
959     i_non_redundant = [] # Non redundant instances
960     i_non_a_balanced = [] # Non auto-balanced instances
961     n_offline = [] # List of offline nodes
962     n_drained = [] # List of nodes being drained
963     node_volume = {}
964     node_instance = {}
965     node_info = {}
966     instance_cfg = {}
967
968     # FIXME: verify OS list
969     # do local checksums
970     master_files = [constants.CLUSTER_CONF_FILE]
971
972     file_names = ssconf.SimpleStore().GetFileList()
973     file_names.append(constants.SSL_CERT_FILE)
974     file_names.append(constants.RAPI_CERT_FILE)
975     file_names.extend(master_files)
976
977     local_checksums = utils.FingerprintFiles(file_names)
978
979     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980     node_verify_param = {
981       constants.NV_FILELIST: file_names,
982       constants.NV_NODELIST: [node.name for node in nodeinfo
983                               if not node.offline],
984       constants.NV_HYPERVISOR: hypervisors,
985       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986                                   node.secondary_ip) for node in nodeinfo
987                                  if not node.offline],
988       constants.NV_INSTANCELIST: hypervisors,
989       constants.NV_VERSION: None,
990       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
991       }
992     if vg_name is not None:
993       node_verify_param[constants.NV_VGLIST] = None
994       node_verify_param[constants.NV_LVLIST] = vg_name
995       node_verify_param[constants.NV_DRBDLIST] = None
996     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997                                            self.cfg.GetClusterName())
998
999     cluster = self.cfg.GetClusterInfo()
1000     master_node = self.cfg.GetMasterNode()
1001     all_drbd_map = self.cfg.ComputeDRBDMap()
1002
1003     for node_i in nodeinfo:
1004       node = node_i.name
1005       nresult = all_nvinfo[node].data
1006
1007       if node_i.offline:
1008         feedback_fn("* Skipping offline node %s" % (node,))
1009         n_offline.append(node)
1010         continue
1011
1012       if node == master_node:
1013         ntype = "master"
1014       elif node_i.master_candidate:
1015         ntype = "master candidate"
1016       elif node_i.drained:
1017         ntype = "drained"
1018         n_drained.append(node)
1019       else:
1020         ntype = "regular"
1021       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1022
1023       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1025         bad = True
1026         continue
1027
1028       node_drbd = {}
1029       for minor, instance in all_drbd_map[node].items():
1030         if instance not in instanceinfo:
1031           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1032                       instance)
1033           # ghost instance should not be running, but otherwise we
1034           # don't give double warnings (both ghost instance and
1035           # unallocated minor in use)
1036           node_drbd[minor] = (instance, False)
1037         else:
1038           instance = instanceinfo[instance]
1039           node_drbd[minor] = (instance.name, instance.admin_up)
1040       result = self._VerifyNode(node_i, file_names, local_checksums,
1041                                 nresult, feedback_fn, master_files,
1042                                 node_drbd, vg_name)
1043       bad = bad or result
1044
1045       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1046       if vg_name is None:
1047         node_volume[node] = {}
1048       elif isinstance(lvdata, basestring):
1049         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1050                     (node, utils.SafeEncode(lvdata)))
1051         bad = True
1052         node_volume[node] = {}
1053       elif not isinstance(lvdata, dict):
1054         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1055         bad = True
1056         continue
1057       else:
1058         node_volume[node] = lvdata
1059
1060       # node_instance
1061       idata = nresult.get(constants.NV_INSTANCELIST, None)
1062       if not isinstance(idata, list):
1063         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1064                     (node,))
1065         bad = True
1066         continue
1067
1068       node_instance[node] = idata
1069
1070       # node_info
1071       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072       if not isinstance(nodeinfo, dict):
1073         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1074         bad = True
1075         continue
1076
1077       try:
1078         node_info[node] = {
1079           "mfree": int(nodeinfo['memory_free']),
1080           "pinst": [],
1081           "sinst": [],
1082           # dictionary holding all instances this node is secondary for,
1083           # grouped by their primary node. Each key is a cluster node, and each
1084           # value is a list of instances which have the key as primary and the
1085           # current node as secondary.  this is handy to calculate N+1 memory
1086           # availability if you can only failover from a primary to its
1087           # secondary.
1088           "sinst-by-pnode": {},
1089         }
1090         # FIXME: devise a free space model for file based instances as well
1091         if vg_name is not None:
1092           if (constants.NV_VGLIST not in nresult or
1093               vg_name not in nresult[constants.NV_VGLIST]):
1094             feedback_fn("  - ERROR: node %s didn't return data for the"
1095                         " volume group '%s' - it is either missing or broken" %
1096                         (node, vg_name))
1097             bad = True
1098             continue
1099           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100       except (ValueError, KeyError):
1101         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1102                     " from node %s" % (node,))
1103         bad = True
1104         continue
1105
1106     node_vol_should = {}
1107
1108     for instance in instancelist:
1109       feedback_fn("* Verifying instance %s" % instance)
1110       inst_config = instanceinfo[instance]
1111       result =  self._VerifyInstance(instance, inst_config, node_volume,
1112                                      node_instance, feedback_fn, n_offline)
1113       bad = bad or result
1114       inst_nodes_offline = []
1115
1116       inst_config.MapLVsByNode(node_vol_should)
1117
1118       instance_cfg[instance] = inst_config
1119
1120       pnode = inst_config.primary_node
1121       if pnode in node_info:
1122         node_info[pnode]['pinst'].append(instance)
1123       elif pnode not in n_offline:
1124         feedback_fn("  - ERROR: instance %s, connection to primary node"
1125                     " %s failed" % (instance, pnode))
1126         bad = True
1127
1128       if pnode in n_offline:
1129         inst_nodes_offline.append(pnode)
1130
1131       # If the instance is non-redundant we cannot survive losing its primary
1132       # node, so we are not N+1 compliant. On the other hand we have no disk
1133       # templates with more than one secondary so that situation is not well
1134       # supported either.
1135       # FIXME: does not support file-backed instances
1136       if len(inst_config.secondary_nodes) == 0:
1137         i_non_redundant.append(instance)
1138       elif len(inst_config.secondary_nodes) > 1:
1139         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1140                     % instance)
1141
1142       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143         i_non_a_balanced.append(instance)
1144
1145       for snode in inst_config.secondary_nodes:
1146         if snode in node_info:
1147           node_info[snode]['sinst'].append(instance)
1148           if pnode not in node_info[snode]['sinst-by-pnode']:
1149             node_info[snode]['sinst-by-pnode'][pnode] = []
1150           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151         elif snode not in n_offline:
1152           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1153                       " %s failed" % (instance, snode))
1154           bad = True
1155         if snode in n_offline:
1156           inst_nodes_offline.append(snode)
1157
1158       if inst_nodes_offline:
1159         # warn that the instance lives on offline nodes, and set bad=True
1160         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1161                     ", ".join(inst_nodes_offline))
1162         bad = True
1163
1164     feedback_fn("* Verifying orphan volumes")
1165     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1166                                        feedback_fn)
1167     bad = bad or result
1168
1169     feedback_fn("* Verifying remaining instances")
1170     result = self._VerifyOrphanInstances(instancelist, node_instance,
1171                                          feedback_fn)
1172     bad = bad or result
1173
1174     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175       feedback_fn("* Verifying N+1 Memory redundancy")
1176       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1177       bad = bad or result
1178
1179     feedback_fn("* Other Notes")
1180     if i_non_redundant:
1181       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1182                   % len(i_non_redundant))
1183
1184     if i_non_a_balanced:
1185       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1186                   % len(i_non_a_balanced))
1187
1188     if n_offline:
1189       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1190
1191     if n_drained:
1192       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1193
1194     return not bad
1195
1196   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197     """Analize the post-hooks' result
1198
1199     This method analyses the hook result, handles it, and sends some
1200     nicely-formatted feedback back to the user.
1201
1202     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204     @param hooks_results: the results of the multi-node hooks rpc call
1205     @param feedback_fn: function used send feedback back to the caller
1206     @param lu_result: previous Exec result
1207     @return: the new Exec result, based on the previous result
1208         and hook results
1209
1210     """
1211     # We only really run POST phase hooks, and are only interested in
1212     # their results
1213     if phase == constants.HOOKS_PHASE_POST:
1214       # Used to change hooks' output to proper indentation
1215       indent_re = re.compile('^', re.M)
1216       feedback_fn("* Hooks Results")
1217       if not hooks_results:
1218         feedback_fn("  - ERROR: general communication failure")
1219         lu_result = 1
1220       else:
1221         for node_name in hooks_results:
1222           show_node_header = True
1223           res = hooks_results[node_name]
1224           if res.failed or res.data is False or not isinstance(res.data, list):
1225             if res.offline:
1226               # no need to warn or set fail return value
1227               continue
1228             feedback_fn("    Communication failure in hooks execution")
1229             lu_result = 1
1230             continue
1231           for script, hkr, output in res.data:
1232             if hkr == constants.HKR_FAIL:
1233               # The node header is only shown once, if there are
1234               # failing hooks on that node
1235               if show_node_header:
1236                 feedback_fn("  Node %s:" % node_name)
1237                 show_node_header = False
1238               feedback_fn("    ERROR: Script %s failed, output:" % script)
1239               output = indent_re.sub('      ', output)
1240               feedback_fn("%s" % output)
1241               lu_result = 1
1242
1243       return lu_result
1244
1245
1246 class LUVerifyDisks(NoHooksLU):
1247   """Verifies the cluster disks status.
1248
1249   """
1250   _OP_REQP = []
1251   REQ_BGL = False
1252
1253   def ExpandNames(self):
1254     self.needed_locks = {
1255       locking.LEVEL_NODE: locking.ALL_SET,
1256       locking.LEVEL_INSTANCE: locking.ALL_SET,
1257     }
1258     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1259
1260   def CheckPrereq(self):
1261     """Check prerequisites.
1262
1263     This has no prerequisites.
1264
1265     """
1266     pass
1267
1268   def Exec(self, feedback_fn):
1269     """Verify integrity of cluster disks.
1270
1271     """
1272     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1273
1274     vg_name = self.cfg.GetVGName()
1275     nodes = utils.NiceSort(self.cfg.GetNodeList())
1276     instances = [self.cfg.GetInstanceInfo(name)
1277                  for name in self.cfg.GetInstanceList()]
1278
1279     nv_dict = {}
1280     for inst in instances:
1281       inst_lvs = {}
1282       if (not inst.admin_up or
1283           inst.disk_template not in constants.DTS_NET_MIRROR):
1284         continue
1285       inst.MapLVsByNode(inst_lvs)
1286       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287       for node, vol_list in inst_lvs.iteritems():
1288         for vol in vol_list:
1289           nv_dict[(node, vol)] = inst
1290
1291     if not nv_dict:
1292       return result
1293
1294     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1295
1296     to_act = set()
1297     for node in nodes:
1298       # node_volume
1299       lvs = node_lvs[node]
1300       if lvs.failed:
1301         if not lvs.offline:
1302           self.LogWarning("Connection to node %s failed: %s" %
1303                           (node, lvs.data))
1304         continue
1305       lvs = lvs.data
1306       if isinstance(lvs, basestring):
1307         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308         res_nlvm[node] = lvs
1309         continue
1310       elif not isinstance(lvs, dict):
1311         logging.warning("Connection to node %s failed or invalid data"
1312                         " returned", node)
1313         res_nodes.append(node)
1314         continue
1315
1316       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317         inst = nv_dict.pop((node, lv_name), None)
1318         if (not lv_online and inst is not None
1319             and inst.name not in res_instances):
1320           res_instances.append(inst.name)
1321
1322     # any leftover items in nv_dict are missing LVs, let's arrange the
1323     # data better
1324     for key, inst in nv_dict.iteritems():
1325       if inst.name not in res_missing:
1326         res_missing[inst.name] = []
1327       res_missing[inst.name].append(key)
1328
1329     return result
1330
1331
1332 class LURenameCluster(LogicalUnit):
1333   """Rename the cluster.
1334
1335   """
1336   HPATH = "cluster-rename"
1337   HTYPE = constants.HTYPE_CLUSTER
1338   _OP_REQP = ["name"]
1339
1340   def BuildHooksEnv(self):
1341     """Build hooks env.
1342
1343     """
1344     env = {
1345       "OP_TARGET": self.cfg.GetClusterName(),
1346       "NEW_NAME": self.op.name,
1347       }
1348     mn = self.cfg.GetMasterNode()
1349     return env, [mn], [mn]
1350
1351   def CheckPrereq(self):
1352     """Verify that the passed name is a valid one.
1353
1354     """
1355     hostname = utils.HostInfo(self.op.name)
1356
1357     new_name = hostname.name
1358     self.ip = new_ip = hostname.ip
1359     old_name = self.cfg.GetClusterName()
1360     old_ip = self.cfg.GetMasterIP()
1361     if new_name == old_name and new_ip == old_ip:
1362       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363                                  " cluster has changed")
1364     if new_ip != old_ip:
1365       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367                                    " reachable on the network. Aborting." %
1368                                    new_ip)
1369
1370     self.op.name = new_name
1371
1372   def Exec(self, feedback_fn):
1373     """Rename the cluster.
1374
1375     """
1376     clustername = self.op.name
1377     ip = self.ip
1378
1379     # shutdown the master IP
1380     master = self.cfg.GetMasterNode()
1381     result = self.rpc.call_node_stop_master(master, False)
1382     if result.failed or not result.data:
1383       raise errors.OpExecError("Could not disable the master role")
1384
1385     try:
1386       cluster = self.cfg.GetClusterInfo()
1387       cluster.cluster_name = clustername
1388       cluster.master_ip = ip
1389       self.cfg.Update(cluster)
1390
1391       # update the known hosts file
1392       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393       node_list = self.cfg.GetNodeList()
1394       try:
1395         node_list.remove(master)
1396       except ValueError:
1397         pass
1398       result = self.rpc.call_upload_file(node_list,
1399                                          constants.SSH_KNOWN_HOSTS_FILE)
1400       for to_node, to_result in result.iteritems():
1401         if to_result.failed or not to_result.data:
1402           logging.error("Copy of file %s to node %s failed",
1403                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1404
1405     finally:
1406       result = self.rpc.call_node_start_master(master, False)
1407       if result.failed or not result.data:
1408         self.LogWarning("Could not re-enable the master role on"
1409                         " the master, please restart manually.")
1410
1411
1412 def _RecursiveCheckIfLVMBased(disk):
1413   """Check if the given disk or its children are lvm-based.
1414
1415   @type disk: L{objects.Disk}
1416   @param disk: the disk to check
1417   @rtype: booleean
1418   @return: boolean indicating whether a LD_LV dev_type was found or not
1419
1420   """
1421   if disk.children:
1422     for chdisk in disk.children:
1423       if _RecursiveCheckIfLVMBased(chdisk):
1424         return True
1425   return disk.dev_type == constants.LD_LV
1426
1427
1428 class LUSetClusterParams(LogicalUnit):
1429   """Change the parameters of the cluster.
1430
1431   """
1432   HPATH = "cluster-modify"
1433   HTYPE = constants.HTYPE_CLUSTER
1434   _OP_REQP = []
1435   REQ_BGL = False
1436
1437   def CheckArguments(self):
1438     """Check parameters
1439
1440     """
1441     if not hasattr(self.op, "candidate_pool_size"):
1442       self.op.candidate_pool_size = None
1443     if self.op.candidate_pool_size is not None:
1444       try:
1445         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446       except (ValueError, TypeError), err:
1447         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1448                                    str(err))
1449       if self.op.candidate_pool_size < 1:
1450         raise errors.OpPrereqError("At least one master candidate needed")
1451
1452   def ExpandNames(self):
1453     # FIXME: in the future maybe other cluster params won't require checking on
1454     # all nodes to be modified.
1455     self.needed_locks = {
1456       locking.LEVEL_NODE: locking.ALL_SET,
1457     }
1458     self.share_locks[locking.LEVEL_NODE] = 1
1459
1460   def BuildHooksEnv(self):
1461     """Build hooks env.
1462
1463     """
1464     env = {
1465       "OP_TARGET": self.cfg.GetClusterName(),
1466       "NEW_VG_NAME": self.op.vg_name,
1467       }
1468     mn = self.cfg.GetMasterNode()
1469     return env, [mn], [mn]
1470
1471   def CheckPrereq(self):
1472     """Check prerequisites.
1473
1474     This checks whether the given params don't conflict and
1475     if the given volume group is valid.
1476
1477     """
1478     if self.op.vg_name is not None and not self.op.vg_name:
1479       instances = self.cfg.GetAllInstancesInfo().values()
1480       for inst in instances:
1481         for disk in inst.disks:
1482           if _RecursiveCheckIfLVMBased(disk):
1483             raise errors.OpPrereqError("Cannot disable lvm storage while"
1484                                        " lvm-based instances exist")
1485
1486     node_list = self.acquired_locks[locking.LEVEL_NODE]
1487
1488     # if vg_name not None, checks given volume group on all nodes
1489     if self.op.vg_name:
1490       vglist = self.rpc.call_vg_list(node_list)
1491       for node in node_list:
1492         if vglist[node].failed:
1493           # ignoring down node
1494           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1495           continue
1496         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1497                                               self.op.vg_name,
1498                                               constants.MIN_VG_SIZE)
1499         if vgstatus:
1500           raise errors.OpPrereqError("Error on node '%s': %s" %
1501                                      (node, vgstatus))
1502
1503     self.cluster = cluster = self.cfg.GetClusterInfo()
1504     # validate beparams changes
1505     if self.op.beparams:
1506       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507       self.new_beparams = cluster.FillDict(
1508         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1509
1510     # hypervisor list/parameters
1511     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512     if self.op.hvparams:
1513       if not isinstance(self.op.hvparams, dict):
1514         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515       for hv_name, hv_dict in self.op.hvparams.items():
1516         if hv_name not in self.new_hvparams:
1517           self.new_hvparams[hv_name] = hv_dict
1518         else:
1519           self.new_hvparams[hv_name].update(hv_dict)
1520
1521     if self.op.enabled_hypervisors is not None:
1522       self.hv_list = self.op.enabled_hypervisors
1523     else:
1524       self.hv_list = cluster.enabled_hypervisors
1525
1526     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527       # either the enabled list has changed, or the parameters have, validate
1528       for hv_name, hv_params in self.new_hvparams.items():
1529         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530             (self.op.enabled_hypervisors and
1531              hv_name in self.op.enabled_hypervisors)):
1532           # either this is a new hypervisor, or its parameters have changed
1533           hv_class = hypervisor.GetHypervisor(hv_name)
1534           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535           hv_class.CheckParameterSyntax(hv_params)
1536           _CheckHVParams(self, node_list, hv_name, hv_params)
1537
1538   def Exec(self, feedback_fn):
1539     """Change the parameters of the cluster.
1540
1541     """
1542     if self.op.vg_name is not None:
1543       new_volume = self.op.vg_name
1544       if not new_volume:
1545         new_volume = None
1546       if new_volume != self.cfg.GetVGName():
1547         self.cfg.SetVGName(new_volume)
1548       else:
1549         feedback_fn("Cluster LVM configuration already in desired"
1550                     " state, not changing")
1551     if self.op.hvparams:
1552       self.cluster.hvparams = self.new_hvparams
1553     if self.op.enabled_hypervisors is not None:
1554       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555     if self.op.beparams:
1556       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557     if self.op.candidate_pool_size is not None:
1558       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1559
1560     self.cfg.Update(self.cluster)
1561
1562     # we want to update nodes after the cluster so that if any errors
1563     # happen, we have recorded and saved the cluster info
1564     if self.op.candidate_pool_size is not None:
1565       _AdjustCandidatePool(self)
1566
1567
1568 class LURedistributeConfig(NoHooksLU):
1569   """Force the redistribution of cluster configuration.
1570
1571   This is a very simple LU.
1572
1573   """
1574   _OP_REQP = []
1575   REQ_BGL = False
1576
1577   def ExpandNames(self):
1578     self.needed_locks = {
1579       locking.LEVEL_NODE: locking.ALL_SET,
1580     }
1581     self.share_locks[locking.LEVEL_NODE] = 1
1582
1583   def CheckPrereq(self):
1584     """Check prerequisites.
1585
1586     """
1587
1588   def Exec(self, feedback_fn):
1589     """Redistribute the configuration.
1590
1591     """
1592     self.cfg.Update(self.cfg.GetClusterInfo())
1593
1594
1595 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1596   """Sleep and poll for an instance's disk to sync.
1597
1598   """
1599   if not instance.disks:
1600     return True
1601
1602   if not oneshot:
1603     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1604
1605   node = instance.primary_node
1606
1607   for dev in instance.disks:
1608     lu.cfg.SetDiskID(dev, node)
1609
1610   retries = 0
1611   degr_retries = 10 # in seconds, as we sleep 1 second each time
1612   while True:
1613     max_time = 0
1614     done = True
1615     cumul_degraded = False
1616     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1617     if rstats.failed or not rstats.data:
1618       lu.LogWarning("Can't get any data from node %s", node)
1619       retries += 1
1620       if retries >= 10:
1621         raise errors.RemoteError("Can't contact node %s for mirror data,"
1622                                  " aborting." % node)
1623       time.sleep(6)
1624       continue
1625     rstats = rstats.data
1626     retries = 0
1627     for i, mstat in enumerate(rstats):
1628       if mstat is None:
1629         lu.LogWarning("Can't compute data for node %s/%s",
1630                            node, instance.disks[i].iv_name)
1631         continue
1632       # we ignore the ldisk parameter
1633       perc_done, est_time, is_degraded, _ = mstat
1634       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1635       if perc_done is not None:
1636         done = False
1637         if est_time is not None:
1638           rem_time = "%d estimated seconds remaining" % est_time
1639           max_time = est_time
1640         else:
1641           rem_time = "no time estimate"
1642         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1643                         (instance.disks[i].iv_name, perc_done, rem_time))
1644
1645     # if we're done but degraded, let's do a few small retries, to
1646     # make sure we see a stable and not transient situation; therefore
1647     # we force restart of the loop
1648     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1649       logging.info("Degraded disks found, %d retries left", degr_retries)
1650       degr_retries -= 1
1651       time.sleep(1)
1652       continue
1653
1654     if done or oneshot:
1655       break
1656
1657     time.sleep(min(60, max_time))
1658
1659   if done:
1660     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1661   return not cumul_degraded
1662
1663
1664 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1665   """Check that mirrors are not degraded.
1666
1667   The ldisk parameter, if True, will change the test from the
1668   is_degraded attribute (which represents overall non-ok status for
1669   the device(s)) to the ldisk (representing the local storage status).
1670
1671   """
1672   lu.cfg.SetDiskID(dev, node)
1673   if ldisk:
1674     idx = 6
1675   else:
1676     idx = 5
1677
1678   result = True
1679   if on_primary or dev.AssembleOnSecondary():
1680     rstats = lu.rpc.call_blockdev_find(node, dev)
1681     msg = rstats.RemoteFailMsg()
1682     if msg:
1683       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1684       result = False
1685     elif not rstats.payload:
1686       lu.LogWarning("Can't find disk on node %s", node)
1687       result = False
1688     else:
1689       result = result and (not rstats.payload[idx])
1690   if dev.children:
1691     for child in dev.children:
1692       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1693
1694   return result
1695
1696
1697 class LUDiagnoseOS(NoHooksLU):
1698   """Logical unit for OS diagnose/query.
1699
1700   """
1701   _OP_REQP = ["output_fields", "names"]
1702   REQ_BGL = False
1703   _FIELDS_STATIC = utils.FieldSet()
1704   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1705
1706   def ExpandNames(self):
1707     if self.op.names:
1708       raise errors.OpPrereqError("Selective OS query not supported")
1709
1710     _CheckOutputFields(static=self._FIELDS_STATIC,
1711                        dynamic=self._FIELDS_DYNAMIC,
1712                        selected=self.op.output_fields)
1713
1714     # Lock all nodes, in shared mode
1715     # Temporary removal of locks, should be reverted later
1716     # TODO: reintroduce locks when they are lighter-weight
1717     self.needed_locks = {}
1718     #self.share_locks[locking.LEVEL_NODE] = 1
1719     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1720
1721   def CheckPrereq(self):
1722     """Check prerequisites.
1723
1724     """
1725
1726   @staticmethod
1727   def _DiagnoseByOS(node_list, rlist):
1728     """Remaps a per-node return list into an a per-os per-node dictionary
1729
1730     @param node_list: a list with the names of all nodes
1731     @param rlist: a map with node names as keys and OS objects as values
1732
1733     @rtype: dict
1734     @return: a dictionary with osnames as keys and as value another map, with
1735         nodes as keys and list of OS objects as values, eg::
1736
1737           {"debian-etch": {"node1": [<object>,...],
1738                            "node2": [<object>,]}
1739           }
1740
1741     """
1742     all_os = {}
1743     # we build here the list of nodes that didn't fail the RPC (at RPC
1744     # level), so that nodes with a non-responding node daemon don't
1745     # make all OSes invalid
1746     good_nodes = [node_name for node_name in rlist
1747                   if not rlist[node_name].failed]
1748     for node_name, nr in rlist.iteritems():
1749       if nr.failed or not nr.data:
1750         continue
1751       for os_obj in nr.data:
1752         if os_obj.name not in all_os:
1753           # build a list of nodes for this os containing empty lists
1754           # for each node in node_list
1755           all_os[os_obj.name] = {}
1756           for nname in good_nodes:
1757             all_os[os_obj.name][nname] = []
1758         all_os[os_obj.name][node_name].append(os_obj)
1759     return all_os
1760
1761   def Exec(self, feedback_fn):
1762     """Compute the list of OSes.
1763
1764     """
1765     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1766     node_data = self.rpc.call_os_diagnose(valid_nodes)
1767     if node_data == False:
1768       raise errors.OpExecError("Can't gather the list of OSes")
1769     pol = self._DiagnoseByOS(valid_nodes, node_data)
1770     output = []
1771     for os_name, os_data in pol.iteritems():
1772       row = []
1773       for field in self.op.output_fields:
1774         if field == "name":
1775           val = os_name
1776         elif field == "valid":
1777           val = utils.all([osl and osl[0] for osl in os_data.values()])
1778         elif field == "node_status":
1779           val = {}
1780           for node_name, nos_list in os_data.iteritems():
1781             val[node_name] = [(v.status, v.path) for v in nos_list]
1782         else:
1783           raise errors.ParameterError(field)
1784         row.append(val)
1785       output.append(row)
1786
1787     return output
1788
1789
1790 class LURemoveNode(LogicalUnit):
1791   """Logical unit for removing a node.
1792
1793   """
1794   HPATH = "node-remove"
1795   HTYPE = constants.HTYPE_NODE
1796   _OP_REQP = ["node_name"]
1797
1798   def BuildHooksEnv(self):
1799     """Build hooks env.
1800
1801     This doesn't run on the target node in the pre phase as a failed
1802     node would then be impossible to remove.
1803
1804     """
1805     env = {
1806       "OP_TARGET": self.op.node_name,
1807       "NODE_NAME": self.op.node_name,
1808       }
1809     all_nodes = self.cfg.GetNodeList()
1810     all_nodes.remove(self.op.node_name)
1811     return env, all_nodes, all_nodes
1812
1813   def CheckPrereq(self):
1814     """Check prerequisites.
1815
1816     This checks:
1817      - the node exists in the configuration
1818      - it does not have primary or secondary instances
1819      - it's not the master
1820
1821     Any errors are signalled by raising errors.OpPrereqError.
1822
1823     """
1824     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1825     if node is None:
1826       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1827
1828     instance_list = self.cfg.GetInstanceList()
1829
1830     masternode = self.cfg.GetMasterNode()
1831     if node.name == masternode:
1832       raise errors.OpPrereqError("Node is the master node,"
1833                                  " you need to failover first.")
1834
1835     for instance_name in instance_list:
1836       instance = self.cfg.GetInstanceInfo(instance_name)
1837       if node.name in instance.all_nodes:
1838         raise errors.OpPrereqError("Instance %s is still running on the node,"
1839                                    " please remove first." % instance_name)
1840     self.op.node_name = node.name
1841     self.node = node
1842
1843   def Exec(self, feedback_fn):
1844     """Removes the node from the cluster.
1845
1846     """
1847     node = self.node
1848     logging.info("Stopping the node daemon and removing configs from node %s",
1849                  node.name)
1850
1851     self.context.RemoveNode(node.name)
1852
1853     self.rpc.call_node_leave_cluster(node.name)
1854
1855     # Promote nodes to master candidate as needed
1856     _AdjustCandidatePool(self)
1857
1858
1859 class LUQueryNodes(NoHooksLU):
1860   """Logical unit for querying nodes.
1861
1862   """
1863   _OP_REQP = ["output_fields", "names", "use_locking"]
1864   REQ_BGL = False
1865   _FIELDS_DYNAMIC = utils.FieldSet(
1866     "dtotal", "dfree",
1867     "mtotal", "mnode", "mfree",
1868     "bootid",
1869     "ctotal", "cnodes", "csockets",
1870     )
1871
1872   _FIELDS_STATIC = utils.FieldSet(
1873     "name", "pinst_cnt", "sinst_cnt",
1874     "pinst_list", "sinst_list",
1875     "pip", "sip", "tags",
1876     "serial_no",
1877     "master_candidate",
1878     "master",
1879     "offline",
1880     "drained",
1881     )
1882
1883   def ExpandNames(self):
1884     _CheckOutputFields(static=self._FIELDS_STATIC,
1885                        dynamic=self._FIELDS_DYNAMIC,
1886                        selected=self.op.output_fields)
1887
1888     self.needed_locks = {}
1889     self.share_locks[locking.LEVEL_NODE] = 1
1890
1891     if self.op.names:
1892       self.wanted = _GetWantedNodes(self, self.op.names)
1893     else:
1894       self.wanted = locking.ALL_SET
1895
1896     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1897     self.do_locking = self.do_node_query and self.op.use_locking
1898     if self.do_locking:
1899       # if we don't request only static fields, we need to lock the nodes
1900       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1901
1902
1903   def CheckPrereq(self):
1904     """Check prerequisites.
1905
1906     """
1907     # The validation of the node list is done in the _GetWantedNodes,
1908     # if non empty, and if empty, there's no validation to do
1909     pass
1910
1911   def Exec(self, feedback_fn):
1912     """Computes the list of nodes and their attributes.
1913
1914     """
1915     all_info = self.cfg.GetAllNodesInfo()
1916     if self.do_locking:
1917       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1918     elif self.wanted != locking.ALL_SET:
1919       nodenames = self.wanted
1920       missing = set(nodenames).difference(all_info.keys())
1921       if missing:
1922         raise errors.OpExecError(
1923           "Some nodes were removed before retrieving their data: %s" % missing)
1924     else:
1925       nodenames = all_info.keys()
1926
1927     nodenames = utils.NiceSort(nodenames)
1928     nodelist = [all_info[name] for name in nodenames]
1929
1930     # begin data gathering
1931
1932     if self.do_node_query:
1933       live_data = {}
1934       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1935                                           self.cfg.GetHypervisorType())
1936       for name in nodenames:
1937         nodeinfo = node_data[name]
1938         if not nodeinfo.failed and nodeinfo.data:
1939           nodeinfo = nodeinfo.data
1940           fn = utils.TryConvert
1941           live_data[name] = {
1942             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1943             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1944             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1945             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1946             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1947             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1948             "bootid": nodeinfo.get('bootid', None),
1949             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1950             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1951             }
1952         else:
1953           live_data[name] = {}
1954     else:
1955       live_data = dict.fromkeys(nodenames, {})
1956
1957     node_to_primary = dict([(name, set()) for name in nodenames])
1958     node_to_secondary = dict([(name, set()) for name in nodenames])
1959
1960     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1961                              "sinst_cnt", "sinst_list"))
1962     if inst_fields & frozenset(self.op.output_fields):
1963       instancelist = self.cfg.GetInstanceList()
1964
1965       for instance_name in instancelist:
1966         inst = self.cfg.GetInstanceInfo(instance_name)
1967         if inst.primary_node in node_to_primary:
1968           node_to_primary[inst.primary_node].add(inst.name)
1969         for secnode in inst.secondary_nodes:
1970           if secnode in node_to_secondary:
1971             node_to_secondary[secnode].add(inst.name)
1972
1973     master_node = self.cfg.GetMasterNode()
1974
1975     # end data gathering
1976
1977     output = []
1978     for node in nodelist:
1979       node_output = []
1980       for field in self.op.output_fields:
1981         if field == "name":
1982           val = node.name
1983         elif field == "pinst_list":
1984           val = list(node_to_primary[node.name])
1985         elif field == "sinst_list":
1986           val = list(node_to_secondary[node.name])
1987         elif field == "pinst_cnt":
1988           val = len(node_to_primary[node.name])
1989         elif field == "sinst_cnt":
1990           val = len(node_to_secondary[node.name])
1991         elif field == "pip":
1992           val = node.primary_ip
1993         elif field == "sip":
1994           val = node.secondary_ip
1995         elif field == "tags":
1996           val = list(node.GetTags())
1997         elif field == "serial_no":
1998           val = node.serial_no
1999         elif field == "master_candidate":
2000           val = node.master_candidate
2001         elif field == "master":
2002           val = node.name == master_node
2003         elif field == "offline":
2004           val = node.offline
2005         elif field == "drained":
2006           val = node.drained
2007         elif self._FIELDS_DYNAMIC.Matches(field):
2008           val = live_data[node.name].get(field, None)
2009         else:
2010           raise errors.ParameterError(field)
2011         node_output.append(val)
2012       output.append(node_output)
2013
2014     return output
2015
2016
2017 class LUQueryNodeVolumes(NoHooksLU):
2018   """Logical unit for getting volumes on node(s).
2019
2020   """
2021   _OP_REQP = ["nodes", "output_fields"]
2022   REQ_BGL = False
2023   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2024   _FIELDS_STATIC = utils.FieldSet("node")
2025
2026   def ExpandNames(self):
2027     _CheckOutputFields(static=self._FIELDS_STATIC,
2028                        dynamic=self._FIELDS_DYNAMIC,
2029                        selected=self.op.output_fields)
2030
2031     self.needed_locks = {}
2032     self.share_locks[locking.LEVEL_NODE] = 1
2033     if not self.op.nodes:
2034       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2035     else:
2036       self.needed_locks[locking.LEVEL_NODE] = \
2037         _GetWantedNodes(self, self.op.nodes)
2038
2039   def CheckPrereq(self):
2040     """Check prerequisites.
2041
2042     This checks that the fields required are valid output fields.
2043
2044     """
2045     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2046
2047   def Exec(self, feedback_fn):
2048     """Computes the list of nodes and their attributes.
2049
2050     """
2051     nodenames = self.nodes
2052     volumes = self.rpc.call_node_volumes(nodenames)
2053
2054     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2055              in self.cfg.GetInstanceList()]
2056
2057     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2058
2059     output = []
2060     for node in nodenames:
2061       if node not in volumes or volumes[node].failed or not volumes[node].data:
2062         continue
2063
2064       node_vols = volumes[node].data[:]
2065       node_vols.sort(key=lambda vol: vol['dev'])
2066
2067       for vol in node_vols:
2068         node_output = []
2069         for field in self.op.output_fields:
2070           if field == "node":
2071             val = node
2072           elif field == "phys":
2073             val = vol['dev']
2074           elif field == "vg":
2075             val = vol['vg']
2076           elif field == "name":
2077             val = vol['name']
2078           elif field == "size":
2079             val = int(float(vol['size']))
2080           elif field == "instance":
2081             for inst in ilist:
2082               if node not in lv_by_node[inst]:
2083                 continue
2084               if vol['name'] in lv_by_node[inst][node]:
2085                 val = inst.name
2086                 break
2087             else:
2088               val = '-'
2089           else:
2090             raise errors.ParameterError(field)
2091           node_output.append(str(val))
2092
2093         output.append(node_output)
2094
2095     return output
2096
2097
2098 class LUAddNode(LogicalUnit):
2099   """Logical unit for adding node to the cluster.
2100
2101   """
2102   HPATH = "node-add"
2103   HTYPE = constants.HTYPE_NODE
2104   _OP_REQP = ["node_name"]
2105
2106   def BuildHooksEnv(self):
2107     """Build hooks env.
2108
2109     This will run on all nodes before, and on all nodes + the new node after.
2110
2111     """
2112     env = {
2113       "OP_TARGET": self.op.node_name,
2114       "NODE_NAME": self.op.node_name,
2115       "NODE_PIP": self.op.primary_ip,
2116       "NODE_SIP": self.op.secondary_ip,
2117       }
2118     nodes_0 = self.cfg.GetNodeList()
2119     nodes_1 = nodes_0 + [self.op.node_name, ]
2120     return env, nodes_0, nodes_1
2121
2122   def CheckPrereq(self):
2123     """Check prerequisites.
2124
2125     This checks:
2126      - the new node is not already in the config
2127      - it is resolvable
2128      - its parameters (single/dual homed) matches the cluster
2129
2130     Any errors are signalled by raising errors.OpPrereqError.
2131
2132     """
2133     node_name = self.op.node_name
2134     cfg = self.cfg
2135
2136     dns_data = utils.HostInfo(node_name)
2137
2138     node = dns_data.name
2139     primary_ip = self.op.primary_ip = dns_data.ip
2140     secondary_ip = getattr(self.op, "secondary_ip", None)
2141     if secondary_ip is None:
2142       secondary_ip = primary_ip
2143     if not utils.IsValidIP(secondary_ip):
2144       raise errors.OpPrereqError("Invalid secondary IP given")
2145     self.op.secondary_ip = secondary_ip
2146
2147     node_list = cfg.GetNodeList()
2148     if not self.op.readd and node in node_list:
2149       raise errors.OpPrereqError("Node %s is already in the configuration" %
2150                                  node)
2151     elif self.op.readd and node not in node_list:
2152       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2153
2154     for existing_node_name in node_list:
2155       existing_node = cfg.GetNodeInfo(existing_node_name)
2156
2157       if self.op.readd and node == existing_node_name:
2158         if (existing_node.primary_ip != primary_ip or
2159             existing_node.secondary_ip != secondary_ip):
2160           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2161                                      " address configuration as before")
2162         continue
2163
2164       if (existing_node.primary_ip == primary_ip or
2165           existing_node.secondary_ip == primary_ip or
2166           existing_node.primary_ip == secondary_ip or
2167           existing_node.secondary_ip == secondary_ip):
2168         raise errors.OpPrereqError("New node ip address(es) conflict with"
2169                                    " existing node %s" % existing_node.name)
2170
2171     # check that the type of the node (single versus dual homed) is the
2172     # same as for the master
2173     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2174     master_singlehomed = myself.secondary_ip == myself.primary_ip
2175     newbie_singlehomed = secondary_ip == primary_ip
2176     if master_singlehomed != newbie_singlehomed:
2177       if master_singlehomed:
2178         raise errors.OpPrereqError("The master has no private ip but the"
2179                                    " new node has one")
2180       else:
2181         raise errors.OpPrereqError("The master has a private ip but the"
2182                                    " new node doesn't have one")
2183
2184     # checks reachablity
2185     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2186       raise errors.OpPrereqError("Node not reachable by ping")
2187
2188     if not newbie_singlehomed:
2189       # check reachability from my secondary ip to newbie's secondary ip
2190       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2191                            source=myself.secondary_ip):
2192         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2193                                    " based ping to noded port")
2194
2195     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2196     mc_now, _ = self.cfg.GetMasterCandidateStats()
2197     master_candidate = mc_now < cp_size
2198
2199     self.new_node = objects.Node(name=node,
2200                                  primary_ip=primary_ip,
2201                                  secondary_ip=secondary_ip,
2202                                  master_candidate=master_candidate,
2203                                  offline=False, drained=False)
2204
2205   def Exec(self, feedback_fn):
2206     """Adds the new node to the cluster.
2207
2208     """
2209     new_node = self.new_node
2210     node = new_node.name
2211
2212     # check connectivity
2213     result = self.rpc.call_version([node])[node]
2214     result.Raise()
2215     if result.data:
2216       if constants.PROTOCOL_VERSION == result.data:
2217         logging.info("Communication to node %s fine, sw version %s match",
2218                      node, result.data)
2219       else:
2220         raise errors.OpExecError("Version mismatch master version %s,"
2221                                  " node version %s" %
2222                                  (constants.PROTOCOL_VERSION, result.data))
2223     else:
2224       raise errors.OpExecError("Cannot get version from the new node")
2225
2226     # setup ssh on node
2227     logging.info("Copy ssh key to node %s", node)
2228     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2229     keyarray = []
2230     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2231                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2232                 priv_key, pub_key]
2233
2234     for i in keyfiles:
2235       f = open(i, 'r')
2236       try:
2237         keyarray.append(f.read())
2238       finally:
2239         f.close()
2240
2241     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2242                                     keyarray[2],
2243                                     keyarray[3], keyarray[4], keyarray[5])
2244
2245     msg = result.RemoteFailMsg()
2246     if msg:
2247       raise errors.OpExecError("Cannot transfer ssh keys to the"
2248                                " new node: %s" % msg)
2249
2250     # Add node to our /etc/hosts, and add key to known_hosts
2251     utils.AddHostToEtcHosts(new_node.name)
2252
2253     if new_node.secondary_ip != new_node.primary_ip:
2254       result = self.rpc.call_node_has_ip_address(new_node.name,
2255                                                  new_node.secondary_ip)
2256       if result.failed or not result.data:
2257         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2258                                  " you gave (%s). Please fix and re-run this"
2259                                  " command." % new_node.secondary_ip)
2260
2261     node_verify_list = [self.cfg.GetMasterNode()]
2262     node_verify_param = {
2263       'nodelist': [node],
2264       # TODO: do a node-net-test as well?
2265     }
2266
2267     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2268                                        self.cfg.GetClusterName())
2269     for verifier in node_verify_list:
2270       if result[verifier].failed or not result[verifier].data:
2271         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2272                                  " for remote verification" % verifier)
2273       if result[verifier].data['nodelist']:
2274         for failed in result[verifier].data['nodelist']:
2275           feedback_fn("ssh/hostname verification failed %s -> %s" %
2276                       (verifier, result[verifier].data['nodelist'][failed]))
2277         raise errors.OpExecError("ssh/hostname verification failed.")
2278
2279     # Distribute updated /etc/hosts and known_hosts to all nodes,
2280     # including the node just added
2281     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2282     dist_nodes = self.cfg.GetNodeList()
2283     if not self.op.readd:
2284       dist_nodes.append(node)
2285     if myself.name in dist_nodes:
2286       dist_nodes.remove(myself.name)
2287
2288     logging.debug("Copying hosts and known_hosts to all nodes")
2289     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2290       result = self.rpc.call_upload_file(dist_nodes, fname)
2291       for to_node, to_result in result.iteritems():
2292         if to_result.failed or not to_result.data:
2293           logging.error("Copy of file %s to node %s failed", fname, to_node)
2294
2295     to_copy = []
2296     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2297     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2298       to_copy.append(constants.VNC_PASSWORD_FILE)
2299
2300     for fname in to_copy:
2301       result = self.rpc.call_upload_file([node], fname)
2302       if result[node].failed or not result[node]:
2303         logging.error("Could not copy file %s to node %s", fname, node)
2304
2305     if self.op.readd:
2306       self.context.ReaddNode(new_node)
2307     else:
2308       self.context.AddNode(new_node)
2309
2310
2311 class LUSetNodeParams(LogicalUnit):
2312   """Modifies the parameters of a node.
2313
2314   """
2315   HPATH = "node-modify"
2316   HTYPE = constants.HTYPE_NODE
2317   _OP_REQP = ["node_name"]
2318   REQ_BGL = False
2319
2320   def CheckArguments(self):
2321     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2322     if node_name is None:
2323       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2324     self.op.node_name = node_name
2325     _CheckBooleanOpField(self.op, 'master_candidate')
2326     _CheckBooleanOpField(self.op, 'offline')
2327     _CheckBooleanOpField(self.op, 'drained')
2328     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2329     if all_mods.count(None) == 3:
2330       raise errors.OpPrereqError("Please pass at least one modification")
2331     if all_mods.count(True) > 1:
2332       raise errors.OpPrereqError("Can't set the node into more than one"
2333                                  " state at the same time")
2334
2335   def ExpandNames(self):
2336     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2337
2338   def BuildHooksEnv(self):
2339     """Build hooks env.
2340
2341     This runs on the master node.
2342
2343     """
2344     env = {
2345       "OP_TARGET": self.op.node_name,
2346       "MASTER_CANDIDATE": str(self.op.master_candidate),
2347       "OFFLINE": str(self.op.offline),
2348       "DRAINED": str(self.op.drained),
2349       }
2350     nl = [self.cfg.GetMasterNode(),
2351           self.op.node_name]
2352     return env, nl, nl
2353
2354   def CheckPrereq(self):
2355     """Check prerequisites.
2356
2357     This only checks the instance list against the existing names.
2358
2359     """
2360     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2361
2362     if ((self.op.master_candidate == False or self.op.offline == True or
2363          self.op.drained == True) and node.master_candidate):
2364       # we will demote the node from master_candidate
2365       if self.op.node_name == self.cfg.GetMasterNode():
2366         raise errors.OpPrereqError("The master node has to be a"
2367                                    " master candidate, online and not drained")
2368       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2369       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2370       if num_candidates <= cp_size:
2371         msg = ("Not enough master candidates (desired"
2372                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2373         if self.op.force:
2374           self.LogWarning(msg)
2375         else:
2376           raise errors.OpPrereqError(msg)
2377
2378     if (self.op.master_candidate == True and
2379         ((node.offline and not self.op.offline == False) or
2380          (node.drained and not self.op.drained == False))):
2381       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2382                                  " to master_candidate" % node.name)
2383
2384     return
2385
2386   def Exec(self, feedback_fn):
2387     """Modifies a node.
2388
2389     """
2390     node = self.node
2391
2392     result = []
2393     changed_mc = False
2394
2395     if self.op.offline is not None:
2396       node.offline = self.op.offline
2397       result.append(("offline", str(self.op.offline)))
2398       if self.op.offline == True:
2399         if node.master_candidate:
2400           node.master_candidate = False
2401           changed_mc = True
2402           result.append(("master_candidate", "auto-demotion due to offline"))
2403         if node.drained:
2404           node.drained = False
2405           result.append(("drained", "clear drained status due to offline"))
2406
2407     if self.op.master_candidate is not None:
2408       node.master_candidate = self.op.master_candidate
2409       changed_mc = True
2410       result.append(("master_candidate", str(self.op.master_candidate)))
2411       if self.op.master_candidate == False:
2412         rrc = self.rpc.call_node_demote_from_mc(node.name)
2413         msg = rrc.RemoteFailMsg()
2414         if msg:
2415           self.LogWarning("Node failed to demote itself: %s" % msg)
2416
2417     if self.op.drained is not None:
2418       node.drained = self.op.drained
2419       result.append(("drained", str(self.op.drained)))
2420       if self.op.drained == True:
2421         if node.master_candidate:
2422           node.master_candidate = False
2423           changed_mc = True
2424           result.append(("master_candidate", "auto-demotion due to drain"))
2425         if node.offline:
2426           node.offline = False
2427           result.append(("offline", "clear offline status due to drain"))
2428
2429     # this will trigger configuration file update, if needed
2430     self.cfg.Update(node)
2431     # this will trigger job queue propagation or cleanup
2432     if changed_mc:
2433       self.context.ReaddNode(node)
2434
2435     return result
2436
2437
2438 class LUQueryClusterInfo(NoHooksLU):
2439   """Query cluster configuration.
2440
2441   """
2442   _OP_REQP = []
2443   REQ_BGL = False
2444
2445   def ExpandNames(self):
2446     self.needed_locks = {}
2447
2448   def CheckPrereq(self):
2449     """No prerequsites needed for this LU.
2450
2451     """
2452     pass
2453
2454   def Exec(self, feedback_fn):
2455     """Return cluster config.
2456
2457     """
2458     cluster = self.cfg.GetClusterInfo()
2459     result = {
2460       "software_version": constants.RELEASE_VERSION,
2461       "protocol_version": constants.PROTOCOL_VERSION,
2462       "config_version": constants.CONFIG_VERSION,
2463       "os_api_version": constants.OS_API_VERSION,
2464       "export_version": constants.EXPORT_VERSION,
2465       "architecture": (platform.architecture()[0], platform.machine()),
2466       "name": cluster.cluster_name,
2467       "master": cluster.master_node,
2468       "default_hypervisor": cluster.default_hypervisor,
2469       "enabled_hypervisors": cluster.enabled_hypervisors,
2470       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2471                         for hypervisor in cluster.enabled_hypervisors]),
2472       "beparams": cluster.beparams,
2473       "candidate_pool_size": cluster.candidate_pool_size,
2474       "default_bridge": cluster.default_bridge,
2475       "master_netdev": cluster.master_netdev,
2476       "volume_group_name": cluster.volume_group_name,
2477       "file_storage_dir": cluster.file_storage_dir,
2478       }
2479
2480     return result
2481
2482
2483 class LUQueryConfigValues(NoHooksLU):
2484   """Return configuration values.
2485
2486   """
2487   _OP_REQP = []
2488   REQ_BGL = False
2489   _FIELDS_DYNAMIC = utils.FieldSet()
2490   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2491
2492   def ExpandNames(self):
2493     self.needed_locks = {}
2494
2495     _CheckOutputFields(static=self._FIELDS_STATIC,
2496                        dynamic=self._FIELDS_DYNAMIC,
2497                        selected=self.op.output_fields)
2498
2499   def CheckPrereq(self):
2500     """No prerequisites.
2501
2502     """
2503     pass
2504
2505   def Exec(self, feedback_fn):
2506     """Dump a representation of the cluster config to the standard output.
2507
2508     """
2509     values = []
2510     for field in self.op.output_fields:
2511       if field == "cluster_name":
2512         entry = self.cfg.GetClusterName()
2513       elif field == "master_node":
2514         entry = self.cfg.GetMasterNode()
2515       elif field == "drain_flag":
2516         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2517       else:
2518         raise errors.ParameterError(field)
2519       values.append(entry)
2520     return values
2521
2522
2523 class LUActivateInstanceDisks(NoHooksLU):
2524   """Bring up an instance's disks.
2525
2526   """
2527   _OP_REQP = ["instance_name"]
2528   REQ_BGL = False
2529
2530   def ExpandNames(self):
2531     self._ExpandAndLockInstance()
2532     self.needed_locks[locking.LEVEL_NODE] = []
2533     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2534
2535   def DeclareLocks(self, level):
2536     if level == locking.LEVEL_NODE:
2537       self._LockInstancesNodes()
2538
2539   def CheckPrereq(self):
2540     """Check prerequisites.
2541
2542     This checks that the instance is in the cluster.
2543
2544     """
2545     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2546     assert self.instance is not None, \
2547       "Cannot retrieve locked instance %s" % self.op.instance_name
2548     _CheckNodeOnline(self, self.instance.primary_node)
2549
2550   def Exec(self, feedback_fn):
2551     """Activate the disks.
2552
2553     """
2554     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2555     if not disks_ok:
2556       raise errors.OpExecError("Cannot activate block devices")
2557
2558     return disks_info
2559
2560
2561 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2562   """Prepare the block devices for an instance.
2563
2564   This sets up the block devices on all nodes.
2565
2566   @type lu: L{LogicalUnit}
2567   @param lu: the logical unit on whose behalf we execute
2568   @type instance: L{objects.Instance}
2569   @param instance: the instance for whose disks we assemble
2570   @type ignore_secondaries: boolean
2571   @param ignore_secondaries: if true, errors on secondary nodes
2572       won't result in an error return from the function
2573   @return: False if the operation failed, otherwise a list of
2574       (host, instance_visible_name, node_visible_name)
2575       with the mapping from node devices to instance devices
2576
2577   """
2578   device_info = []
2579   disks_ok = True
2580   iname = instance.name
2581   # With the two passes mechanism we try to reduce the window of
2582   # opportunity for the race condition of switching DRBD to primary
2583   # before handshaking occured, but we do not eliminate it
2584
2585   # The proper fix would be to wait (with some limits) until the
2586   # connection has been made and drbd transitions from WFConnection
2587   # into any other network-connected state (Connected, SyncTarget,
2588   # SyncSource, etc.)
2589
2590   # 1st pass, assemble on all nodes in secondary mode
2591   for inst_disk in instance.disks:
2592     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2593       lu.cfg.SetDiskID(node_disk, node)
2594       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2595       msg = result.RemoteFailMsg()
2596       if msg:
2597         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2598                            " (is_primary=False, pass=1): %s",
2599                            inst_disk.iv_name, node, msg)
2600         if not ignore_secondaries:
2601           disks_ok = False
2602
2603   # FIXME: race condition on drbd migration to primary
2604
2605   # 2nd pass, do only the primary node
2606   for inst_disk in instance.disks:
2607     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2608       if node != instance.primary_node:
2609         continue
2610       lu.cfg.SetDiskID(node_disk, node)
2611       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2612       msg = result.RemoteFailMsg()
2613       if msg:
2614         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2615                            " (is_primary=True, pass=2): %s",
2616                            inst_disk.iv_name, node, msg)
2617         disks_ok = False
2618     device_info.append((instance.primary_node, inst_disk.iv_name,
2619                         result.payload))
2620
2621   # leave the disks configured for the primary node
2622   # this is a workaround that would be fixed better by
2623   # improving the logical/physical id handling
2624   for disk in instance.disks:
2625     lu.cfg.SetDiskID(disk, instance.primary_node)
2626
2627   return disks_ok, device_info
2628
2629
2630 def _StartInstanceDisks(lu, instance, force):
2631   """Start the disks of an instance.
2632
2633   """
2634   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2635                                            ignore_secondaries=force)
2636   if not disks_ok:
2637     _ShutdownInstanceDisks(lu, instance)
2638     if force is not None and not force:
2639       lu.proc.LogWarning("", hint="If the message above refers to a"
2640                          " secondary node,"
2641                          " you can retry the operation using '--force'.")
2642     raise errors.OpExecError("Disk consistency error")
2643
2644
2645 class LUDeactivateInstanceDisks(NoHooksLU):
2646   """Shutdown an instance's disks.
2647
2648   """
2649   _OP_REQP = ["instance_name"]
2650   REQ_BGL = False
2651
2652   def ExpandNames(self):
2653     self._ExpandAndLockInstance()
2654     self.needed_locks[locking.LEVEL_NODE] = []
2655     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2656
2657   def DeclareLocks(self, level):
2658     if level == locking.LEVEL_NODE:
2659       self._LockInstancesNodes()
2660
2661   def CheckPrereq(self):
2662     """Check prerequisites.
2663
2664     This checks that the instance is in the cluster.
2665
2666     """
2667     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2668     assert self.instance is not None, \
2669       "Cannot retrieve locked instance %s" % self.op.instance_name
2670
2671   def Exec(self, feedback_fn):
2672     """Deactivate the disks
2673
2674     """
2675     instance = self.instance
2676     _SafeShutdownInstanceDisks(self, instance)
2677
2678
2679 def _SafeShutdownInstanceDisks(lu, instance):
2680   """Shutdown block devices of an instance.
2681
2682   This function checks if an instance is running, before calling
2683   _ShutdownInstanceDisks.
2684
2685   """
2686   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2687                                       [instance.hypervisor])
2688   ins_l = ins_l[instance.primary_node]
2689   if ins_l.failed or not isinstance(ins_l.data, list):
2690     raise errors.OpExecError("Can't contact node '%s'" %
2691                              instance.primary_node)
2692
2693   if instance.name in ins_l.data:
2694     raise errors.OpExecError("Instance is running, can't shutdown"
2695                              " block devices.")
2696
2697   _ShutdownInstanceDisks(lu, instance)
2698
2699
2700 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2701   """Shutdown block devices of an instance.
2702
2703   This does the shutdown on all nodes of the instance.
2704
2705   If the ignore_primary is false, errors on the primary node are
2706   ignored.
2707
2708   """
2709   all_result = True
2710   for disk in instance.disks:
2711     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2712       lu.cfg.SetDiskID(top_disk, node)
2713       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2714       msg = result.RemoteFailMsg()
2715       if msg:
2716         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2717                       disk.iv_name, node, msg)
2718         if not ignore_primary or node != instance.primary_node:
2719           all_result = False
2720   return all_result
2721
2722
2723 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2724   """Checks if a node has enough free memory.
2725
2726   This function check if a given node has the needed amount of free
2727   memory. In case the node has less memory or we cannot get the
2728   information from the node, this function raise an OpPrereqError
2729   exception.
2730
2731   @type lu: C{LogicalUnit}
2732   @param lu: a logical unit from which we get configuration data
2733   @type node: C{str}
2734   @param node: the node to check
2735   @type reason: C{str}
2736   @param reason: string to use in the error message
2737   @type requested: C{int}
2738   @param requested: the amount of memory in MiB to check for
2739   @type hypervisor_name: C{str}
2740   @param hypervisor_name: the hypervisor to ask for memory stats
2741   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2742       we cannot check the node
2743
2744   """
2745   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2746   nodeinfo[node].Raise()
2747   free_mem = nodeinfo[node].data.get('memory_free')
2748   if not isinstance(free_mem, int):
2749     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2750                              " was '%s'" % (node, free_mem))
2751   if requested > free_mem:
2752     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2753                              " needed %s MiB, available %s MiB" %
2754                              (node, reason, requested, free_mem))
2755
2756
2757 class LUStartupInstance(LogicalUnit):
2758   """Starts an instance.
2759
2760   """
2761   HPATH = "instance-start"
2762   HTYPE = constants.HTYPE_INSTANCE
2763   _OP_REQP = ["instance_name", "force"]
2764   REQ_BGL = False
2765
2766   def ExpandNames(self):
2767     self._ExpandAndLockInstance()
2768
2769   def BuildHooksEnv(self):
2770     """Build hooks env.
2771
2772     This runs on master, primary and secondary nodes of the instance.
2773
2774     """
2775     env = {
2776       "FORCE": self.op.force,
2777       }
2778     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2779     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2780     return env, nl, nl
2781
2782   def CheckPrereq(self):
2783     """Check prerequisites.
2784
2785     This checks that the instance is in the cluster.
2786
2787     """
2788     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2789     assert self.instance is not None, \
2790       "Cannot retrieve locked instance %s" % self.op.instance_name
2791
2792     # extra beparams
2793     self.beparams = getattr(self.op, "beparams", {})
2794     if self.beparams:
2795       if not isinstance(self.beparams, dict):
2796         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2797                                    " dict" % (type(self.beparams), ))
2798       # fill the beparams dict
2799       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2800       self.op.beparams = self.beparams
2801
2802     # extra hvparams
2803     self.hvparams = getattr(self.op, "hvparams", {})
2804     if self.hvparams:
2805       if not isinstance(self.hvparams, dict):
2806         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2807                                    " dict" % (type(self.hvparams), ))
2808
2809       # check hypervisor parameter syntax (locally)
2810       cluster = self.cfg.GetClusterInfo()
2811       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2812       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2813                                     instance.hvparams)
2814       filled_hvp.update(self.hvparams)
2815       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2816       hv_type.CheckParameterSyntax(filled_hvp)
2817       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2818       self.op.hvparams = self.hvparams
2819
2820     _CheckNodeOnline(self, instance.primary_node)
2821
2822     bep = self.cfg.GetClusterInfo().FillBE(instance)
2823     # check bridges existance
2824     _CheckInstanceBridgesExist(self, instance)
2825
2826     remote_info = self.rpc.call_instance_info(instance.primary_node,
2827                                               instance.name,
2828                                               instance.hypervisor)
2829     remote_info.Raise()
2830     if not remote_info.data:
2831       _CheckNodeFreeMemory(self, instance.primary_node,
2832                            "starting instance %s" % instance.name,
2833                            bep[constants.BE_MEMORY], instance.hypervisor)
2834
2835   def Exec(self, feedback_fn):
2836     """Start the instance.
2837
2838     """
2839     instance = self.instance
2840     force = self.op.force
2841
2842     self.cfg.MarkInstanceUp(instance.name)
2843
2844     node_current = instance.primary_node
2845
2846     _StartInstanceDisks(self, instance, force)
2847
2848     result = self.rpc.call_instance_start(node_current, instance,
2849                                           self.hvparams, self.beparams)
2850     msg = result.RemoteFailMsg()
2851     if msg:
2852       _ShutdownInstanceDisks(self, instance)
2853       raise errors.OpExecError("Could not start instance: %s" % msg)
2854
2855
2856 class LURebootInstance(LogicalUnit):
2857   """Reboot an instance.
2858
2859   """
2860   HPATH = "instance-reboot"
2861   HTYPE = constants.HTYPE_INSTANCE
2862   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2863   REQ_BGL = False
2864
2865   def ExpandNames(self):
2866     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2867                                    constants.INSTANCE_REBOOT_HARD,
2868                                    constants.INSTANCE_REBOOT_FULL]:
2869       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2870                                   (constants.INSTANCE_REBOOT_SOFT,
2871                                    constants.INSTANCE_REBOOT_HARD,
2872                                    constants.INSTANCE_REBOOT_FULL))
2873     self._ExpandAndLockInstance()
2874
2875   def BuildHooksEnv(self):
2876     """Build hooks env.
2877
2878     This runs on master, primary and secondary nodes of the instance.
2879
2880     """
2881     env = {
2882       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2883       "REBOOT_TYPE": self.op.reboot_type,
2884       }
2885     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2886     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2887     return env, nl, nl
2888
2889   def CheckPrereq(self):
2890     """Check prerequisites.
2891
2892     This checks that the instance is in the cluster.
2893
2894     """
2895     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2896     assert self.instance is not None, \
2897       "Cannot retrieve locked instance %s" % self.op.instance_name
2898
2899     _CheckNodeOnline(self, instance.primary_node)
2900
2901     # check bridges existance
2902     _CheckInstanceBridgesExist(self, instance)
2903
2904   def Exec(self, feedback_fn):
2905     """Reboot the instance.
2906
2907     """
2908     instance = self.instance
2909     ignore_secondaries = self.op.ignore_secondaries
2910     reboot_type = self.op.reboot_type
2911
2912     node_current = instance.primary_node
2913
2914     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2915                        constants.INSTANCE_REBOOT_HARD]:
2916       for disk in instance.disks:
2917         self.cfg.SetDiskID(disk, node_current)
2918       result = self.rpc.call_instance_reboot(node_current, instance,
2919                                              reboot_type)
2920       msg = result.RemoteFailMsg()
2921       if msg:
2922         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2923     else:
2924       result = self.rpc.call_instance_shutdown(node_current, instance)
2925       msg = result.RemoteFailMsg()
2926       if msg:
2927         raise errors.OpExecError("Could not shutdown instance for"
2928                                  " full reboot: %s" % msg)
2929       _ShutdownInstanceDisks(self, instance)
2930       _StartInstanceDisks(self, instance, ignore_secondaries)
2931       result = self.rpc.call_instance_start(node_current, instance, None, None)
2932       msg = result.RemoteFailMsg()
2933       if msg:
2934         _ShutdownInstanceDisks(self, instance)
2935         raise errors.OpExecError("Could not start instance for"
2936                                  " full reboot: %s" % msg)
2937
2938     self.cfg.MarkInstanceUp(instance.name)
2939
2940
2941 class LUShutdownInstance(LogicalUnit):
2942   """Shutdown an instance.
2943
2944   """
2945   HPATH = "instance-stop"
2946   HTYPE = constants.HTYPE_INSTANCE
2947   _OP_REQP = ["instance_name"]
2948   REQ_BGL = False
2949
2950   def ExpandNames(self):
2951     self._ExpandAndLockInstance()
2952
2953   def BuildHooksEnv(self):
2954     """Build hooks env.
2955
2956     This runs on master, primary and secondary nodes of the instance.
2957
2958     """
2959     env = _BuildInstanceHookEnvByObject(self, self.instance)
2960     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2961     return env, nl, nl
2962
2963   def CheckPrereq(self):
2964     """Check prerequisites.
2965
2966     This checks that the instance is in the cluster.
2967
2968     """
2969     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2970     assert self.instance is not None, \
2971       "Cannot retrieve locked instance %s" % self.op.instance_name
2972     _CheckNodeOnline(self, self.instance.primary_node)
2973
2974   def Exec(self, feedback_fn):
2975     """Shutdown the instance.
2976
2977     """
2978     instance = self.instance
2979     node_current = instance.primary_node
2980     self.cfg.MarkInstanceDown(instance.name)
2981     result = self.rpc.call_instance_shutdown(node_current, instance)
2982     msg = result.RemoteFailMsg()
2983     if msg:
2984       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2985
2986     _ShutdownInstanceDisks(self, instance)
2987
2988
2989 class LUReinstallInstance(LogicalUnit):
2990   """Reinstall an instance.
2991
2992   """
2993   HPATH = "instance-reinstall"
2994   HTYPE = constants.HTYPE_INSTANCE
2995   _OP_REQP = ["instance_name"]
2996   REQ_BGL = False
2997
2998   def ExpandNames(self):
2999     self._ExpandAndLockInstance()
3000
3001   def BuildHooksEnv(self):
3002     """Build hooks env.
3003
3004     This runs on master, primary and secondary nodes of the instance.
3005
3006     """
3007     env = _BuildInstanceHookEnvByObject(self, self.instance)
3008     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3009     return env, nl, nl
3010
3011   def CheckPrereq(self):
3012     """Check prerequisites.
3013
3014     This checks that the instance is in the cluster and is not running.
3015
3016     """
3017     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3018     assert instance is not None, \
3019       "Cannot retrieve locked instance %s" % self.op.instance_name
3020     _CheckNodeOnline(self, instance.primary_node)
3021
3022     if instance.disk_template == constants.DT_DISKLESS:
3023       raise errors.OpPrereqError("Instance '%s' has no disks" %
3024                                  self.op.instance_name)
3025     if instance.admin_up:
3026       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3027                                  self.op.instance_name)
3028     remote_info = self.rpc.call_instance_info(instance.primary_node,
3029                                               instance.name,
3030                                               instance.hypervisor)
3031     remote_info.Raise()
3032     if remote_info.data:
3033       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3034                                  (self.op.instance_name,
3035                                   instance.primary_node))
3036
3037     self.op.os_type = getattr(self.op, "os_type", None)
3038     if self.op.os_type is not None:
3039       # OS verification
3040       pnode = self.cfg.GetNodeInfo(
3041         self.cfg.ExpandNodeName(instance.primary_node))
3042       if pnode is None:
3043         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3044                                    self.op.pnode)
3045       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3046       result.Raise()
3047       if not isinstance(result.data, objects.OS):
3048         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3049                                    " primary node"  % self.op.os_type)
3050
3051     self.instance = instance
3052
3053   def Exec(self, feedback_fn):
3054     """Reinstall the instance.
3055
3056     """
3057     inst = self.instance
3058
3059     if self.op.os_type is not None:
3060       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3061       inst.os = self.op.os_type
3062       self.cfg.Update(inst)
3063
3064     _StartInstanceDisks(self, inst, None)
3065     try:
3066       feedback_fn("Running the instance OS create scripts...")
3067       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3068       msg = result.RemoteFailMsg()
3069       if msg:
3070         raise errors.OpExecError("Could not install OS for instance %s"
3071                                  " on node %s: %s" %
3072                                  (inst.name, inst.primary_node, msg))
3073     finally:
3074       _ShutdownInstanceDisks(self, inst)
3075
3076
3077 class LURenameInstance(LogicalUnit):
3078   """Rename an instance.
3079
3080   """
3081   HPATH = "instance-rename"
3082   HTYPE = constants.HTYPE_INSTANCE
3083   _OP_REQP = ["instance_name", "new_name"]
3084
3085   def BuildHooksEnv(self):
3086     """Build hooks env.
3087
3088     This runs on master, primary and secondary nodes of the instance.
3089
3090     """
3091     env = _BuildInstanceHookEnvByObject(self, self.instance)
3092     env["INSTANCE_NEW_NAME"] = self.op.new_name
3093     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3094     return env, nl, nl
3095
3096   def CheckPrereq(self):
3097     """Check prerequisites.
3098
3099     This checks that the instance is in the cluster and is not running.
3100
3101     """
3102     instance = self.cfg.GetInstanceInfo(
3103       self.cfg.ExpandInstanceName(self.op.instance_name))
3104     if instance is None:
3105       raise errors.OpPrereqError("Instance '%s' not known" %
3106                                  self.op.instance_name)
3107     _CheckNodeOnline(self, instance.primary_node)
3108
3109     if instance.admin_up:
3110       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3111                                  self.op.instance_name)
3112     remote_info = self.rpc.call_instance_info(instance.primary_node,
3113                                               instance.name,
3114                                               instance.hypervisor)
3115     remote_info.Raise()
3116     if remote_info.data:
3117       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3118                                  (self.op.instance_name,
3119                                   instance.primary_node))
3120     self.instance = instance
3121
3122     # new name verification
3123     name_info = utils.HostInfo(self.op.new_name)
3124
3125     self.op.new_name = new_name = name_info.name
3126     instance_list = self.cfg.GetInstanceList()
3127     if new_name in instance_list:
3128       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3129                                  new_name)
3130
3131     if not getattr(self.op, "ignore_ip", False):
3132       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3133         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3134                                    (name_info.ip, new_name))
3135
3136
3137   def Exec(self, feedback_fn):
3138     """Reinstall the instance.
3139
3140     """
3141     inst = self.instance
3142     old_name = inst.name
3143
3144     if inst.disk_template == constants.DT_FILE:
3145       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3146
3147     self.cfg.RenameInstance(inst.name, self.op.new_name)
3148     # Change the instance lock. This is definitely safe while we hold the BGL
3149     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3150     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3151
3152     # re-read the instance from the configuration after rename
3153     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3154
3155     if inst.disk_template == constants.DT_FILE:
3156       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3157       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3158                                                      old_file_storage_dir,
3159                                                      new_file_storage_dir)
3160       result.Raise()
3161       if not result.data:
3162         raise errors.OpExecError("Could not connect to node '%s' to rename"
3163                                  " directory '%s' to '%s' (but the instance"
3164                                  " has been renamed in Ganeti)" % (
3165                                  inst.primary_node, old_file_storage_dir,
3166                                  new_file_storage_dir))
3167
3168       if not result.data[0]:
3169         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3170                                  " (but the instance has been renamed in"
3171                                  " Ganeti)" % (old_file_storage_dir,
3172                                                new_file_storage_dir))
3173
3174     _StartInstanceDisks(self, inst, None)
3175     try:
3176       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3177                                                  old_name)
3178       msg = result.RemoteFailMsg()
3179       if msg:
3180         msg = ("Could not run OS rename script for instance %s on node %s"
3181                " (but the instance has been renamed in Ganeti): %s" %
3182                (inst.name, inst.primary_node, msg))
3183         self.proc.LogWarning(msg)
3184     finally:
3185       _ShutdownInstanceDisks(self, inst)
3186
3187
3188 class LURemoveInstance(LogicalUnit):
3189   """Remove an instance.
3190
3191   """
3192   HPATH = "instance-remove"
3193   HTYPE = constants.HTYPE_INSTANCE
3194   _OP_REQP = ["instance_name", "ignore_failures"]
3195   REQ_BGL = False
3196
3197   def ExpandNames(self):
3198     self._ExpandAndLockInstance()
3199     self.needed_locks[locking.LEVEL_NODE] = []
3200     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3201
3202   def DeclareLocks(self, level):
3203     if level == locking.LEVEL_NODE:
3204       self._LockInstancesNodes()
3205
3206   def BuildHooksEnv(self):
3207     """Build hooks env.
3208
3209     This runs on master, primary and secondary nodes of the instance.
3210
3211     """
3212     env = _BuildInstanceHookEnvByObject(self, self.instance)
3213     nl = [self.cfg.GetMasterNode()]
3214     return env, nl, nl
3215
3216   def CheckPrereq(self):
3217     """Check prerequisites.
3218
3219     This checks that the instance is in the cluster.
3220
3221     """
3222     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3223     assert self.instance is not None, \
3224       "Cannot retrieve locked instance %s" % self.op.instance_name
3225
3226   def Exec(self, feedback_fn):
3227     """Remove the instance.
3228
3229     """
3230     instance = self.instance
3231     logging.info("Shutting down instance %s on node %s",
3232                  instance.name, instance.primary_node)
3233
3234     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3235     msg = result.RemoteFailMsg()
3236     if msg:
3237       if self.op.ignore_failures:
3238         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3239       else:
3240         raise errors.OpExecError("Could not shutdown instance %s on"
3241                                  " node %s: %s" %
3242                                  (instance.name, instance.primary_node, msg))
3243
3244     logging.info("Removing block devices for instance %s", instance.name)
3245
3246     if not _RemoveDisks(self, instance):
3247       if self.op.ignore_failures:
3248         feedback_fn("Warning: can't remove instance's disks")
3249       else:
3250         raise errors.OpExecError("Can't remove instance's disks")
3251
3252     logging.info("Removing instance %s out of cluster config", instance.name)
3253
3254     self.cfg.RemoveInstance(instance.name)
3255     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3256
3257
3258 class LUQueryInstances(NoHooksLU):
3259   """Logical unit for querying instances.
3260
3261   """
3262   _OP_REQP = ["output_fields", "names", "use_locking"]
3263   REQ_BGL = False
3264   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3265                                     "admin_state",
3266                                     "disk_template", "ip", "mac", "bridge",
3267                                     "sda_size", "sdb_size", "vcpus", "tags",
3268                                     "network_port", "beparams",
3269                                     r"(disk)\.(size)/([0-9]+)",
3270                                     r"(disk)\.(sizes)", "disk_usage",
3271                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3272                                     r"(nic)\.(macs|ips|bridges)",
3273                                     r"(disk|nic)\.(count)",
3274                                     "serial_no", "hypervisor", "hvparams",] +
3275                                   ["hv/%s" % name
3276                                    for name in constants.HVS_PARAMETERS] +
3277                                   ["be/%s" % name
3278                                    for name in constants.BES_PARAMETERS])
3279   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3280
3281
3282   def ExpandNames(self):
3283     _CheckOutputFields(static=self._FIELDS_STATIC,
3284                        dynamic=self._FIELDS_DYNAMIC,
3285                        selected=self.op.output_fields)
3286
3287     self.needed_locks = {}
3288     self.share_locks[locking.LEVEL_INSTANCE] = 1
3289     self.share_locks[locking.LEVEL_NODE] = 1
3290
3291     if self.op.names:
3292       self.wanted = _GetWantedInstances(self, self.op.names)
3293     else:
3294       self.wanted = locking.ALL_SET
3295
3296     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3297     self.do_locking = self.do_node_query and self.op.use_locking
3298     if self.do_locking:
3299       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3300       self.needed_locks[locking.LEVEL_NODE] = []
3301       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3302
3303   def DeclareLocks(self, level):
3304     if level == locking.LEVEL_NODE and self.do_locking:
3305       self._LockInstancesNodes()
3306
3307   def CheckPrereq(self):
3308     """Check prerequisites.
3309
3310     """
3311     pass
3312
3313   def Exec(self, feedback_fn):
3314     """Computes the list of nodes and their attributes.
3315
3316     """
3317     all_info = self.cfg.GetAllInstancesInfo()
3318     if self.wanted == locking.ALL_SET:
3319       # caller didn't specify instance names, so ordering is not important
3320       if self.do_locking:
3321         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3322       else:
3323         instance_names = all_info.keys()
3324       instance_names = utils.NiceSort(instance_names)
3325     else:
3326       # caller did specify names, so we must keep the ordering
3327       if self.do_locking:
3328         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3329       else:
3330         tgt_set = all_info.keys()
3331       missing = set(self.wanted).difference(tgt_set)
3332       if missing:
3333         raise errors.OpExecError("Some instances were removed before"
3334                                  " retrieving their data: %s" % missing)
3335       instance_names = self.wanted
3336
3337     instance_list = [all_info[iname] for iname in instance_names]
3338
3339     # begin data gathering
3340
3341     nodes = frozenset([inst.primary_node for inst in instance_list])
3342     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3343
3344     bad_nodes = []
3345     off_nodes = []
3346     if self.do_node_query:
3347       live_data = {}
3348       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3349       for name in nodes:
3350         result = node_data[name]
3351         if result.offline:
3352           # offline nodes will be in both lists
3353           off_nodes.append(name)
3354         if result.failed:
3355           bad_nodes.append(name)
3356         else:
3357           if result.data:
3358             live_data.update(result.data)
3359             # else no instance is alive
3360     else:
3361       live_data = dict([(name, {}) for name in instance_names])
3362
3363     # end data gathering
3364
3365     HVPREFIX = "hv/"
3366     BEPREFIX = "be/"
3367     output = []
3368     for instance in instance_list:
3369       iout = []
3370       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3371       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3372       for field in self.op.output_fields:
3373         st_match = self._FIELDS_STATIC.Matches(field)
3374         if field == "name":
3375           val = instance.name
3376         elif field == "os":
3377           val = instance.os
3378         elif field == "pnode":
3379           val = instance.primary_node
3380         elif field == "snodes":
3381           val = list(instance.secondary_nodes)
3382         elif field == "admin_state":
3383           val = instance.admin_up
3384         elif field == "oper_state":
3385           if instance.primary_node in bad_nodes:
3386             val = None
3387           else:
3388             val = bool(live_data.get(instance.name))
3389         elif field == "status":
3390           if instance.primary_node in off_nodes:
3391             val = "ERROR_nodeoffline"
3392           elif instance.primary_node in bad_nodes:
3393             val = "ERROR_nodedown"
3394           else:
3395             running = bool(live_data.get(instance.name))
3396             if running:
3397               if instance.admin_up:
3398                 val = "running"
3399               else:
3400                 val = "ERROR_up"
3401             else:
3402               if instance.admin_up:
3403                 val = "ERROR_down"
3404               else:
3405                 val = "ADMIN_down"
3406         elif field == "oper_ram":
3407           if instance.primary_node in bad_nodes:
3408             val = None
3409           elif instance.name in live_data:
3410             val = live_data[instance.name].get("memory", "?")
3411           else:
3412             val = "-"
3413         elif field == "disk_template":
3414           val = instance.disk_template
3415         elif field == "ip":
3416           val = instance.nics[0].ip
3417         elif field == "bridge":
3418           val = instance.nics[0].bridge
3419         elif field == "mac":
3420           val = instance.nics[0].mac
3421         elif field == "sda_size" or field == "sdb_size":
3422           idx = ord(field[2]) - ord('a')
3423           try:
3424             val = instance.FindDisk(idx).size
3425           except errors.OpPrereqError:
3426             val = None
3427         elif field == "disk_usage": # total disk usage per node
3428           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3429           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3430         elif field == "tags":
3431           val = list(instance.GetTags())
3432         elif field == "serial_no":
3433           val = instance.serial_no
3434         elif field == "network_port":
3435           val = instance.network_port
3436         elif field == "hypervisor":
3437           val = instance.hypervisor
3438         elif field == "hvparams":
3439           val = i_hv
3440         elif (field.startswith(HVPREFIX) and
3441               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3442           val = i_hv.get(field[len(HVPREFIX):], None)
3443         elif field == "beparams":
3444           val = i_be
3445         elif (field.startswith(BEPREFIX) and
3446               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3447           val = i_be.get(field[len(BEPREFIX):], None)
3448         elif st_match and st_match.groups():
3449           # matches a variable list
3450           st_groups = st_match.groups()
3451           if st_groups and st_groups[0] == "disk":
3452             if st_groups[1] == "count":
3453               val = len(instance.disks)
3454             elif st_groups[1] == "sizes":
3455               val = [disk.size for disk in instance.disks]
3456             elif st_groups[1] == "size":
3457               try:
3458                 val = instance.FindDisk(st_groups[2]).size
3459               except errors.OpPrereqError:
3460                 val = None
3461             else:
3462               assert False, "Unhandled disk parameter"
3463           elif st_groups[0] == "nic":
3464             if st_groups[1] == "count":
3465               val = len(instance.nics)
3466             elif st_groups[1] == "macs":
3467               val = [nic.mac for nic in instance.nics]
3468             elif st_groups[1] == "ips":
3469               val = [nic.ip for nic in instance.nics]
3470             elif st_groups[1] == "bridges":
3471               val = [nic.bridge for nic in instance.nics]
3472             else:
3473               # index-based item
3474               nic_idx = int(st_groups[2])
3475               if nic_idx >= len(instance.nics):
3476                 val = None
3477               else:
3478                 if st_groups[1] == "mac":
3479                   val = instance.nics[nic_idx].mac
3480                 elif st_groups[1] == "ip":
3481                   val = instance.nics[nic_idx].ip
3482                 elif st_groups[1] == "bridge":
3483                   val = instance.nics[nic_idx].bridge
3484                 else:
3485                   assert False, "Unhandled NIC parameter"
3486           else:
3487             assert False, "Unhandled variable parameter"
3488         else:
3489           raise errors.ParameterError(field)
3490         iout.append(val)
3491       output.append(iout)
3492
3493     return output
3494
3495
3496 class LUFailoverInstance(LogicalUnit):
3497   """Failover an instance.
3498
3499   """
3500   HPATH = "instance-failover"
3501   HTYPE = constants.HTYPE_INSTANCE
3502   _OP_REQP = ["instance_name", "ignore_consistency"]
3503   REQ_BGL = False
3504
3505   def ExpandNames(self):
3506     self._ExpandAndLockInstance()
3507     self.needed_locks[locking.LEVEL_NODE] = []
3508     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3509
3510   def DeclareLocks(self, level):
3511     if level == locking.LEVEL_NODE:
3512       self._LockInstancesNodes()
3513
3514   def BuildHooksEnv(self):
3515     """Build hooks env.
3516
3517     This runs on master, primary and secondary nodes of the instance.
3518
3519     """
3520     env = {
3521       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3522       }
3523     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3524     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3525     return env, nl, nl
3526
3527   def CheckPrereq(self):
3528     """Check prerequisites.
3529
3530     This checks that the instance is in the cluster.
3531
3532     """
3533     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3534     assert self.instance is not None, \
3535       "Cannot retrieve locked instance %s" % self.op.instance_name
3536
3537     bep = self.cfg.GetClusterInfo().FillBE(instance)
3538     if instance.disk_template not in constants.DTS_NET_MIRROR:
3539       raise errors.OpPrereqError("Instance's disk layout is not"
3540                                  " network mirrored, cannot failover.")
3541
3542     secondary_nodes = instance.secondary_nodes
3543     if not secondary_nodes:
3544       raise errors.ProgrammerError("no secondary node but using "
3545                                    "a mirrored disk template")
3546
3547     target_node = secondary_nodes[0]
3548     _CheckNodeOnline(self, target_node)
3549     _CheckNodeNotDrained(self, target_node)
3550
3551     if instance.admin_up:
3552       # check memory requirements on the secondary node
3553       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3554                            instance.name, bep[constants.BE_MEMORY],
3555                            instance.hypervisor)
3556     else:
3557       self.LogInfo("Not checking memory on the secondary node as"
3558                    " instance will not be started")
3559
3560     # check bridge existance
3561     brlist = [nic.bridge for nic in instance.nics]
3562     result = self.rpc.call_bridges_exist(target_node, brlist)
3563     result.Raise()
3564     if not result.data:
3565       raise errors.OpPrereqError("One or more target bridges %s does not"
3566                                  " exist on destination node '%s'" %
3567                                  (brlist, target_node))
3568
3569   def Exec(self, feedback_fn):
3570     """Failover an instance.
3571
3572     The failover is done by shutting it down on its present node and
3573     starting it on the secondary.
3574
3575     """
3576     instance = self.instance
3577
3578     source_node = instance.primary_node
3579     target_node = instance.secondary_nodes[0]
3580
3581     feedback_fn("* checking disk consistency between source and target")
3582     for dev in instance.disks:
3583       # for drbd, these are drbd over lvm
3584       if not _CheckDiskConsistency(self, dev, target_node, False):
3585         if instance.admin_up and not self.op.ignore_consistency:
3586           raise errors.OpExecError("Disk %s is degraded on target node,"
3587                                    " aborting failover." % dev.iv_name)
3588
3589     feedback_fn("* shutting down instance on source node")
3590     logging.info("Shutting down instance %s on node %s",
3591                  instance.name, source_node)
3592
3593     result = self.rpc.call_instance_shutdown(source_node, instance)
3594     msg = result.RemoteFailMsg()
3595     if msg:
3596       if self.op.ignore_consistency:
3597         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3598                              " Proceeding anyway. Please make sure node"
3599                              " %s is down. Error details: %s",
3600                              instance.name, source_node, source_node, msg)
3601       else:
3602         raise errors.OpExecError("Could not shutdown instance %s on"
3603                                  " node %s: %s" %
3604                                  (instance.name, source_node, msg))
3605
3606     feedback_fn("* deactivating the instance's disks on source node")
3607     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3608       raise errors.OpExecError("Can't shut down the instance's disks.")
3609
3610     instance.primary_node = target_node
3611     # distribute new instance config to the other nodes
3612     self.cfg.Update(instance)
3613
3614     # Only start the instance if it's marked as up
3615     if instance.admin_up:
3616       feedback_fn("* activating the instance's disks on target node")
3617       logging.info("Starting instance %s on node %s",
3618                    instance.name, target_node)
3619
3620       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3621                                                ignore_secondaries=True)
3622       if not disks_ok:
3623         _ShutdownInstanceDisks(self, instance)
3624         raise errors.OpExecError("Can't activate the instance's disks")
3625
3626       feedback_fn("* starting the instance on the target node")
3627       result = self.rpc.call_instance_start(target_node, instance, None, None)
3628       msg = result.RemoteFailMsg()
3629       if msg:
3630         _ShutdownInstanceDisks(self, instance)
3631         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3632                                  (instance.name, target_node, msg))
3633
3634
3635 class LUMigrateInstance(LogicalUnit):
3636   """Migrate an instance.
3637
3638   This is migration without shutting down, compared to the failover,
3639   which is done with shutdown.
3640
3641   """
3642   HPATH = "instance-migrate"
3643   HTYPE = constants.HTYPE_INSTANCE
3644   _OP_REQP = ["instance_name", "live", "cleanup"]
3645
3646   REQ_BGL = False
3647
3648   def ExpandNames(self):
3649     self._ExpandAndLockInstance()
3650     self.needed_locks[locking.LEVEL_NODE] = []
3651     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3652
3653   def DeclareLocks(self, level):
3654     if level == locking.LEVEL_NODE:
3655       self._LockInstancesNodes()
3656
3657   def BuildHooksEnv(self):
3658     """Build hooks env.
3659
3660     This runs on master, primary and secondary nodes of the instance.
3661
3662     """
3663     env = _BuildInstanceHookEnvByObject(self, self.instance)
3664     env["MIGRATE_LIVE"] = self.op.live
3665     env["MIGRATE_CLEANUP"] = self.op.cleanup
3666     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3667     return env, nl, nl
3668
3669   def CheckPrereq(self):
3670     """Check prerequisites.
3671
3672     This checks that the instance is in the cluster.
3673
3674     """
3675     instance = self.cfg.GetInstanceInfo(
3676       self.cfg.ExpandInstanceName(self.op.instance_name))
3677     if instance is None:
3678       raise errors.OpPrereqError("Instance '%s' not known" %
3679                                  self.op.instance_name)
3680
3681     if instance.disk_template != constants.DT_DRBD8:
3682       raise errors.OpPrereqError("Instance's disk layout is not"
3683                                  " drbd8, cannot migrate.")
3684
3685     secondary_nodes = instance.secondary_nodes
3686     if not secondary_nodes:
3687       raise errors.ConfigurationError("No secondary node but using"
3688                                       " drbd8 disk template")
3689
3690     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3691
3692     target_node = secondary_nodes[0]
3693     # check memory requirements on the secondary node
3694     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3695                          instance.name, i_be[constants.BE_MEMORY],
3696                          instance.hypervisor)
3697
3698     # check bridge existance
3699     brlist = [nic.bridge for nic in instance.nics]
3700     result = self.rpc.call_bridges_exist(target_node, brlist)
3701     if result.failed or not result.data:
3702       raise errors.OpPrereqError("One or more target bridges %s does not"
3703                                  " exist on destination node '%s'" %
3704                                  (brlist, target_node))
3705
3706     if not self.op.cleanup:
3707       _CheckNodeNotDrained(self, target_node)
3708       result = self.rpc.call_instance_migratable(instance.primary_node,
3709                                                  instance)
3710       msg = result.RemoteFailMsg()
3711       if msg:
3712         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3713                                    msg)
3714
3715     self.instance = instance
3716
3717   def _WaitUntilSync(self):
3718     """Poll with custom rpc for disk sync.
3719
3720     This uses our own step-based rpc call.
3721
3722     """
3723     self.feedback_fn("* wait until resync is done")
3724     all_done = False
3725     while not all_done:
3726       all_done = True
3727       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3728                                             self.nodes_ip,
3729                                             self.instance.disks)
3730       min_percent = 100
3731       for node, nres in result.items():
3732         msg = nres.RemoteFailMsg()
3733         if msg:
3734           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3735                                    (node, msg))
3736         node_done, node_percent = nres.payload
3737         all_done = all_done and node_done
3738         if node_percent is not None:
3739           min_percent = min(min_percent, node_percent)
3740       if not all_done:
3741         if min_percent < 100:
3742           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3743         time.sleep(2)
3744
3745   def _EnsureSecondary(self, node):
3746     """Demote a node to secondary.
3747
3748     """
3749     self.feedback_fn("* switching node %s to secondary mode" % node)
3750
3751     for dev in self.instance.disks:
3752       self.cfg.SetDiskID(dev, node)
3753
3754     result = self.rpc.call_blockdev_close(node, self.instance.name,
3755                                           self.instance.disks)
3756     msg = result.RemoteFailMsg()
3757     if msg:
3758       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3759                                " error %s" % (node, msg))
3760
3761   def _GoStandalone(self):
3762     """Disconnect from the network.
3763
3764     """
3765     self.feedback_fn("* changing into standalone mode")
3766     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3767                                                self.instance.disks)
3768     for node, nres in result.items():
3769       msg = nres.RemoteFailMsg()
3770       if msg:
3771         raise errors.OpExecError("Cannot disconnect disks node %s,"
3772                                  " error %s" % (node, msg))
3773
3774   def _GoReconnect(self, multimaster):
3775     """Reconnect to the network.
3776
3777     """
3778     if multimaster:
3779       msg = "dual-master"
3780     else:
3781       msg = "single-master"
3782     self.feedback_fn("* changing disks into %s mode" % msg)
3783     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3784                                            self.instance.disks,
3785                                            self.instance.name, multimaster)
3786     for node, nres in result.items():
3787       msg = nres.RemoteFailMsg()
3788       if msg:
3789         raise errors.OpExecError("Cannot change disks config on node %s,"
3790                                  " error: %s" % (node, msg))
3791
3792   def _ExecCleanup(self):
3793     """Try to cleanup after a failed migration.
3794
3795     The cleanup is done by:
3796       - check that the instance is running only on one node
3797         (and update the config if needed)
3798       - change disks on its secondary node to secondary
3799       - wait until disks are fully synchronized
3800       - disconnect from the network
3801       - change disks into single-master mode
3802       - wait again until disks are fully synchronized
3803
3804     """
3805     instance = self.instance
3806     target_node = self.target_node
3807     source_node = self.source_node
3808
3809     # check running on only one node
3810     self.feedback_fn("* checking where the instance actually runs"
3811                      " (if this hangs, the hypervisor might be in"
3812                      " a bad state)")
3813     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3814     for node, result in ins_l.items():
3815       result.Raise()
3816       if not isinstance(result.data, list):
3817         raise errors.OpExecError("Can't contact node '%s'" % node)
3818
3819     runningon_source = instance.name in ins_l[source_node].data
3820     runningon_target = instance.name in ins_l[target_node].data
3821
3822     if runningon_source and runningon_target:
3823       raise errors.OpExecError("Instance seems to be running on two nodes,"
3824                                " or the hypervisor is confused. You will have"
3825                                " to ensure manually that it runs only on one"
3826                                " and restart this operation.")
3827
3828     if not (runningon_source or runningon_target):
3829       raise errors.OpExecError("Instance does not seem to be running at all."
3830                                " In this case, it's safer to repair by"
3831                                " running 'gnt-instance stop' to ensure disk"
3832                                " shutdown, and then restarting it.")
3833
3834     if runningon_target:
3835       # the migration has actually succeeded, we need to update the config
3836       self.feedback_fn("* instance running on secondary node (%s),"
3837                        " updating config" % target_node)
3838       instance.primary_node = target_node
3839       self.cfg.Update(instance)
3840       demoted_node = source_node
3841     else:
3842       self.feedback_fn("* instance confirmed to be running on its"
3843                        " primary node (%s)" % source_node)
3844       demoted_node = target_node
3845
3846     self._EnsureSecondary(demoted_node)
3847     try:
3848       self._WaitUntilSync()
3849     except errors.OpExecError:
3850       # we ignore here errors, since if the device is standalone, it
3851       # won't be able to sync
3852       pass
3853     self._GoStandalone()
3854     self._GoReconnect(False)
3855     self._WaitUntilSync()
3856
3857     self.feedback_fn("* done")
3858
3859   def _RevertDiskStatus(self):
3860     """Try to revert the disk status after a failed migration.
3861
3862     """
3863     target_node = self.target_node
3864     try:
3865       self._EnsureSecondary(target_node)
3866       self._GoStandalone()
3867       self._GoReconnect(False)
3868       self._WaitUntilSync()
3869     except errors.OpExecError, err:
3870       self.LogWarning("Migration failed and I can't reconnect the"
3871                       " drives: error '%s'\n"
3872                       "Please look and recover the instance status" %
3873                       str(err))
3874
3875   def _AbortMigration(self):
3876     """Call the hypervisor code to abort a started migration.
3877
3878     """
3879     instance = self.instance
3880     target_node = self.target_node
3881     migration_info = self.migration_info
3882
3883     abort_result = self.rpc.call_finalize_migration(target_node,
3884                                                     instance,
3885                                                     migration_info,
3886                                                     False)
3887     abort_msg = abort_result.RemoteFailMsg()
3888     if abort_msg:
3889       logging.error("Aborting migration failed on target node %s: %s" %
3890                     (target_node, abort_msg))
3891       # Don't raise an exception here, as we stil have to try to revert the
3892       # disk status, even if this step failed.
3893
3894   def _ExecMigration(self):
3895     """Migrate an instance.
3896
3897     The migrate is done by:
3898       - change the disks into dual-master mode
3899       - wait until disks are fully synchronized again
3900       - migrate the instance
3901       - change disks on the new secondary node (the old primary) to secondary
3902       - wait until disks are fully synchronized
3903       - change disks into single-master mode
3904
3905     """
3906     instance = self.instance
3907     target_node = self.target_node
3908     source_node = self.source_node
3909
3910     self.feedback_fn("* checking disk consistency between source and target")
3911     for dev in instance.disks:
3912       if not _CheckDiskConsistency(self, dev, target_node, False):
3913         raise errors.OpExecError("Disk %s is degraded or not fully"
3914                                  " synchronized on target node,"
3915                                  " aborting migrate." % dev.iv_name)
3916
3917     # First get the migration information from the remote node
3918     result = self.rpc.call_migration_info(source_node, instance)
3919     msg = result.RemoteFailMsg()
3920     if msg:
3921       log_err = ("Failed fetching source migration information from %s: %s" %
3922                  (source_node, msg))
3923       logging.error(log_err)
3924       raise errors.OpExecError(log_err)
3925
3926     self.migration_info = migration_info = result.payload
3927
3928     # Then switch the disks to master/master mode
3929     self._EnsureSecondary(target_node)
3930     self._GoStandalone()
3931     self._GoReconnect(True)
3932     self._WaitUntilSync()
3933
3934     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3935     result = self.rpc.call_accept_instance(target_node,
3936                                            instance,
3937                                            migration_info,
3938                                            self.nodes_ip[target_node])
3939
3940     msg = result.RemoteFailMsg()
3941     if msg:
3942       logging.error("Instance pre-migration failed, trying to revert"
3943                     " disk status: %s", msg)
3944       self._AbortMigration()
3945       self._RevertDiskStatus()
3946       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3947                                (instance.name, msg))
3948
3949     self.feedback_fn("* migrating instance to %s" % target_node)
3950     time.sleep(10)
3951     result = self.rpc.call_instance_migrate(source_node, instance,
3952                                             self.nodes_ip[target_node],
3953                                             self.op.live)
3954     msg = result.RemoteFailMsg()
3955     if msg:
3956       logging.error("Instance migration failed, trying to revert"
3957                     " disk status: %s", msg)
3958       self._AbortMigration()
3959       self._RevertDiskStatus()
3960       raise errors.OpExecError("Could not migrate instance %s: %s" %
3961                                (instance.name, msg))
3962     time.sleep(10)
3963
3964     instance.primary_node = target_node
3965     # distribute new instance config to the other nodes
3966     self.cfg.Update(instance)
3967
3968     result = self.rpc.call_finalize_migration(target_node,
3969                                               instance,
3970                                               migration_info,
3971                                               True)
3972     msg = result.RemoteFailMsg()
3973     if msg:
3974       logging.error("Instance migration succeeded, but finalization failed:"
3975                     " %s" % msg)
3976       raise errors.OpExecError("Could not finalize instance migration: %s" %
3977                                msg)
3978
3979     self._EnsureSecondary(source_node)
3980     self._WaitUntilSync()
3981     self._GoStandalone()
3982     self._GoReconnect(False)
3983     self._WaitUntilSync()
3984
3985     self.feedback_fn("* done")
3986
3987   def Exec(self, feedback_fn):
3988     """Perform the migration.
3989
3990     """
3991     self.feedback_fn = feedback_fn
3992
3993     self.source_node = self.instance.primary_node
3994     self.target_node = self.instance.secondary_nodes[0]
3995     self.all_nodes = [self.source_node, self.target_node]
3996     self.nodes_ip = {
3997       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3998       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3999       }
4000     if self.op.cleanup:
4001       return self._ExecCleanup()
4002     else:
4003       return self._ExecMigration()
4004
4005
4006 def _CreateBlockDev(lu, node, instance, device, force_create,
4007                     info, force_open):
4008   """Create a tree of block devices on a given node.
4009
4010   If this device type has to be created on secondaries, create it and
4011   all its children.
4012
4013   If not, just recurse to children keeping the same 'force' value.
4014
4015   @param lu: the lu on whose behalf we execute
4016   @param node: the node on which to create the device
4017   @type instance: L{objects.Instance}
4018   @param instance: the instance which owns the device
4019   @type device: L{objects.Disk}
4020   @param device: the device to create
4021   @type force_create: boolean
4022   @param force_create: whether to force creation of this device; this
4023       will be change to True whenever we find a device which has
4024       CreateOnSecondary() attribute
4025   @param info: the extra 'metadata' we should attach to the device
4026       (this will be represented as a LVM tag)
4027   @type force_open: boolean
4028   @param force_open: this parameter will be passes to the
4029       L{backend.BlockdevCreate} function where it specifies
4030       whether we run on primary or not, and it affects both
4031       the child assembly and the device own Open() execution
4032
4033   """
4034   if device.CreateOnSecondary():
4035     force_create = True
4036
4037   if device.children:
4038     for child in device.children:
4039       _CreateBlockDev(lu, node, instance, child, force_create,
4040                       info, force_open)
4041
4042   if not force_create:
4043     return
4044
4045   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4046
4047
4048 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4049   """Create a single block device on a given node.
4050
4051   This will not recurse over children of the device, so they must be
4052   created in advance.
4053
4054   @param lu: the lu on whose behalf we execute
4055   @param node: the node on which to create the device
4056   @type instance: L{objects.Instance}
4057   @param instance: the instance which owns the device
4058   @type device: L{objects.Disk}
4059   @param device: the device to create
4060   @param info: the extra 'metadata' we should attach to the device
4061       (this will be represented as a LVM tag)
4062   @type force_open: boolean
4063   @param force_open: this parameter will be passes to the
4064       L{backend.BlockdevCreate} function where it specifies
4065       whether we run on primary or not, and it affects both
4066       the child assembly and the device own Open() execution
4067
4068   """
4069   lu.cfg.SetDiskID(device, node)
4070   result = lu.rpc.call_blockdev_create(node, device, device.size,
4071                                        instance.name, force_open, info)
4072   msg = result.RemoteFailMsg()
4073   if msg:
4074     raise errors.OpExecError("Can't create block device %s on"
4075                              " node %s for instance %s: %s" %
4076                              (device, node, instance.name, msg))
4077   if device.physical_id is None:
4078     device.physical_id = result.payload
4079
4080
4081 def _GenerateUniqueNames(lu, exts):
4082   """Generate a suitable LV name.
4083
4084   This will generate a logical volume name for the given instance.
4085
4086   """
4087   results = []
4088   for val in exts:
4089     new_id = lu.cfg.GenerateUniqueID()
4090     results.append("%s%s" % (new_id, val))
4091   return results
4092
4093
4094 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4095                          p_minor, s_minor):
4096   """Generate a drbd8 device complete with its children.
4097
4098   """
4099   port = lu.cfg.AllocatePort()
4100   vgname = lu.cfg.GetVGName()
4101   shared_secret = lu.cfg.GenerateDRBDSecret()
4102   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4103                           logical_id=(vgname, names[0]))
4104   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4105                           logical_id=(vgname, names[1]))
4106   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4107                           logical_id=(primary, secondary, port,
4108                                       p_minor, s_minor,
4109                                       shared_secret),
4110                           children=[dev_data, dev_meta],
4111                           iv_name=iv_name)
4112   return drbd_dev
4113
4114
4115 def _GenerateDiskTemplate(lu, template_name,
4116                           instance_name, primary_node,
4117                           secondary_nodes, disk_info,
4118                           file_storage_dir, file_driver,
4119                           base_index):
4120   """Generate the entire disk layout for a given template type.
4121
4122   """
4123   #TODO: compute space requirements
4124
4125   vgname = lu.cfg.GetVGName()
4126   disk_count = len(disk_info)
4127   disks = []
4128   if template_name == constants.DT_DISKLESS:
4129     pass
4130   elif template_name == constants.DT_PLAIN:
4131     if len(secondary_nodes) != 0:
4132       raise errors.ProgrammerError("Wrong template configuration")
4133
4134     names = _GenerateUniqueNames(lu, [".disk%d" % i
4135                                       for i in range(disk_count)])
4136     for idx, disk in enumerate(disk_info):
4137       disk_index = idx + base_index
4138       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4139                               logical_id=(vgname, names[idx]),
4140                               iv_name="disk/%d" % disk_index,
4141                               mode=disk["mode"])
4142       disks.append(disk_dev)
4143   elif template_name == constants.DT_DRBD8:
4144     if len(secondary_nodes) != 1:
4145       raise errors.ProgrammerError("Wrong template configuration")
4146     remote_node = secondary_nodes[0]
4147     minors = lu.cfg.AllocateDRBDMinor(
4148       [primary_node, remote_node] * len(disk_info), instance_name)
4149
4150     names = []
4151     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4152                                                for i in range(disk_count)]):
4153       names.append(lv_prefix + "_data")
4154       names.append(lv_prefix + "_meta")
4155     for idx, disk in enumerate(disk_info):
4156       disk_index = idx + base_index
4157       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4158                                       disk["size"], names[idx*2:idx*2+2],
4159                                       "disk/%d" % disk_index,
4160                                       minors[idx*2], minors[idx*2+1])
4161       disk_dev.mode = disk["mode"]
4162       disks.append(disk_dev)
4163   elif template_name == constants.DT_FILE:
4164     if len(secondary_nodes) != 0:
4165       raise errors.ProgrammerError("Wrong template configuration")
4166
4167     for idx, disk in enumerate(disk_info):
4168       disk_index = idx + base_index
4169       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4170                               iv_name="disk/%d" % disk_index,
4171                               logical_id=(file_driver,
4172                                           "%s/disk%d" % (file_storage_dir,
4173                                                          disk_index)),
4174                               mode=disk["mode"])
4175       disks.append(disk_dev)
4176   else:
4177     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4178   return disks
4179
4180
4181 def _GetInstanceInfoText(instance):
4182   """Compute that text that should be added to the disk's metadata.
4183
4184   """
4185   return "originstname+%s" % instance.name
4186
4187
4188 def _CreateDisks(lu, instance):
4189   """Create all disks for an instance.
4190
4191   This abstracts away some work from AddInstance.
4192
4193   @type lu: L{LogicalUnit}
4194   @param lu: the logical unit on whose behalf we execute
4195   @type instance: L{objects.Instance}
4196   @param instance: the instance whose disks we should create
4197   @rtype: boolean
4198   @return: the success of the creation
4199
4200   """
4201   info = _GetInstanceInfoText(instance)
4202   pnode = instance.primary_node
4203
4204   if instance.disk_template == constants.DT_FILE:
4205     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4206     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4207
4208     if result.failed or not result.data:
4209       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4210
4211     if not result.data[0]:
4212       raise errors.OpExecError("Failed to create directory '%s'" %
4213                                file_storage_dir)
4214
4215   # Note: this needs to be kept in sync with adding of disks in
4216   # LUSetInstanceParams
4217   for device in instance.disks:
4218     logging.info("Creating volume %s for instance %s",
4219                  device.iv_name, instance.name)
4220     #HARDCODE
4221     for node in instance.all_nodes:
4222       f_create = node == pnode
4223       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4224
4225
4226 def _RemoveDisks(lu, instance):
4227   """Remove all disks for an instance.
4228
4229   This abstracts away some work from `AddInstance()` and
4230   `RemoveInstance()`. Note that in case some of the devices couldn't
4231   be removed, the removal will continue with the other ones (compare
4232   with `_CreateDisks()`).
4233
4234   @type lu: L{LogicalUnit}
4235   @param lu: the logical unit on whose behalf we execute
4236   @type instance: L{objects.Instance}
4237   @param instance: the instance whose disks we should remove
4238   @rtype: boolean
4239   @return: the success of the removal
4240
4241   """
4242   logging.info("Removing block devices for instance %s", instance.name)
4243
4244   all_result = True
4245   for device in instance.disks:
4246     for node, disk in device.ComputeNodeTree(instance.primary_node):
4247       lu.cfg.SetDiskID(disk, node)
4248       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4249       if msg:
4250         lu.LogWarning("Could not remove block device %s on node %s,"
4251                       " continuing anyway: %s", device.iv_name, node, msg)
4252         all_result = False
4253
4254   if instance.disk_template == constants.DT_FILE:
4255     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4256     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4257                                                  file_storage_dir)
4258     if result.failed or not result.data:
4259       logging.error("Could not remove directory '%s'", file_storage_dir)
4260       all_result = False
4261
4262   return all_result
4263
4264
4265 def _ComputeDiskSize(disk_template, disks):
4266   """Compute disk size requirements in the volume group
4267
4268   """
4269   # Required free disk space as a function of disk and swap space
4270   req_size_dict = {
4271     constants.DT_DISKLESS: None,
4272     constants.DT_PLAIN: sum(d["size"] for d in disks),
4273     # 128 MB are added for drbd metadata for each disk
4274     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4275     constants.DT_FILE: None,
4276   }
4277
4278   if disk_template not in req_size_dict:
4279     raise errors.ProgrammerError("Disk template '%s' size requirement"
4280                                  " is unknown" %  disk_template)
4281
4282   return req_size_dict[disk_template]
4283
4284
4285 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4286   """Hypervisor parameter validation.
4287
4288   This function abstract the hypervisor parameter validation to be
4289   used in both instance create and instance modify.
4290
4291   @type lu: L{LogicalUnit}
4292   @param lu: the logical unit for which we check
4293   @type nodenames: list
4294   @param nodenames: the list of nodes on which we should check
4295   @type hvname: string
4296   @param hvname: the name of the hypervisor we should use
4297   @type hvparams: dict
4298   @param hvparams: the parameters which we need to check
4299   @raise errors.OpPrereqError: if the parameters are not valid
4300
4301   """
4302   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4303                                                   hvname,
4304                                                   hvparams)
4305   for node in nodenames:
4306     info = hvinfo[node]
4307     if info.offline:
4308       continue
4309     msg = info.RemoteFailMsg()
4310     if msg:
4311       raise errors.OpPrereqError("Hypervisor parameter validation"
4312                                  " failed on node %s: %s" % (node, msg))
4313
4314
4315 class LUCreateInstance(LogicalUnit):
4316   """Create an instance.
4317
4318   """
4319   HPATH = "instance-add"
4320   HTYPE = constants.HTYPE_INSTANCE
4321   _OP_REQP = ["instance_name", "disks", "disk_template",
4322               "mode", "start",
4323               "wait_for_sync", "ip_check", "nics",
4324               "hvparams", "beparams"]
4325   REQ_BGL = False
4326
4327   def _ExpandNode(self, node):
4328     """Expands and checks one node name.
4329
4330     """
4331     node_full = self.cfg.ExpandNodeName(node)
4332     if node_full is None:
4333       raise errors.OpPrereqError("Unknown node %s" % node)
4334     return node_full
4335
4336   def ExpandNames(self):
4337     """ExpandNames for CreateInstance.
4338
4339     Figure out the right locks for instance creation.
4340
4341     """
4342     self.needed_locks = {}
4343
4344     # set optional parameters to none if they don't exist
4345     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4346       if not hasattr(self.op, attr):
4347         setattr(self.op, attr, None)
4348
4349     # cheap checks, mostly valid constants given
4350
4351     # verify creation mode
4352     if self.op.mode not in (constants.INSTANCE_CREATE,
4353                             constants.INSTANCE_IMPORT):
4354       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4355                                  self.op.mode)
4356
4357     # disk template and mirror node verification
4358     if self.op.disk_template not in constants.DISK_TEMPLATES:
4359       raise errors.OpPrereqError("Invalid disk template name")
4360
4361     if self.op.hypervisor is None:
4362       self.op.hypervisor = self.cfg.GetHypervisorType()
4363
4364     cluster = self.cfg.GetClusterInfo()
4365     enabled_hvs = cluster.enabled_hypervisors
4366     if self.op.hypervisor not in enabled_hvs:
4367       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4368                                  " cluster (%s)" % (self.op.hypervisor,
4369                                   ",".join(enabled_hvs)))
4370
4371     # check hypervisor parameter syntax (locally)
4372     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4373     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4374                                   self.op.hvparams)
4375     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4376     hv_type.CheckParameterSyntax(filled_hvp)
4377     self.hv_full = filled_hvp
4378
4379     # fill and remember the beparams dict
4380     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4381     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4382                                     self.op.beparams)
4383
4384     #### instance parameters check
4385
4386     # instance name verification
4387     hostname1 = utils.HostInfo(self.op.instance_name)
4388     self.op.instance_name = instance_name = hostname1.name
4389
4390     # this is just a preventive check, but someone might still add this
4391     # instance in the meantime, and creation will fail at lock-add time
4392     if instance_name in self.cfg.GetInstanceList():
4393       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4394                                  instance_name)
4395
4396     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4397
4398     # NIC buildup
4399     self.nics = []
4400     for nic in self.op.nics:
4401       # ip validity checks
4402       ip = nic.get("ip", None)
4403       if ip is None or ip.lower() == "none":
4404         nic_ip = None
4405       elif ip.lower() == constants.VALUE_AUTO:
4406         nic_ip = hostname1.ip
4407       else:
4408         if not utils.IsValidIP(ip):
4409           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4410                                      " like a valid IP" % ip)
4411         nic_ip = ip
4412
4413       # MAC address verification
4414       mac = nic.get("mac", constants.VALUE_AUTO)
4415       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4416         if not utils.IsValidMac(mac.lower()):
4417           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4418                                      mac)
4419       # bridge verification
4420       bridge = nic.get("bridge", None)
4421       if bridge is None:
4422         bridge = self.cfg.GetDefBridge()
4423       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4424
4425     # disk checks/pre-build
4426     self.disks = []
4427     for disk in self.op.disks:
4428       mode = disk.get("mode", constants.DISK_RDWR)
4429       if mode not in constants.DISK_ACCESS_SET:
4430         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4431                                    mode)
4432       size = disk.get("size", None)
4433       if size is None:
4434         raise errors.OpPrereqError("Missing disk size")
4435       try:
4436         size = int(size)
4437       except ValueError:
4438         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4439       self.disks.append({"size": size, "mode": mode})
4440
4441     # used in CheckPrereq for ip ping check
4442     self.check_ip = hostname1.ip
4443
4444     # file storage checks
4445     if (self.op.file_driver and
4446         not self.op.file_driver in constants.FILE_DRIVER):
4447       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4448                                  self.op.file_driver)
4449
4450     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4451       raise errors.OpPrereqError("File storage directory path not absolute")
4452
4453     ### Node/iallocator related checks
4454     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4455       raise errors.OpPrereqError("One and only one of iallocator and primary"
4456                                  " node must be given")
4457
4458     if self.op.iallocator:
4459       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4460     else:
4461       self.op.pnode = self._ExpandNode(self.op.pnode)
4462       nodelist = [self.op.pnode]
4463       if self.op.snode is not None:
4464         self.op.snode = self._ExpandNode(self.op.snode)
4465         nodelist.append(self.op.snode)
4466       self.needed_locks[locking.LEVEL_NODE] = nodelist
4467
4468     # in case of import lock the source node too
4469     if self.op.mode == constants.INSTANCE_IMPORT:
4470       src_node = getattr(self.op, "src_node", None)
4471       src_path = getattr(self.op, "src_path", None)
4472
4473       if src_path is None:
4474         self.op.src_path = src_path = self.op.instance_name
4475
4476       if src_node is None:
4477         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4478         self.op.src_node = None
4479         if os.path.isabs(src_path):
4480           raise errors.OpPrereqError("Importing an instance from an absolute"
4481                                      " path requires a source node option.")
4482       else:
4483         self.op.src_node = src_node = self._ExpandNode(src_node)
4484         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4485           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4486         if not os.path.isabs(src_path):
4487           self.op.src_path = src_path = \
4488             os.path.join(constants.EXPORT_DIR, src_path)
4489
4490     else: # INSTANCE_CREATE
4491       if getattr(self.op, "os_type", None) is None:
4492         raise errors.OpPrereqError("No guest OS specified")
4493
4494   def _RunAllocator(self):
4495     """Run the allocator based on input opcode.
4496
4497     """
4498     nics = [n.ToDict() for n in self.nics]
4499     ial = IAllocator(self,
4500                      mode=constants.IALLOCATOR_MODE_ALLOC,
4501                      name=self.op.instance_name,
4502                      disk_template=self.op.disk_template,
4503                      tags=[],
4504                      os=self.op.os_type,
4505                      vcpus=self.be_full[constants.BE_VCPUS],
4506                      mem_size=self.be_full[constants.BE_MEMORY],
4507                      disks=self.disks,
4508                      nics=nics,
4509                      hypervisor=self.op.hypervisor,
4510                      )
4511
4512     ial.Run(self.op.iallocator)
4513
4514     if not ial.success:
4515       raise errors.OpPrereqError("Can't compute nodes using"
4516                                  " iallocator '%s': %s" % (self.op.iallocator,
4517                                                            ial.info))
4518     if len(ial.nodes) != ial.required_nodes:
4519       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4520                                  " of nodes (%s), required %s" %
4521                                  (self.op.iallocator, len(ial.nodes),
4522                                   ial.required_nodes))
4523     self.op.pnode = ial.nodes[0]
4524     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4525                  self.op.instance_name, self.op.iallocator,
4526                  ", ".join(ial.nodes))
4527     if ial.required_nodes == 2:
4528       self.op.snode = ial.nodes[1]
4529
4530   def BuildHooksEnv(self):
4531     """Build hooks env.
4532
4533     This runs on master, primary and secondary nodes of the instance.
4534
4535     """
4536     env = {
4537       "ADD_MODE": self.op.mode,
4538       }
4539     if self.op.mode == constants.INSTANCE_IMPORT:
4540       env["SRC_NODE"] = self.op.src_node
4541       env["SRC_PATH"] = self.op.src_path
4542       env["SRC_IMAGES"] = self.src_images
4543
4544     env.update(_BuildInstanceHookEnv(
4545       name=self.op.instance_name,
4546       primary_node=self.op.pnode,
4547       secondary_nodes=self.secondaries,
4548       status=self.op.start,
4549       os_type=self.op.os_type,
4550       memory=self.be_full[constants.BE_MEMORY],
4551       vcpus=self.be_full[constants.BE_VCPUS],
4552       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4553       disk_template=self.op.disk_template,
4554       disks=[(d["size"], d["mode"]) for d in self.disks],
4555       bep=self.be_full,
4556       hvp=self.hv_full,
4557       hypervisor=self.op.hypervisor,
4558     ))
4559
4560     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4561           self.secondaries)
4562     return env, nl, nl
4563
4564
4565   def CheckPrereq(self):
4566     """Check prerequisites.
4567
4568     """
4569     if (not self.cfg.GetVGName() and
4570         self.op.disk_template not in constants.DTS_NOT_LVM):
4571       raise errors.OpPrereqError("Cluster does not support lvm-based"
4572                                  " instances")
4573
4574     if self.op.mode == constants.INSTANCE_IMPORT:
4575       src_node = self.op.src_node
4576       src_path = self.op.src_path
4577
4578       if src_node is None:
4579         exp_list = self.rpc.call_export_list(
4580           self.acquired_locks[locking.LEVEL_NODE])
4581         found = False
4582         for node in exp_list:
4583           if not exp_list[node].failed and src_path in exp_list[node].data:
4584             found = True
4585             self.op.src_node = src_node = node
4586             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4587                                                        src_path)
4588             break
4589         if not found:
4590           raise errors.OpPrereqError("No export found for relative path %s" %
4591                                       src_path)
4592
4593       _CheckNodeOnline(self, src_node)
4594       result = self.rpc.call_export_info(src_node, src_path)
4595       result.Raise()
4596       if not result.data:
4597         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4598
4599       export_info = result.data
4600       if not export_info.has_section(constants.INISECT_EXP):
4601         raise errors.ProgrammerError("Corrupted export config")
4602
4603       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4604       if (int(ei_version) != constants.EXPORT_VERSION):
4605         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4606                                    (ei_version, constants.EXPORT_VERSION))
4607
4608       # Check that the new instance doesn't have less disks than the export
4609       instance_disks = len(self.disks)
4610       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4611       if instance_disks < export_disks:
4612         raise errors.OpPrereqError("Not enough disks to import."
4613                                    " (instance: %d, export: %d)" %
4614                                    (instance_disks, export_disks))
4615
4616       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4617       disk_images = []
4618       for idx in range(export_disks):
4619         option = 'disk%d_dump' % idx
4620         if export_info.has_option(constants.INISECT_INS, option):
4621           # FIXME: are the old os-es, disk sizes, etc. useful?
4622           export_name = export_info.get(constants.INISECT_INS, option)
4623           image = os.path.join(src_path, export_name)
4624           disk_images.append(image)
4625         else:
4626           disk_images.append(False)
4627
4628       self.src_images = disk_images
4629
4630       old_name = export_info.get(constants.INISECT_INS, 'name')
4631       # FIXME: int() here could throw a ValueError on broken exports
4632       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4633       if self.op.instance_name == old_name:
4634         for idx, nic in enumerate(self.nics):
4635           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4636             nic_mac_ini = 'nic%d_mac' % idx
4637             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4638
4639     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4640     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4641     if self.op.start and not self.op.ip_check:
4642       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4643                                  " adding an instance in start mode")
4644
4645     if self.op.ip_check:
4646       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4647         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4648                                    (self.check_ip, self.op.instance_name))
4649
4650     #### mac address generation
4651     # By generating here the mac address both the allocator and the hooks get
4652     # the real final mac address rather than the 'auto' or 'generate' value.
4653     # There is a race condition between the generation and the instance object
4654     # creation, which means that we know the mac is valid now, but we're not
4655     # sure it will be when we actually add the instance. If things go bad
4656     # adding the instance will abort because of a duplicate mac, and the
4657     # creation job will fail.
4658     for nic in self.nics:
4659       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4660         nic.mac = self.cfg.GenerateMAC()
4661
4662     #### allocator run
4663
4664     if self.op.iallocator is not None:
4665       self._RunAllocator()
4666
4667     #### node related checks
4668
4669     # check primary node
4670     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4671     assert self.pnode is not None, \
4672       "Cannot retrieve locked node %s" % self.op.pnode
4673     if pnode.offline:
4674       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4675                                  pnode.name)
4676     if pnode.drained:
4677       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4678                                  pnode.name)
4679
4680     self.secondaries = []
4681
4682     # mirror node verification
4683     if self.op.disk_template in constants.DTS_NET_MIRROR:
4684       if self.op.snode is None:
4685         raise errors.OpPrereqError("The networked disk templates need"
4686                                    " a mirror node")
4687       if self.op.snode == pnode.name:
4688         raise errors.OpPrereqError("The secondary node cannot be"
4689                                    " the primary node.")
4690       _CheckNodeOnline(self, self.op.snode)
4691       _CheckNodeNotDrained(self, self.op.snode)
4692       self.secondaries.append(self.op.snode)
4693
4694     nodenames = [pnode.name] + self.secondaries
4695
4696     req_size = _ComputeDiskSize(self.op.disk_template,
4697                                 self.disks)
4698
4699     # Check lv size requirements
4700     if req_size is not None:
4701       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4702                                          self.op.hypervisor)
4703       for node in nodenames:
4704         info = nodeinfo[node]
4705         info.Raise()
4706         info = info.data
4707         if not info:
4708           raise errors.OpPrereqError("Cannot get current information"
4709                                      " from node '%s'" % node)
4710         vg_free = info.get('vg_free', None)
4711         if not isinstance(vg_free, int):
4712           raise errors.OpPrereqError("Can't compute free disk space on"
4713                                      " node %s" % node)
4714         if req_size > info['vg_free']:
4715           raise errors.OpPrereqError("Not enough disk space on target node %s."
4716                                      " %d MB available, %d MB required" %
4717                                      (node, info['vg_free'], req_size))
4718
4719     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4720
4721     # os verification
4722     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4723     result.Raise()
4724     if not isinstance(result.data, objects.OS):
4725       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4726                                  " primary node"  % self.op.os_type)
4727
4728     # bridge check on primary node
4729     bridges = [n.bridge for n in self.nics]
4730     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4731     result.Raise()
4732     if not result.data:
4733       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4734                                  " exist on destination node '%s'" %
4735                                  (",".join(bridges), pnode.name))
4736
4737     # memory check on primary node
4738     if self.op.start:
4739       _CheckNodeFreeMemory(self, self.pnode.name,
4740                            "creating instance %s" % self.op.instance_name,
4741                            self.be_full[constants.BE_MEMORY],
4742                            self.op.hypervisor)
4743
4744   def Exec(self, feedback_fn):
4745     """Create and add the instance to the cluster.
4746
4747     """
4748     instance = self.op.instance_name
4749     pnode_name = self.pnode.name
4750
4751     ht_kind = self.op.hypervisor
4752     if ht_kind in constants.HTS_REQ_PORT:
4753       network_port = self.cfg.AllocatePort()
4754     else:
4755       network_port = None
4756
4757     ##if self.op.vnc_bind_address is None:
4758     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4759
4760     # this is needed because os.path.join does not accept None arguments
4761     if self.op.file_storage_dir is None:
4762       string_file_storage_dir = ""
4763     else:
4764       string_file_storage_dir = self.op.file_storage_dir
4765
4766     # build the full file storage dir path
4767     file_storage_dir = os.path.normpath(os.path.join(
4768                                         self.cfg.GetFileStorageDir(),
4769                                         string_file_storage_dir, instance))
4770
4771
4772     disks = _GenerateDiskTemplate(self,
4773                                   self.op.disk_template,
4774                                   instance, pnode_name,
4775                                   self.secondaries,
4776                                   self.disks,
4777                                   file_storage_dir,
4778                                   self.op.file_driver,
4779                                   0)
4780
4781     iobj = objects.Instance(name=instance, os=self.op.os_type,
4782                             primary_node=pnode_name,
4783                             nics=self.nics, disks=disks,
4784                             disk_template=self.op.disk_template,
4785                             admin_up=False,
4786                             network_port=network_port,
4787                             beparams=self.op.beparams,
4788                             hvparams=self.op.hvparams,
4789                             hypervisor=self.op.hypervisor,
4790                             )
4791
4792     feedback_fn("* creating instance disks...")
4793     try:
4794       _CreateDisks(self, iobj)
4795     except errors.OpExecError:
4796       self.LogWarning("Device creation failed, reverting...")
4797       try:
4798         _RemoveDisks(self, iobj)
4799       finally:
4800         self.cfg.ReleaseDRBDMinors(instance)
4801         raise
4802
4803     feedback_fn("adding instance %s to cluster config" % instance)
4804
4805     self.cfg.AddInstance(iobj)
4806     # Declare that we don't want to remove the instance lock anymore, as we've
4807     # added the instance to the config
4808     del self.remove_locks[locking.LEVEL_INSTANCE]
4809     # Unlock all the nodes
4810     if self.op.mode == constants.INSTANCE_IMPORT:
4811       nodes_keep = [self.op.src_node]
4812       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4813                        if node != self.op.src_node]
4814       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4815       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4816     else:
4817       self.context.glm.release(locking.LEVEL_NODE)
4818       del self.acquired_locks[locking.LEVEL_NODE]
4819
4820     if self.op.wait_for_sync:
4821       disk_abort = not _WaitForSync(self, iobj)
4822     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4823       # make sure the disks are not degraded (still sync-ing is ok)
4824       time.sleep(15)
4825       feedback_fn("* checking mirrors status")
4826       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4827     else:
4828       disk_abort = False
4829
4830     if disk_abort:
4831       _RemoveDisks(self, iobj)
4832       self.cfg.RemoveInstance(iobj.name)
4833       # Make sure the instance lock gets removed
4834       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4835       raise errors.OpExecError("There are some degraded disks for"
4836                                " this instance")
4837
4838     feedback_fn("creating os for instance %s on node %s" %
4839                 (instance, pnode_name))
4840
4841     if iobj.disk_template != constants.DT_DISKLESS:
4842       if self.op.mode == constants.INSTANCE_CREATE:
4843         feedback_fn("* running the instance OS create scripts...")
4844         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4845         msg = result.RemoteFailMsg()
4846         if msg:
4847           raise errors.OpExecError("Could not add os for instance %s"
4848                                    " on node %s: %s" %
4849                                    (instance, pnode_name, msg))
4850
4851       elif self.op.mode == constants.INSTANCE_IMPORT:
4852         feedback_fn("* running the instance OS import scripts...")
4853         src_node = self.op.src_node
4854         src_images = self.src_images
4855         cluster_name = self.cfg.GetClusterName()
4856         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4857                                                          src_node, src_images,
4858                                                          cluster_name)
4859         import_result.Raise()
4860         for idx, result in enumerate(import_result.data):
4861           if not result:
4862             self.LogWarning("Could not import the image %s for instance"
4863                             " %s, disk %d, on node %s" %
4864                             (src_images[idx], instance, idx, pnode_name))
4865       else:
4866         # also checked in the prereq part
4867         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4868                                      % self.op.mode)
4869
4870     if self.op.start:
4871       iobj.admin_up = True
4872       self.cfg.Update(iobj)
4873       logging.info("Starting instance %s on node %s", instance, pnode_name)
4874       feedback_fn("* starting instance...")
4875       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4876       msg = result.RemoteFailMsg()
4877       if msg:
4878         raise errors.OpExecError("Could not start instance: %s" % msg)
4879
4880
4881 class LUConnectConsole(NoHooksLU):
4882   """Connect to an instance's console.
4883
4884   This is somewhat special in that it returns the command line that
4885   you need to run on the master node in order to connect to the
4886   console.
4887
4888   """
4889   _OP_REQP = ["instance_name"]
4890   REQ_BGL = False
4891
4892   def ExpandNames(self):
4893     self._ExpandAndLockInstance()
4894
4895   def CheckPrereq(self):
4896     """Check prerequisites.
4897
4898     This checks that the instance is in the cluster.
4899
4900     """
4901     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4902     assert self.instance is not None, \
4903       "Cannot retrieve locked instance %s" % self.op.instance_name
4904     _CheckNodeOnline(self, self.instance.primary_node)
4905
4906   def Exec(self, feedback_fn):
4907     """Connect to the console of an instance
4908
4909     """
4910     instance = self.instance
4911     node = instance.primary_node
4912
4913     node_insts = self.rpc.call_instance_list([node],
4914                                              [instance.hypervisor])[node]
4915     node_insts.Raise()
4916
4917     if instance.name not in node_insts.data:
4918       raise errors.OpExecError("Instance %s is not running." % instance.name)
4919
4920     logging.debug("Connecting to console of %s on %s", instance.name, node)
4921
4922     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4923     cluster = self.cfg.GetClusterInfo()
4924     # beparams and hvparams are passed separately, to avoid editing the
4925     # instance and then saving the defaults in the instance itself.
4926     hvparams = cluster.FillHV(instance)
4927     beparams = cluster.FillBE(instance)
4928     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4929
4930     # build ssh cmdline
4931     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4932
4933
4934 class LUReplaceDisks(LogicalUnit):
4935   """Replace the disks of an instance.
4936
4937   """
4938   HPATH = "mirrors-replace"
4939   HTYPE = constants.HTYPE_INSTANCE
4940   _OP_REQP = ["instance_name", "mode", "disks"]
4941   REQ_BGL = False
4942
4943   def CheckArguments(self):
4944     if not hasattr(self.op, "remote_node"):
4945       self.op.remote_node = None
4946     if not hasattr(self.op, "iallocator"):
4947       self.op.iallocator = None
4948
4949     # check for valid parameter combination
4950     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4951     if self.op.mode == constants.REPLACE_DISK_CHG:
4952       if cnt == 2:
4953         raise errors.OpPrereqError("When changing the secondary either an"
4954                                    " iallocator script must be used or the"
4955                                    " new node given")
4956       elif cnt == 0:
4957         raise errors.OpPrereqError("Give either the iallocator or the new"
4958                                    " secondary, not both")
4959     else: # not replacing the secondary
4960       if cnt != 2:
4961         raise errors.OpPrereqError("The iallocator and new node options can"
4962                                    " be used only when changing the"
4963                                    " secondary node")
4964
4965   def ExpandNames(self):
4966     self._ExpandAndLockInstance()
4967
4968     if self.op.iallocator is not None:
4969       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4970     elif self.op.remote_node is not None:
4971       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4972       if remote_node is None:
4973         raise errors.OpPrereqError("Node '%s' not known" %
4974                                    self.op.remote_node)
4975       self.op.remote_node = remote_node
4976       # Warning: do not remove the locking of the new secondary here
4977       # unless DRBD8.AddChildren is changed to work in parallel;
4978       # currently it doesn't since parallel invocations of
4979       # FindUnusedMinor will conflict
4980       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4981       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4982     else:
4983       self.needed_locks[locking.LEVEL_NODE] = []
4984       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4985
4986   def DeclareLocks(self, level):
4987     # If we're not already locking all nodes in the set we have to declare the
4988     # instance's primary/secondary nodes.
4989     if (level == locking.LEVEL_NODE and
4990         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4991       self._LockInstancesNodes()
4992
4993   def _RunAllocator(self):
4994     """Compute a new secondary node using an IAllocator.
4995
4996     """
4997     ial = IAllocator(self,
4998                      mode=constants.IALLOCATOR_MODE_RELOC,
4999                      name=self.op.instance_name,
5000                      relocate_from=[self.sec_node])
5001
5002     ial.Run(self.op.iallocator)
5003
5004     if not ial.success:
5005       raise errors.OpPrereqError("Can't compute nodes using"
5006                                  " iallocator '%s': %s" % (self.op.iallocator,
5007                                                            ial.info))
5008     if len(ial.nodes) != ial.required_nodes:
5009       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5010                                  " of nodes (%s), required %s" %
5011                                  (len(ial.nodes), ial.required_nodes))
5012     self.op.remote_node = ial.nodes[0]
5013     self.LogInfo("Selected new secondary for the instance: %s",
5014                  self.op.remote_node)
5015
5016   def BuildHooksEnv(self):
5017     """Build hooks env.
5018
5019     This runs on the master, the primary and all the secondaries.
5020
5021     """
5022     env = {
5023       "MODE": self.op.mode,
5024       "NEW_SECONDARY": self.op.remote_node,
5025       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5026       }
5027     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5028     nl = [
5029       self.cfg.GetMasterNode(),
5030       self.instance.primary_node,
5031       ]
5032     if self.op.remote_node is not None:
5033       nl.append(self.op.remote_node)
5034     return env, nl, nl
5035
5036   def CheckPrereq(self):
5037     """Check prerequisites.
5038
5039     This checks that the instance is in the cluster.
5040
5041     """
5042     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5043     assert instance is not None, \
5044       "Cannot retrieve locked instance %s" % self.op.instance_name
5045     self.instance = instance
5046
5047     if instance.disk_template != constants.DT_DRBD8:
5048       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5049                                  " instances")
5050
5051     if len(instance.secondary_nodes) != 1:
5052       raise errors.OpPrereqError("The instance has a strange layout,"
5053                                  " expected one secondary but found %d" %
5054                                  len(instance.secondary_nodes))
5055
5056     self.sec_node = instance.secondary_nodes[0]
5057
5058     if self.op.iallocator is not None:
5059       self._RunAllocator()
5060
5061     remote_node = self.op.remote_node
5062     if remote_node is not None:
5063       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5064       assert self.remote_node_info is not None, \
5065         "Cannot retrieve locked node %s" % remote_node
5066     else:
5067       self.remote_node_info = None
5068     if remote_node == instance.primary_node:
5069       raise errors.OpPrereqError("The specified node is the primary node of"
5070                                  " the instance.")
5071     elif remote_node == self.sec_node:
5072       raise errors.OpPrereqError("The specified node is already the"
5073                                  " secondary node of the instance.")
5074
5075     if self.op.mode == constants.REPLACE_DISK_PRI:
5076       n1 = self.tgt_node = instance.primary_node
5077       n2 = self.oth_node = self.sec_node
5078     elif self.op.mode == constants.REPLACE_DISK_SEC:
5079       n1 = self.tgt_node = self.sec_node
5080       n2 = self.oth_node = instance.primary_node
5081     elif self.op.mode == constants.REPLACE_DISK_CHG:
5082       n1 = self.new_node = remote_node
5083       n2 = self.oth_node = instance.primary_node
5084       self.tgt_node = self.sec_node
5085       _CheckNodeNotDrained(self, remote_node)
5086     else:
5087       raise errors.ProgrammerError("Unhandled disk replace mode")
5088
5089     _CheckNodeOnline(self, n1)
5090     _CheckNodeOnline(self, n2)
5091
5092     if not self.op.disks:
5093       self.op.disks = range(len(instance.disks))
5094
5095     for disk_idx in self.op.disks:
5096       instance.FindDisk(disk_idx)
5097
5098   def _ExecD8DiskOnly(self, feedback_fn):
5099     """Replace a disk on the primary or secondary for dbrd8.
5100
5101     The algorithm for replace is quite complicated:
5102
5103       1. for each disk to be replaced:
5104
5105         1. create new LVs on the target node with unique names
5106         1. detach old LVs from the drbd device
5107         1. rename old LVs to name_replaced.<time_t>
5108         1. rename new LVs to old LVs
5109         1. attach the new LVs (with the old names now) to the drbd device
5110
5111       1. wait for sync across all devices
5112
5113       1. for each modified disk:
5114
5115         1. remove old LVs (which have the name name_replaces.<time_t>)
5116
5117     Failures are not very well handled.
5118
5119     """
5120     steps_total = 6
5121     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5122     instance = self.instance
5123     iv_names = {}
5124     vgname = self.cfg.GetVGName()
5125     # start of work
5126     cfg = self.cfg
5127     tgt_node = self.tgt_node
5128     oth_node = self.oth_node
5129
5130     # Step: check device activation
5131     self.proc.LogStep(1, steps_total, "check device existence")
5132     info("checking volume groups")
5133     my_vg = cfg.GetVGName()
5134     results = self.rpc.call_vg_list([oth_node, tgt_node])
5135     if not results:
5136       raise errors.OpExecError("Can't list volume groups on the nodes")
5137     for node in oth_node, tgt_node:
5138       res = results[node]
5139       if res.failed or not res.data or my_vg not in res.data:
5140         raise errors.OpExecError("Volume group '%s' not found on %s" %
5141                                  (my_vg, node))
5142     for idx, dev in enumerate(instance.disks):
5143       if idx not in self.op.disks:
5144         continue
5145       for node in tgt_node, oth_node:
5146         info("checking disk/%d on %s" % (idx, node))
5147         cfg.SetDiskID(dev, node)
5148         result = self.rpc.call_blockdev_find(node, dev)
5149         msg = result.RemoteFailMsg()
5150         if not msg and not result.payload:
5151           msg = "disk not found"
5152         if msg:
5153           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5154                                    (idx, node, msg))
5155
5156     # Step: check other node consistency
5157     self.proc.LogStep(2, steps_total, "check peer consistency")
5158     for idx, dev in enumerate(instance.disks):
5159       if idx not in self.op.disks:
5160         continue
5161       info("checking disk/%d consistency on %s" % (idx, oth_node))
5162       if not _CheckDiskConsistency(self, dev, oth_node,
5163                                    oth_node==instance.primary_node):
5164         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5165                                  " to replace disks on this node (%s)" %
5166                                  (oth_node, tgt_node))
5167
5168     # Step: create new storage
5169     self.proc.LogStep(3, steps_total, "allocate new storage")
5170     for idx, dev in enumerate(instance.disks):
5171       if idx not in self.op.disks:
5172         continue
5173       size = dev.size
5174       cfg.SetDiskID(dev, tgt_node)
5175       lv_names = [".disk%d_%s" % (idx, suf)
5176                   for suf in ["data", "meta"]]
5177       names = _GenerateUniqueNames(self, lv_names)
5178       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5179                              logical_id=(vgname, names[0]))
5180       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5181                              logical_id=(vgname, names[1]))
5182       new_lvs = [lv_data, lv_meta]
5183       old_lvs = dev.children
5184       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5185       info("creating new local storage on %s for %s" %
5186            (tgt_node, dev.iv_name))
5187       # we pass force_create=True to force the LVM creation
5188       for new_lv in new_lvs:
5189         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5190                         _GetInstanceInfoText(instance), False)
5191
5192     # Step: for each lv, detach+rename*2+attach
5193     self.proc.LogStep(4, steps_total, "change drbd configuration")
5194     for dev, old_lvs, new_lvs in iv_names.itervalues():
5195       info("detaching %s drbd from local storage" % dev.iv_name)
5196       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5197       result.Raise()
5198       if not result.data:
5199         raise errors.OpExecError("Can't detach drbd from local storage on node"
5200                                  " %s for device %s" % (tgt_node, dev.iv_name))
5201       #dev.children = []
5202       #cfg.Update(instance)
5203
5204       # ok, we created the new LVs, so now we know we have the needed
5205       # storage; as such, we proceed on the target node to rename
5206       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5207       # using the assumption that logical_id == physical_id (which in
5208       # turn is the unique_id on that node)
5209
5210       # FIXME(iustin): use a better name for the replaced LVs
5211       temp_suffix = int(time.time())
5212       ren_fn = lambda d, suff: (d.physical_id[0],
5213                                 d.physical_id[1] + "_replaced-%s" % suff)
5214       # build the rename list based on what LVs exist on the node
5215       rlist = []
5216       for to_ren in old_lvs:
5217         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5218         if not result.RemoteFailMsg() and result.payload:
5219           # device exists
5220           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5221
5222       info("renaming the old LVs on the target node")
5223       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5224       result.Raise()
5225       if not result.data:
5226         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5227       # now we rename the new LVs to the old LVs
5228       info("renaming the new LVs on the target node")
5229       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5230       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5231       result.Raise()
5232       if not result.data:
5233         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5234
5235       for old, new in zip(old_lvs, new_lvs):
5236         new.logical_id = old.logical_id
5237         cfg.SetDiskID(new, tgt_node)
5238
5239       for disk in old_lvs:
5240         disk.logical_id = ren_fn(disk, temp_suffix)
5241         cfg.SetDiskID(disk, tgt_node)
5242
5243       # now that the new lvs have the old name, we can add them to the device
5244       info("adding new mirror component on %s" % tgt_node)
5245       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5246       if result.failed or not result.data:
5247         for new_lv in new_lvs:
5248           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5249           if msg:
5250             warning("Can't rollback device %s: %s", dev, msg,
5251                     hint="cleanup manually the unused logical volumes")
5252         raise errors.OpExecError("Can't add local storage to drbd")
5253
5254       dev.children = new_lvs
5255       cfg.Update(instance)
5256
5257     # Step: wait for sync
5258
5259     # this can fail as the old devices are degraded and _WaitForSync
5260     # does a combined result over all disks, so we don't check its
5261     # return value
5262     self.proc.LogStep(5, steps_total, "sync devices")
5263     _WaitForSync(self, instance, unlock=True)
5264
5265     # so check manually all the devices
5266     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5267       cfg.SetDiskID(dev, instance.primary_node)
5268       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5269       msg = result.RemoteFailMsg()
5270       if not msg and not result.payload:
5271         msg = "disk not found"
5272       if msg:
5273         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5274                                  (name, msg))
5275       if result.payload[5]:
5276         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5277
5278     # Step: remove old storage
5279     self.proc.LogStep(6, steps_total, "removing old storage")
5280     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5281       info("remove logical volumes for %s" % name)
5282       for lv in old_lvs:
5283         cfg.SetDiskID(lv, tgt_node)
5284         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5285         if msg:
5286           warning("Can't remove old LV: %s" % msg,
5287                   hint="manually remove unused LVs")
5288           continue
5289
5290   def _ExecD8Secondary(self, feedback_fn):
5291     """Replace the secondary node for drbd8.
5292
5293     The algorithm for replace is quite complicated:
5294       - for all disks of the instance:
5295         - create new LVs on the new node with same names
5296         - shutdown the drbd device on the old secondary
5297         - disconnect the drbd network on the primary
5298         - create the drbd device on the new secondary
5299         - network attach the drbd on the primary, using an artifice:
5300           the drbd code for Attach() will connect to the network if it
5301           finds a device which is connected to the good local disks but
5302           not network enabled
5303       - wait for sync across all devices
5304       - remove all disks from the old secondary
5305
5306     Failures are not very well handled.
5307
5308     """
5309     steps_total = 6
5310     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5311     instance = self.instance
5312     iv_names = {}
5313     # start of work
5314     cfg = self.cfg
5315     old_node = self.tgt_node
5316     new_node = self.new_node
5317     pri_node = instance.primary_node
5318     nodes_ip = {
5319       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5320       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5321       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5322       }
5323
5324     # Step: check device activation
5325     self.proc.LogStep(1, steps_total, "check device existence")
5326     info("checking volume groups")
5327     my_vg = cfg.GetVGName()
5328     results = self.rpc.call_vg_list([pri_node, new_node])
5329     for node in pri_node, new_node:
5330       res = results[node]
5331       if res.failed or not res.data or my_vg not in res.data:
5332         raise errors.OpExecError("Volume group '%s' not found on %s" %
5333                                  (my_vg, node))
5334     for idx, dev in enumerate(instance.disks):
5335       if idx not in self.op.disks:
5336         continue
5337       info("checking disk/%d on %s" % (idx, pri_node))
5338       cfg.SetDiskID(dev, pri_node)
5339       result = self.rpc.call_blockdev_find(pri_node, dev)
5340       msg = result.RemoteFailMsg()
5341       if not msg and not result.payload:
5342         msg = "disk not found"
5343       if msg:
5344         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5345                                  (idx, pri_node, msg))
5346
5347     # Step: check other node consistency
5348     self.proc.LogStep(2, steps_total, "check peer consistency")
5349     for idx, dev in enumerate(instance.disks):
5350       if idx not in self.op.disks:
5351         continue
5352       info("checking disk/%d consistency on %s" % (idx, pri_node))
5353       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5354         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5355                                  " unsafe to replace the secondary" %
5356                                  pri_node)
5357
5358     # Step: create new storage
5359     self.proc.LogStep(3, steps_total, "allocate new storage")
5360     for idx, dev in enumerate(instance.disks):
5361       info("adding new local storage on %s for disk/%d" %
5362            (new_node, idx))
5363       # we pass force_create=True to force LVM creation
5364       for new_lv in dev.children:
5365         _CreateBlockDev(self, new_node, instance, new_lv, True,
5366                         _GetInstanceInfoText(instance), False)
5367
5368     # Step 4: dbrd minors and drbd setups changes
5369     # after this, we must manually remove the drbd minors on both the
5370     # error and the success paths
5371     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5372                                    instance.name)
5373     logging.debug("Allocated minors %s" % (minors,))
5374     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5375     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5376       size = dev.size
5377       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5378       # create new devices on new_node; note that we create two IDs:
5379       # one without port, so the drbd will be activated without
5380       # networking information on the new node at this stage, and one
5381       # with network, for the latter activation in step 4
5382       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5383       if pri_node == o_node1:
5384         p_minor = o_minor1
5385       else:
5386         p_minor = o_minor2
5387
5388       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5389       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5390
5391       iv_names[idx] = (dev, dev.children, new_net_id)
5392       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5393                     new_net_id)
5394       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5395                               logical_id=new_alone_id,
5396                               children=dev.children)
5397       try:
5398         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5399                               _GetInstanceInfoText(instance), False)
5400       except errors.GenericError:
5401         self.cfg.ReleaseDRBDMinors(instance.name)
5402         raise
5403
5404     for idx, dev in enumerate(instance.disks):
5405       # we have new devices, shutdown the drbd on the old secondary
5406       info("shutting down drbd for disk/%d on old node" % idx)
5407       cfg.SetDiskID(dev, old_node)
5408       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5409       if msg:
5410         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5411                 (idx, msg),
5412                 hint="Please cleanup this device manually as soon as possible")
5413
5414     info("detaching primary drbds from the network (=> standalone)")
5415     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5416                                                instance.disks)[pri_node]
5417
5418     msg = result.RemoteFailMsg()
5419     if msg:
5420       # detaches didn't succeed (unlikely)
5421       self.cfg.ReleaseDRBDMinors(instance.name)
5422       raise errors.OpExecError("Can't detach the disks from the network on"
5423                                " old node: %s" % (msg,))
5424
5425     # if we managed to detach at least one, we update all the disks of
5426     # the instance to point to the new secondary
5427     info("updating instance configuration")
5428     for dev, _, new_logical_id in iv_names.itervalues():
5429       dev.logical_id = new_logical_id
5430       cfg.SetDiskID(dev, pri_node)
5431     cfg.Update(instance)
5432
5433     # and now perform the drbd attach
5434     info("attaching primary drbds to new secondary (standalone => connected)")
5435     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5436                                            instance.disks, instance.name,
5437                                            False)
5438     for to_node, to_result in result.items():
5439       msg = to_result.RemoteFailMsg()
5440       if msg:
5441         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5442                 hint="please do a gnt-instance info to see the"
5443                 " status of disks")
5444
5445     # this can fail as the old devices are degraded and _WaitForSync
5446     # does a combined result over all disks, so we don't check its
5447     # return value
5448     self.proc.LogStep(5, steps_total, "sync devices")
5449     _WaitForSync(self, instance, unlock=True)
5450
5451     # so check manually all the devices
5452     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5453       cfg.SetDiskID(dev, pri_node)
5454       result = self.rpc.call_blockdev_find(pri_node, dev)
5455       msg = result.RemoteFailMsg()
5456       if not msg and not result.payload:
5457         msg = "disk not found"
5458       if msg:
5459         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5460                                  (idx, msg))
5461       if result.payload[5]:
5462         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5463
5464     self.proc.LogStep(6, steps_total, "removing old storage")
5465     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5466       info("remove logical volumes for disk/%d" % idx)
5467       for lv in old_lvs:
5468         cfg.SetDiskID(lv, old_node)
5469         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5470         if msg:
5471           warning("Can't remove LV on old secondary: %s", msg,
5472                   hint="Cleanup stale volumes by hand")
5473
5474   def Exec(self, feedback_fn):
5475     """Execute disk replacement.
5476
5477     This dispatches the disk replacement to the appropriate handler.
5478
5479     """
5480     instance = self.instance
5481
5482     # Activate the instance disks if we're replacing them on a down instance
5483     if not instance.admin_up:
5484       _StartInstanceDisks(self, instance, True)
5485
5486     if self.op.mode == constants.REPLACE_DISK_CHG:
5487       fn = self._ExecD8Secondary
5488     else:
5489       fn = self._ExecD8DiskOnly
5490
5491     ret = fn(feedback_fn)
5492
5493     # Deactivate the instance disks if we're replacing them on a down instance
5494     if not instance.admin_up:
5495       _SafeShutdownInstanceDisks(self, instance)
5496
5497     return ret
5498
5499
5500 class LUGrowDisk(LogicalUnit):
5501   """Grow a disk of an instance.
5502
5503   """
5504   HPATH = "disk-grow"
5505   HTYPE = constants.HTYPE_INSTANCE
5506   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5507   REQ_BGL = False
5508
5509   def ExpandNames(self):
5510     self._ExpandAndLockInstance()
5511     self.needed_locks[locking.LEVEL_NODE] = []
5512     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5513
5514   def DeclareLocks(self, level):
5515     if level == locking.LEVEL_NODE:
5516       self._LockInstancesNodes()
5517
5518   def BuildHooksEnv(self):
5519     """Build hooks env.
5520
5521     This runs on the master, the primary and all the secondaries.
5522
5523     """
5524     env = {
5525       "DISK": self.op.disk,
5526       "AMOUNT": self.op.amount,
5527       }
5528     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5529     nl = [
5530       self.cfg.GetMasterNode(),
5531       self.instance.primary_node,
5532       ]
5533     return env, nl, nl
5534
5535   def CheckPrereq(self):
5536     """Check prerequisites.
5537
5538     This checks that the instance is in the cluster.
5539
5540     """
5541     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5542     assert instance is not None, \
5543       "Cannot retrieve locked instance %s" % self.op.instance_name
5544     nodenames = list(instance.all_nodes)
5545     for node in nodenames:
5546       _CheckNodeOnline(self, node)
5547
5548
5549     self.instance = instance
5550
5551     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5552       raise errors.OpPrereqError("Instance's disk layout does not support"
5553                                  " growing.")
5554
5555     self.disk = instance.FindDisk(self.op.disk)
5556
5557     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5558                                        instance.hypervisor)
5559     for node in nodenames:
5560       info = nodeinfo[node]
5561       if info.failed or not info.data:
5562         raise errors.OpPrereqError("Cannot get current information"
5563                                    " from node '%s'" % node)
5564       vg_free = info.data.get('vg_free', None)
5565       if not isinstance(vg_free, int):
5566         raise errors.OpPrereqError("Can't compute free disk space on"
5567                                    " node %s" % node)
5568       if self.op.amount > vg_free:
5569         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5570                                    " %d MiB available, %d MiB required" %
5571                                    (node, vg_free, self.op.amount))
5572
5573   def Exec(self, feedback_fn):
5574     """Execute disk grow.
5575
5576     """
5577     instance = self.instance
5578     disk = self.disk
5579     for node in instance.all_nodes:
5580       self.cfg.SetDiskID(disk, node)
5581       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5582       msg = result.RemoteFailMsg()
5583       if msg:
5584         raise errors.OpExecError("Grow request failed to node %s: %s" %
5585                                  (node, msg))
5586     disk.RecordGrow(self.op.amount)
5587     self.cfg.Update(instance)
5588     if self.op.wait_for_sync:
5589       disk_abort = not _WaitForSync(self, instance)
5590       if disk_abort:
5591         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5592                              " status.\nPlease check the instance.")
5593
5594
5595 class LUQueryInstanceData(NoHooksLU):
5596   """Query runtime instance data.
5597
5598   """
5599   _OP_REQP = ["instances", "static"]
5600   REQ_BGL = False
5601
5602   def ExpandNames(self):
5603     self.needed_locks = {}
5604     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5605
5606     if not isinstance(self.op.instances, list):
5607       raise errors.OpPrereqError("Invalid argument type 'instances'")
5608
5609     if self.op.instances:
5610       self.wanted_names = []
5611       for name in self.op.instances:
5612         full_name = self.cfg.ExpandInstanceName(name)
5613         if full_name is None:
5614           raise errors.OpPrereqError("Instance '%s' not known" % name)
5615         self.wanted_names.append(full_name)
5616       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5617     else:
5618       self.wanted_names = None
5619       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5620
5621     self.needed_locks[locking.LEVEL_NODE] = []
5622     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5623
5624   def DeclareLocks(self, level):
5625     if level == locking.LEVEL_NODE:
5626       self._LockInstancesNodes()
5627
5628   def CheckPrereq(self):
5629     """Check prerequisites.
5630
5631     This only checks the optional instance list against the existing names.
5632
5633     """
5634     if self.wanted_names is None:
5635       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5636
5637     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5638                              in self.wanted_names]
5639     return
5640
5641   def _ComputeDiskStatus(self, instance, snode, dev):
5642     """Compute block device status.
5643
5644     """
5645     static = self.op.static
5646     if not static:
5647       self.cfg.SetDiskID(dev, instance.primary_node)
5648       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5649       if dev_pstatus.offline:
5650         dev_pstatus = None
5651       else:
5652         msg = dev_pstatus.RemoteFailMsg()
5653         if msg:
5654           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5655                                    (instance.name, msg))
5656         dev_pstatus = dev_pstatus.payload
5657     else:
5658       dev_pstatus = None
5659
5660     if dev.dev_type in constants.LDS_DRBD:
5661       # we change the snode then (otherwise we use the one passed in)
5662       if dev.logical_id[0] == instance.primary_node:
5663         snode = dev.logical_id[1]
5664       else:
5665         snode = dev.logical_id[0]
5666
5667     if snode and not static:
5668       self.cfg.SetDiskID(dev, snode)
5669       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5670       if dev_sstatus.offline:
5671         dev_sstatus = None
5672       else:
5673         msg = dev_sstatus.RemoteFailMsg()
5674         if msg:
5675           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5676                                    (instance.name, msg))
5677         dev_sstatus = dev_sstatus.payload
5678     else:
5679       dev_sstatus = None
5680
5681     if dev.children:
5682       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5683                       for child in dev.children]
5684     else:
5685       dev_children = []
5686
5687     data = {
5688       "iv_name": dev.iv_name,
5689       "dev_type": dev.dev_type,
5690       "logical_id": dev.logical_id,
5691       "physical_id": dev.physical_id,
5692       "pstatus": dev_pstatus,
5693       "sstatus": dev_sstatus,
5694       "children": dev_children,
5695       "mode": dev.mode,
5696       }
5697
5698     return data
5699
5700   def Exec(self, feedback_fn):
5701     """Gather and return data"""
5702     result = {}
5703
5704     cluster = self.cfg.GetClusterInfo()
5705
5706     for instance in self.wanted_instances:
5707       if not self.op.static:
5708         remote_info = self.rpc.call_instance_info(instance.primary_node,
5709                                                   instance.name,
5710                                                   instance.hypervisor)
5711         remote_info.Raise()
5712         remote_info = remote_info.data
5713         if remote_info and "state" in remote_info:
5714           remote_state = "up"
5715         else:
5716           remote_state = "down"
5717       else:
5718         remote_state = None
5719       if instance.admin_up:
5720         config_state = "up"
5721       else:
5722         config_state = "down"
5723
5724       disks = [self._ComputeDiskStatus(instance, None, device)
5725                for device in instance.disks]
5726
5727       idict = {
5728         "name": instance.name,
5729         "config_state": config_state,
5730         "run_state": remote_state,
5731         "pnode": instance.primary_node,
5732         "snodes": instance.secondary_nodes,
5733         "os": instance.os,
5734         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5735         "disks": disks,
5736         "hypervisor": instance.hypervisor,
5737         "network_port": instance.network_port,
5738         "hv_instance": instance.hvparams,
5739         "hv_actual": cluster.FillHV(instance),
5740         "be_instance": instance.beparams,
5741         "be_actual": cluster.FillBE(instance),
5742         }
5743
5744       result[instance.name] = idict
5745
5746     return result
5747
5748
5749 class LUSetInstanceParams(LogicalUnit):
5750   """Modifies an instances's parameters.
5751
5752   """
5753   HPATH = "instance-modify"
5754   HTYPE = constants.HTYPE_INSTANCE
5755   _OP_REQP = ["instance_name"]
5756   REQ_BGL = False
5757
5758   def CheckArguments(self):
5759     if not hasattr(self.op, 'nics'):
5760       self.op.nics = []
5761     if not hasattr(self.op, 'disks'):
5762       self.op.disks = []
5763     if not hasattr(self.op, 'beparams'):
5764       self.op.beparams = {}
5765     if not hasattr(self.op, 'hvparams'):
5766       self.op.hvparams = {}
5767     self.op.force = getattr(self.op, "force", False)
5768     if not (self.op.nics or self.op.disks or
5769             self.op.hvparams or self.op.beparams):
5770       raise errors.OpPrereqError("No changes submitted")
5771
5772     # Disk validation
5773     disk_addremove = 0
5774     for disk_op, disk_dict in self.op.disks:
5775       if disk_op == constants.DDM_REMOVE:
5776         disk_addremove += 1
5777         continue
5778       elif disk_op == constants.DDM_ADD:
5779         disk_addremove += 1
5780       else:
5781         if not isinstance(disk_op, int):
5782           raise errors.OpPrereqError("Invalid disk index")
5783       if disk_op == constants.DDM_ADD:
5784         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5785         if mode not in constants.DISK_ACCESS_SET:
5786           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5787         size = disk_dict.get('size', None)
5788         if size is None:
5789           raise errors.OpPrereqError("Required disk parameter size missing")
5790         try:
5791           size = int(size)
5792         except ValueError, err:
5793           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5794                                      str(err))
5795         disk_dict['size'] = size
5796       else:
5797         # modification of disk
5798         if 'size' in disk_dict:
5799           raise errors.OpPrereqError("Disk size change not possible, use"
5800                                      " grow-disk")
5801
5802     if disk_addremove > 1:
5803       raise errors.OpPrereqError("Only one disk add or remove operation"
5804                                  " supported at a time")
5805
5806     # NIC validation
5807     nic_addremove = 0
5808     for nic_op, nic_dict in self.op.nics:
5809       if nic_op == constants.DDM_REMOVE:
5810         nic_addremove += 1
5811         continue
5812       elif nic_op == constants.DDM_ADD:
5813         nic_addremove += 1
5814       else:
5815         if not isinstance(nic_op, int):
5816           raise errors.OpPrereqError("Invalid nic index")
5817
5818       # nic_dict should be a dict
5819       nic_ip = nic_dict.get('ip', None)
5820       if nic_ip is not None:
5821         if nic_ip.lower() == constants.VALUE_NONE:
5822           nic_dict['ip'] = None
5823         else:
5824           if not utils.IsValidIP(nic_ip):
5825             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5826
5827       if nic_op == constants.DDM_ADD:
5828         nic_bridge = nic_dict.get('bridge', None)
5829         if nic_bridge is None:
5830           nic_dict['bridge'] = self.cfg.GetDefBridge()
5831         nic_mac = nic_dict.get('mac', None)
5832         if nic_mac is None:
5833           nic_dict['mac'] = constants.VALUE_AUTO
5834
5835       if 'mac' in nic_dict:
5836         nic_mac = nic_dict['mac']
5837         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5838           if not utils.IsValidMac(nic_mac):
5839             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5840         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5841           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5842                                      " modifying an existing nic")
5843
5844     if nic_addremove > 1:
5845       raise errors.OpPrereqError("Only one NIC add or remove operation"
5846                                  " supported at a time")
5847
5848   def ExpandNames(self):
5849     self._ExpandAndLockInstance()
5850     self.needed_locks[locking.LEVEL_NODE] = []
5851     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5852
5853   def DeclareLocks(self, level):
5854     if level == locking.LEVEL_NODE:
5855       self._LockInstancesNodes()
5856
5857   def BuildHooksEnv(self):
5858     """Build hooks env.
5859
5860     This runs on the master, primary and secondaries.
5861
5862     """
5863     args = dict()
5864     if constants.BE_MEMORY in self.be_new:
5865       args['memory'] = self.be_new[constants.BE_MEMORY]
5866     if constants.BE_VCPUS in self.be_new:
5867       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5868     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5869     # information at all.
5870     if self.op.nics:
5871       args['nics'] = []
5872       nic_override = dict(self.op.nics)
5873       for idx, nic in enumerate(self.instance.nics):
5874         if idx in nic_override:
5875           this_nic_override = nic_override[idx]
5876         else:
5877           this_nic_override = {}
5878         if 'ip' in this_nic_override:
5879           ip = this_nic_override['ip']
5880         else:
5881           ip = nic.ip
5882         if 'bridge' in this_nic_override:
5883           bridge = this_nic_override['bridge']
5884         else:
5885           bridge = nic.bridge
5886         if 'mac' in this_nic_override:
5887           mac = this_nic_override['mac']
5888         else:
5889           mac = nic.mac
5890         args['nics'].append((ip, bridge, mac))
5891       if constants.DDM_ADD in nic_override:
5892         ip = nic_override[constants.DDM_ADD].get('ip', None)
5893         bridge = nic_override[constants.DDM_ADD]['bridge']
5894         mac = nic_override[constants.DDM_ADD]['mac']
5895         args['nics'].append((ip, bridge, mac))
5896       elif constants.DDM_REMOVE in nic_override:
5897         del args['nics'][-1]
5898
5899     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5900     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5901     return env, nl, nl
5902
5903   def CheckPrereq(self):
5904     """Check prerequisites.
5905
5906     This only checks the instance list against the existing names.
5907
5908     """
5909     force = self.force = self.op.force
5910
5911     # checking the new params on the primary/secondary nodes
5912
5913     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5914     assert self.instance is not None, \
5915       "Cannot retrieve locked instance %s" % self.op.instance_name
5916     pnode = instance.primary_node
5917     nodelist = list(instance.all_nodes)
5918
5919     # hvparams processing
5920     if self.op.hvparams:
5921       i_hvdict = copy.deepcopy(instance.hvparams)
5922       for key, val in self.op.hvparams.iteritems():
5923         if val == constants.VALUE_DEFAULT:
5924           try:
5925             del i_hvdict[key]
5926           except KeyError:
5927             pass
5928         else:
5929           i_hvdict[key] = val
5930       cluster = self.cfg.GetClusterInfo()
5931       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5932       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5933                                 i_hvdict)
5934       # local check
5935       hypervisor.GetHypervisor(
5936         instance.hypervisor).CheckParameterSyntax(hv_new)
5937       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5938       self.hv_new = hv_new # the new actual values
5939       self.hv_inst = i_hvdict # the new dict (without defaults)
5940     else:
5941       self.hv_new = self.hv_inst = {}
5942
5943     # beparams processing
5944     if self.op.beparams:
5945       i_bedict = copy.deepcopy(instance.beparams)
5946       for key, val in self.op.beparams.iteritems():
5947         if val == constants.VALUE_DEFAULT:
5948           try:
5949             del i_bedict[key]
5950           except KeyError:
5951             pass
5952         else:
5953           i_bedict[key] = val
5954       cluster = self.cfg.GetClusterInfo()
5955       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5956       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5957                                 i_bedict)
5958       self.be_new = be_new # the new actual values
5959       self.be_inst = i_bedict # the new dict (without defaults)
5960     else:
5961       self.be_new = self.be_inst = {}
5962
5963     self.warn = []
5964
5965     if constants.BE_MEMORY in self.op.beparams and not self.force:
5966       mem_check_list = [pnode]
5967       if be_new[constants.BE_AUTO_BALANCE]:
5968         # either we changed auto_balance to yes or it was from before
5969         mem_check_list.extend(instance.secondary_nodes)
5970       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5971                                                   instance.hypervisor)
5972       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5973                                          instance.hypervisor)
5974       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5975         # Assume the primary node is unreachable and go ahead
5976         self.warn.append("Can't get info from primary node %s" % pnode)
5977       else:
5978         if not instance_info.failed and instance_info.data:
5979           current_mem = int(instance_info.data['memory'])
5980         else:
5981           # Assume instance not running
5982           # (there is a slight race condition here, but it's not very probable,
5983           # and we have no other way to check)
5984           current_mem = 0
5985         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5986                     nodeinfo[pnode].data['memory_free'])
5987         if miss_mem > 0:
5988           raise errors.OpPrereqError("This change will prevent the instance"
5989                                      " from starting, due to %d MB of memory"
5990                                      " missing on its primary node" % miss_mem)
5991
5992       if be_new[constants.BE_AUTO_BALANCE]:
5993         for node, nres in nodeinfo.iteritems():
5994           if node not in instance.secondary_nodes:
5995             continue
5996           if nres.failed or not isinstance(nres.data, dict):
5997             self.warn.append("Can't get info from secondary node %s" % node)
5998           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5999             self.warn.append("Not enough memory to failover instance to"
6000                              " secondary node %s" % node)
6001
6002     # NIC processing
6003     for nic_op, nic_dict in self.op.nics:
6004       if nic_op == constants.DDM_REMOVE:
6005         if not instance.nics:
6006           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6007         continue
6008       if nic_op != constants.DDM_ADD:
6009         # an existing nic
6010         if nic_op < 0 or nic_op >= len(instance.nics):
6011           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6012                                      " are 0 to %d" %
6013                                      (nic_op, len(instance.nics)))
6014       if 'bridge' in nic_dict:
6015         nic_bridge = nic_dict['bridge']
6016         if nic_bridge is None:
6017           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6018         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6019           msg = ("Bridge '%s' doesn't exist on one of"
6020                  " the instance nodes" % nic_bridge)
6021           if self.force:
6022             self.warn.append(msg)
6023           else:
6024             raise errors.OpPrereqError(msg)
6025       if 'mac' in nic_dict:
6026         nic_mac = nic_dict['mac']
6027         if nic_mac is None:
6028           raise errors.OpPrereqError('Cannot set the nic mac to None')
6029         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6030           # otherwise generate the mac
6031           nic_dict['mac'] = self.cfg.GenerateMAC()
6032         else:
6033           # or validate/reserve the current one
6034           if self.cfg.IsMacInUse(nic_mac):
6035             raise errors.OpPrereqError("MAC address %s already in use"
6036                                        " in cluster" % nic_mac)
6037
6038     # DISK processing
6039     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6040       raise errors.OpPrereqError("Disk operations not supported for"
6041                                  " diskless instances")
6042     for disk_op, disk_dict in self.op.disks:
6043       if disk_op == constants.DDM_REMOVE:
6044         if len(instance.disks) == 1:
6045           raise errors.OpPrereqError("Cannot remove the last disk of"
6046                                      " an instance")
6047         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6048         ins_l = ins_l[pnode]
6049         if ins_l.failed or not isinstance(ins_l.data, list):
6050           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6051         if instance.name in ins_l.data:
6052           raise errors.OpPrereqError("Instance is running, can't remove"
6053                                      " disks.")
6054
6055       if (disk_op == constants.DDM_ADD and
6056           len(instance.nics) >= constants.MAX_DISKS):
6057         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6058                                    " add more" % constants.MAX_DISKS)
6059       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6060         # an existing disk
6061         if disk_op < 0 or disk_op >= len(instance.disks):
6062           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6063                                      " are 0 to %d" %
6064                                      (disk_op, len(instance.disks)))
6065
6066     return
6067
6068   def Exec(self, feedback_fn):
6069     """Modifies an instance.
6070
6071     All parameters take effect only at the next restart of the instance.
6072
6073     """
6074     # Process here the warnings from CheckPrereq, as we don't have a
6075     # feedback_fn there.
6076     for warn in self.warn:
6077       feedback_fn("WARNING: %s" % warn)
6078
6079     result = []
6080     instance = self.instance
6081     # disk changes
6082     for disk_op, disk_dict in self.op.disks:
6083       if disk_op == constants.DDM_REMOVE:
6084         # remove the last disk
6085         device = instance.disks.pop()
6086         device_idx = len(instance.disks)
6087         for node, disk in device.ComputeNodeTree(instance.primary_node):
6088           self.cfg.SetDiskID(disk, node)
6089           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6090           if msg:
6091             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6092                             " continuing anyway", device_idx, node, msg)
6093         result.append(("disk/%d" % device_idx, "remove"))
6094       elif disk_op == constants.DDM_ADD:
6095         # add a new disk
6096         if instance.disk_template == constants.DT_FILE:
6097           file_driver, file_path = instance.disks[0].logical_id
6098           file_path = os.path.dirname(file_path)
6099         else:
6100           file_driver = file_path = None
6101         disk_idx_base = len(instance.disks)
6102         new_disk = _GenerateDiskTemplate(self,
6103                                          instance.disk_template,
6104                                          instance.name, instance.primary_node,
6105                                          instance.secondary_nodes,
6106                                          [disk_dict],
6107                                          file_path,
6108                                          file_driver,
6109                                          disk_idx_base)[0]
6110         instance.disks.append(new_disk)
6111         info = _GetInstanceInfoText(instance)
6112
6113         logging.info("Creating volume %s for instance %s",
6114                      new_disk.iv_name, instance.name)
6115         # Note: this needs to be kept in sync with _CreateDisks
6116         #HARDCODE
6117         for node in instance.all_nodes:
6118           f_create = node == instance.primary_node
6119           try:
6120             _CreateBlockDev(self, node, instance, new_disk,
6121                             f_create, info, f_create)
6122           except errors.OpExecError, err:
6123             self.LogWarning("Failed to create volume %s (%s) on"
6124                             " node %s: %s",
6125                             new_disk.iv_name, new_disk, node, err)
6126         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6127                        (new_disk.size, new_disk.mode)))
6128       else:
6129         # change a given disk
6130         instance.disks[disk_op].mode = disk_dict['mode']
6131         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6132     # NIC changes
6133     for nic_op, nic_dict in self.op.nics:
6134       if nic_op == constants.DDM_REMOVE:
6135         # remove the last nic
6136         del instance.nics[-1]
6137         result.append(("nic.%d" % len(instance.nics), "remove"))
6138       elif nic_op == constants.DDM_ADD:
6139         # mac and bridge should be set, by now
6140         mac = nic_dict['mac']
6141         bridge = nic_dict['bridge']
6142         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6143                               bridge=bridge)
6144         instance.nics.append(new_nic)
6145         result.append(("nic.%d" % (len(instance.nics) - 1),
6146                        "add:mac=%s,ip=%s,bridge=%s" %
6147                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6148       else:
6149         # change a given nic
6150         for key in 'mac', 'ip', 'bridge':
6151           if key in nic_dict:
6152             setattr(instance.nics[nic_op], key, nic_dict[key])
6153             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6154
6155     # hvparams changes
6156     if self.op.hvparams:
6157       instance.hvparams = self.hv_inst
6158       for key, val in self.op.hvparams.iteritems():
6159         result.append(("hv/%s" % key, val))
6160
6161     # beparams changes
6162     if self.op.beparams:
6163       instance.beparams = self.be_inst
6164       for key, val in self.op.beparams.iteritems():
6165         result.append(("be/%s" % key, val))
6166
6167     self.cfg.Update(instance)
6168
6169     return result
6170
6171
6172 class LUQueryExports(NoHooksLU):
6173   """Query the exports list
6174
6175   """
6176   _OP_REQP = ['nodes']
6177   REQ_BGL = False
6178
6179   def ExpandNames(self):
6180     self.needed_locks = {}
6181     self.share_locks[locking.LEVEL_NODE] = 1
6182     if not self.op.nodes:
6183       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6184     else:
6185       self.needed_locks[locking.LEVEL_NODE] = \
6186         _GetWantedNodes(self, self.op.nodes)
6187
6188   def CheckPrereq(self):
6189     """Check prerequisites.
6190
6191     """
6192     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6193
6194   def Exec(self, feedback_fn):
6195     """Compute the list of all the exported system images.
6196
6197     @rtype: dict
6198     @return: a dictionary with the structure node->(export-list)
6199         where export-list is a list of the instances exported on
6200         that node.
6201
6202     """
6203     rpcresult = self.rpc.call_export_list(self.nodes)
6204     result = {}
6205     for node in rpcresult:
6206       if rpcresult[node].failed:
6207         result[node] = False
6208       else:
6209         result[node] = rpcresult[node].data
6210
6211     return result
6212
6213
6214 class LUExportInstance(LogicalUnit):
6215   """Export an instance to an image in the cluster.
6216
6217   """
6218   HPATH = "instance-export"
6219   HTYPE = constants.HTYPE_INSTANCE
6220   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6221   REQ_BGL = False
6222
6223   def ExpandNames(self):
6224     self._ExpandAndLockInstance()
6225     # FIXME: lock only instance primary and destination node
6226     #
6227     # Sad but true, for now we have do lock all nodes, as we don't know where
6228     # the previous export might be, and and in this LU we search for it and
6229     # remove it from its current node. In the future we could fix this by:
6230     #  - making a tasklet to search (share-lock all), then create the new one,
6231     #    then one to remove, after
6232     #  - removing the removal operation altoghether
6233     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6234
6235   def DeclareLocks(self, level):
6236     """Last minute lock declaration."""
6237     # All nodes are locked anyway, so nothing to do here.
6238
6239   def BuildHooksEnv(self):
6240     """Build hooks env.
6241
6242     This will run on the master, primary node and target node.
6243
6244     """
6245     env = {
6246       "EXPORT_NODE": self.op.target_node,
6247       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6248       }
6249     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6250     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6251           self.op.target_node]
6252     return env, nl, nl
6253
6254   def CheckPrereq(self):
6255     """Check prerequisites.
6256
6257     This checks that the instance and node names are valid.
6258
6259     """
6260     instance_name = self.op.instance_name
6261     self.instance = self.cfg.GetInstanceInfo(instance_name)
6262     assert self.instance is not None, \
6263           "Cannot retrieve locked instance %s" % self.op.instance_name
6264     _CheckNodeOnline(self, self.instance.primary_node)
6265
6266     self.dst_node = self.cfg.GetNodeInfo(
6267       self.cfg.ExpandNodeName(self.op.target_node))
6268
6269     if self.dst_node is None:
6270       # This is wrong node name, not a non-locked node
6271       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6272     _CheckNodeOnline(self, self.dst_node.name)
6273     _CheckNodeNotDrained(self, self.dst_node.name)
6274
6275     # instance disk type verification
6276     for disk in self.instance.disks:
6277       if disk.dev_type == constants.LD_FILE:
6278         raise errors.OpPrereqError("Export not supported for instances with"
6279                                    " file-based disks")
6280
6281   def Exec(self, feedback_fn):
6282     """Export an instance to an image in the cluster.
6283
6284     """
6285     instance = self.instance
6286     dst_node = self.dst_node
6287     src_node = instance.primary_node
6288     if self.op.shutdown:
6289       # shutdown the instance, but not the disks
6290       result = self.rpc.call_instance_shutdown(src_node, instance)
6291       msg = result.RemoteFailMsg()
6292       if msg:
6293         raise errors.OpExecError("Could not shutdown instance %s on"
6294                                  " node %s: %s" %
6295                                  (instance.name, src_node, msg))
6296
6297     vgname = self.cfg.GetVGName()
6298
6299     snap_disks = []
6300
6301     # set the disks ID correctly since call_instance_start needs the
6302     # correct drbd minor to create the symlinks
6303     for disk in instance.disks:
6304       self.cfg.SetDiskID(disk, src_node)
6305
6306     try:
6307       for idx, disk in enumerate(instance.disks):
6308         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6309         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6310         if new_dev_name.failed or not new_dev_name.data:
6311           self.LogWarning("Could not snapshot disk/%d on node %s",
6312                           idx, src_node)
6313           snap_disks.append(False)
6314         else:
6315           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6316                                  logical_id=(vgname, new_dev_name.data),
6317                                  physical_id=(vgname, new_dev_name.data),
6318                                  iv_name=disk.iv_name)
6319           snap_disks.append(new_dev)
6320
6321     finally:
6322       if self.op.shutdown and instance.admin_up:
6323         result = self.rpc.call_instance_start(src_node, instance, None, None)
6324         msg = result.RemoteFailMsg()
6325         if msg:
6326           _ShutdownInstanceDisks(self, instance)
6327           raise errors.OpExecError("Could not start instance: %s" % msg)
6328
6329     # TODO: check for size
6330
6331     cluster_name = self.cfg.GetClusterName()
6332     for idx, dev in enumerate(snap_disks):
6333       if dev:
6334         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6335                                                instance, cluster_name, idx)
6336         if result.failed or not result.data:
6337           self.LogWarning("Could not export disk/%d from node %s to"
6338                           " node %s", idx, src_node, dst_node.name)
6339         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6340         if msg:
6341           self.LogWarning("Could not remove snapshot for disk/%d from node"
6342                           " %s: %s", idx, src_node, msg)
6343
6344     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6345     if result.failed or not result.data:
6346       self.LogWarning("Could not finalize export for instance %s on node %s",
6347                       instance.name, dst_node.name)
6348
6349     nodelist = self.cfg.GetNodeList()
6350     nodelist.remove(dst_node.name)
6351
6352     # on one-node clusters nodelist will be empty after the removal
6353     # if we proceed the backup would be removed because OpQueryExports
6354     # substitutes an empty list with the full cluster node list.
6355     if nodelist:
6356       exportlist = self.rpc.call_export_list(nodelist)
6357       for node in exportlist:
6358         if exportlist[node].failed:
6359           continue
6360         if instance.name in exportlist[node].data:
6361           if not self.rpc.call_export_remove(node, instance.name):
6362             self.LogWarning("Could not remove older export for instance %s"
6363                             " on node %s", instance.name, node)
6364
6365
6366 class LURemoveExport(NoHooksLU):
6367   """Remove exports related to the named instance.
6368
6369   """
6370   _OP_REQP = ["instance_name"]
6371   REQ_BGL = False
6372
6373   def ExpandNames(self):
6374     self.needed_locks = {}
6375     # We need all nodes to be locked in order for RemoveExport to work, but we
6376     # don't need to lock the instance itself, as nothing will happen to it (and
6377     # we can remove exports also for a removed instance)
6378     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6379
6380   def CheckPrereq(self):
6381     """Check prerequisites.
6382     """
6383     pass
6384
6385   def Exec(self, feedback_fn):
6386     """Remove any export.
6387
6388     """
6389     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6390     # If the instance was not found we'll try with the name that was passed in.
6391     # This will only work if it was an FQDN, though.
6392     fqdn_warn = False
6393     if not instance_name:
6394       fqdn_warn = True
6395       instance_name = self.op.instance_name
6396
6397     exportlist = self.rpc.call_export_list(self.acquired_locks[
6398       locking.LEVEL_NODE])
6399     found = False
6400     for node in exportlist:
6401       if exportlist[node].failed:
6402         self.LogWarning("Failed to query node %s, continuing" % node)
6403         continue
6404       if instance_name in exportlist[node].data:
6405         found = True
6406         result = self.rpc.call_export_remove(node, instance_name)
6407         if result.failed or not result.data:
6408           logging.error("Could not remove export for instance %s"
6409                         " on node %s", instance_name, node)
6410
6411     if fqdn_warn and not found:
6412       feedback_fn("Export not found. If trying to remove an export belonging"
6413                   " to a deleted instance please use its Fully Qualified"
6414                   " Domain Name.")
6415
6416
6417 class TagsLU(NoHooksLU):
6418   """Generic tags LU.
6419
6420   This is an abstract class which is the parent of all the other tags LUs.
6421
6422   """
6423
6424   def ExpandNames(self):
6425     self.needed_locks = {}
6426     if self.op.kind == constants.TAG_NODE:
6427       name = self.cfg.ExpandNodeName(self.op.name)
6428       if name is None:
6429         raise errors.OpPrereqError("Invalid node name (%s)" %
6430                                    (self.op.name,))
6431       self.op.name = name
6432       self.needed_locks[locking.LEVEL_NODE] = name
6433     elif self.op.kind == constants.TAG_INSTANCE:
6434       name = self.cfg.ExpandInstanceName(self.op.name)
6435       if name is None:
6436         raise errors.OpPrereqError("Invalid instance name (%s)" %
6437                                    (self.op.name,))
6438       self.op.name = name
6439       self.needed_locks[locking.LEVEL_INSTANCE] = name
6440
6441   def CheckPrereq(self):
6442     """Check prerequisites.
6443
6444     """
6445     if self.op.kind == constants.TAG_CLUSTER:
6446       self.target = self.cfg.GetClusterInfo()
6447     elif self.op.kind == constants.TAG_NODE:
6448       self.target = self.cfg.GetNodeInfo(self.op.name)
6449     elif self.op.kind == constants.TAG_INSTANCE:
6450       self.target = self.cfg.GetInstanceInfo(self.op.name)
6451     else:
6452       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6453                                  str(self.op.kind))
6454
6455
6456 class LUGetTags(TagsLU):
6457   """Returns the tags of a given object.
6458
6459   """
6460   _OP_REQP = ["kind", "name"]
6461   REQ_BGL = False
6462
6463   def Exec(self, feedback_fn):
6464     """Returns the tag list.
6465
6466     """
6467     return list(self.target.GetTags())
6468
6469
6470 class LUSearchTags(NoHooksLU):
6471   """Searches the tags for a given pattern.
6472
6473   """
6474   _OP_REQP = ["pattern"]
6475   REQ_BGL = False
6476
6477   def ExpandNames(self):
6478     self.needed_locks = {}
6479
6480   def CheckPrereq(self):
6481     """Check prerequisites.
6482
6483     This checks the pattern passed for validity by compiling it.
6484
6485     """
6486     try:
6487       self.re = re.compile(self.op.pattern)
6488     except re.error, err:
6489       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6490                                  (self.op.pattern, err))
6491
6492   def Exec(self, feedback_fn):
6493     """Returns the tag list.
6494
6495     """
6496     cfg = self.cfg
6497     tgts = [("/cluster", cfg.GetClusterInfo())]
6498     ilist = cfg.GetAllInstancesInfo().values()
6499     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6500     nlist = cfg.GetAllNodesInfo().values()
6501     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6502     results = []
6503     for path, target in tgts:
6504       for tag in target.GetTags():
6505         if self.re.search(tag):
6506           results.append((path, tag))
6507     return results
6508
6509
6510 class LUAddTags(TagsLU):
6511   """Sets a tag on a given object.
6512
6513   """
6514   _OP_REQP = ["kind", "name", "tags"]
6515   REQ_BGL = False
6516
6517   def CheckPrereq(self):
6518     """Check prerequisites.
6519
6520     This checks the type and length of the tag name and value.
6521
6522     """
6523     TagsLU.CheckPrereq(self)
6524     for tag in self.op.tags:
6525       objects.TaggableObject.ValidateTag(tag)
6526
6527   def Exec(self, feedback_fn):
6528     """Sets the tag.
6529
6530     """
6531     try:
6532       for tag in self.op.tags:
6533         self.target.AddTag(tag)
6534     except errors.TagError, err:
6535       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6536     try:
6537       self.cfg.Update(self.target)
6538     except errors.ConfigurationError:
6539       raise errors.OpRetryError("There has been a modification to the"
6540                                 " config file and the operation has been"
6541                                 " aborted. Please retry.")
6542
6543
6544 class LUDelTags(TagsLU):
6545   """Delete a list of tags from a given object.
6546
6547   """
6548   _OP_REQP = ["kind", "name", "tags"]
6549   REQ_BGL = False
6550
6551   def CheckPrereq(self):
6552     """Check prerequisites.
6553
6554     This checks that we have the given tag.
6555
6556     """
6557     TagsLU.CheckPrereq(self)
6558     for tag in self.op.tags:
6559       objects.TaggableObject.ValidateTag(tag)
6560     del_tags = frozenset(self.op.tags)
6561     cur_tags = self.target.GetTags()
6562     if not del_tags <= cur_tags:
6563       diff_tags = del_tags - cur_tags
6564       diff_names = ["'%s'" % tag for tag in diff_tags]
6565       diff_names.sort()
6566       raise errors.OpPrereqError("Tag(s) %s not found" %
6567                                  (",".join(diff_names)))
6568
6569   def Exec(self, feedback_fn):
6570     """Remove the tag from the object.
6571
6572     """
6573     for tag in self.op.tags:
6574       self.target.RemoveTag(tag)
6575     try:
6576       self.cfg.Update(self.target)
6577     except errors.ConfigurationError:
6578       raise errors.OpRetryError("There has been a modification to the"
6579                                 " config file and the operation has been"
6580                                 " aborted. Please retry.")
6581
6582
6583 class LUTestDelay(NoHooksLU):
6584   """Sleep for a specified amount of time.
6585
6586   This LU sleeps on the master and/or nodes for a specified amount of
6587   time.
6588
6589   """
6590   _OP_REQP = ["duration", "on_master", "on_nodes"]
6591   REQ_BGL = False
6592
6593   def ExpandNames(self):
6594     """Expand names and set required locks.
6595
6596     This expands the node list, if any.
6597
6598     """
6599     self.needed_locks = {}
6600     if self.op.on_nodes:
6601       # _GetWantedNodes can be used here, but is not always appropriate to use
6602       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6603       # more information.
6604       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6605       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6606
6607   def CheckPrereq(self):
6608     """Check prerequisites.
6609
6610     """
6611
6612   def Exec(self, feedback_fn):
6613     """Do the actual sleep.
6614
6615     """
6616     if self.op.on_master:
6617       if not utils.TestDelay(self.op.duration):
6618         raise errors.OpExecError("Error during master delay test")
6619     if self.op.on_nodes:
6620       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6621       if not result:
6622         raise errors.OpExecError("Complete failure from rpc call")
6623       for node, node_result in result.items():
6624         node_result.Raise()
6625         if not node_result.data:
6626           raise errors.OpExecError("Failure during rpc call to node %s,"
6627                                    " result: %s" % (node, node_result.data))
6628
6629
6630 class IAllocator(object):
6631   """IAllocator framework.
6632
6633   An IAllocator instance has three sets of attributes:
6634     - cfg that is needed to query the cluster
6635     - input data (all members of the _KEYS class attribute are required)
6636     - four buffer attributes (in|out_data|text), that represent the
6637       input (to the external script) in text and data structure format,
6638       and the output from it, again in two formats
6639     - the result variables from the script (success, info, nodes) for
6640       easy usage
6641
6642   """
6643   _ALLO_KEYS = [
6644     "mem_size", "disks", "disk_template",
6645     "os", "tags", "nics", "vcpus", "hypervisor",
6646     ]
6647   _RELO_KEYS = [
6648     "relocate_from",
6649     ]
6650
6651   def __init__(self, lu, mode, name, **kwargs):
6652     self.lu = lu
6653     # init buffer variables
6654     self.in_text = self.out_text = self.in_data = self.out_data = None
6655     # init all input fields so that pylint is happy
6656     self.mode = mode
6657     self.name = name
6658     self.mem_size = self.disks = self.disk_template = None
6659     self.os = self.tags = self.nics = self.vcpus = None
6660     self.hypervisor = None
6661     self.relocate_from = None
6662     # computed fields
6663     self.required_nodes = None
6664     # init result fields
6665     self.success = self.info = self.nodes = None
6666     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6667       keyset = self._ALLO_KEYS
6668     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6669       keyset = self._RELO_KEYS
6670     else:
6671       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6672                                    " IAllocator" % self.mode)
6673     for key in kwargs:
6674       if key not in keyset:
6675         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6676                                      " IAllocator" % key)
6677       setattr(self, key, kwargs[key])
6678     for key in keyset:
6679       if key not in kwargs:
6680         raise errors.ProgrammerError("Missing input parameter '%s' to"
6681                                      " IAllocator" % key)
6682     self._BuildInputData()
6683
6684   def _ComputeClusterData(self):
6685     """Compute the generic allocator input data.
6686
6687     This is the data that is independent of the actual operation.
6688
6689     """
6690     cfg = self.lu.cfg
6691     cluster_info = cfg.GetClusterInfo()
6692     # cluster data
6693     data = {
6694       "version": constants.IALLOCATOR_VERSION,
6695       "cluster_name": cfg.GetClusterName(),
6696       "cluster_tags": list(cluster_info.GetTags()),
6697       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6698       # we don't have job IDs
6699       }
6700     iinfo = cfg.GetAllInstancesInfo().values()
6701     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6702
6703     # node data
6704     node_results = {}
6705     node_list = cfg.GetNodeList()
6706
6707     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6708       hypervisor_name = self.hypervisor
6709     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6710       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6711
6712     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6713                                            hypervisor_name)
6714     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6715                        cluster_info.enabled_hypervisors)
6716     for nname, nresult in node_data.items():
6717       # first fill in static (config-based) values
6718       ninfo = cfg.GetNodeInfo(nname)
6719       pnr = {
6720         "tags": list(ninfo.GetTags()),
6721         "primary_ip": ninfo.primary_ip,
6722         "secondary_ip": ninfo.secondary_ip,
6723         "offline": ninfo.offline,
6724         "drained": ninfo.drained,
6725         "master_candidate": ninfo.master_candidate,
6726         }
6727
6728       if not ninfo.offline:
6729         nresult.Raise()
6730         if not isinstance(nresult.data, dict):
6731           raise errors.OpExecError("Can't get data for node %s" % nname)
6732         remote_info = nresult.data
6733         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6734                      'vg_size', 'vg_free', 'cpu_total']:
6735           if attr not in remote_info:
6736             raise errors.OpExecError("Node '%s' didn't return attribute"
6737                                      " '%s'" % (nname, attr))
6738           try:
6739             remote_info[attr] = int(remote_info[attr])
6740           except ValueError, err:
6741             raise errors.OpExecError("Node '%s' returned invalid value"
6742                                      " for '%s': %s" % (nname, attr, err))
6743         # compute memory used by primary instances
6744         i_p_mem = i_p_up_mem = 0
6745         for iinfo, beinfo in i_list:
6746           if iinfo.primary_node == nname:
6747             i_p_mem += beinfo[constants.BE_MEMORY]
6748             if iinfo.name not in node_iinfo[nname].data:
6749               i_used_mem = 0
6750             else:
6751               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6752             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6753             remote_info['memory_free'] -= max(0, i_mem_diff)
6754
6755             if iinfo.admin_up:
6756               i_p_up_mem += beinfo[constants.BE_MEMORY]
6757
6758         # compute memory used by instances
6759         pnr_dyn = {
6760           "total_memory": remote_info['memory_total'],
6761           "reserved_memory": remote_info['memory_dom0'],
6762           "free_memory": remote_info['memory_free'],
6763           "total_disk": remote_info['vg_size'],
6764           "free_disk": remote_info['vg_free'],
6765           "total_cpus": remote_info['cpu_total'],
6766           "i_pri_memory": i_p_mem,
6767           "i_pri_up_memory": i_p_up_mem,
6768           }
6769         pnr.update(pnr_dyn)
6770
6771       node_results[nname] = pnr
6772     data["nodes"] = node_results
6773
6774     # instance data
6775     instance_data = {}
6776     for iinfo, beinfo in i_list:
6777       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6778                   for n in iinfo.nics]
6779       pir = {
6780         "tags": list(iinfo.GetTags()),
6781         "admin_up": iinfo.admin_up,
6782         "vcpus": beinfo[constants.BE_VCPUS],
6783         "memory": beinfo[constants.BE_MEMORY],
6784         "os": iinfo.os,
6785         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6786         "nics": nic_data,
6787         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6788         "disk_template": iinfo.disk_template,
6789         "hypervisor": iinfo.hypervisor,
6790         }
6791       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6792                                                  pir["disks"])
6793       instance_data[iinfo.name] = pir
6794
6795     data["instances"] = instance_data
6796
6797     self.in_data = data
6798
6799   def _AddNewInstance(self):
6800     """Add new instance data to allocator structure.
6801
6802     This in combination with _AllocatorGetClusterData will create the
6803     correct structure needed as input for the allocator.
6804
6805     The checks for the completeness of the opcode must have already been
6806     done.
6807
6808     """
6809     data = self.in_data
6810
6811     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6812
6813     if self.disk_template in constants.DTS_NET_MIRROR:
6814       self.required_nodes = 2
6815     else:
6816       self.required_nodes = 1
6817     request = {
6818       "type": "allocate",
6819       "name": self.name,
6820       "disk_template": self.disk_template,
6821       "tags": self.tags,
6822       "os": self.os,
6823       "vcpus": self.vcpus,
6824       "memory": self.mem_size,
6825       "disks": self.disks,
6826       "disk_space_total": disk_space,
6827       "nics": self.nics,
6828       "required_nodes": self.required_nodes,
6829       }
6830     data["request"] = request
6831
6832   def _AddRelocateInstance(self):
6833     """Add relocate instance data to allocator structure.
6834
6835     This in combination with _IAllocatorGetClusterData will create the
6836     correct structure needed as input for the allocator.
6837
6838     The checks for the completeness of the opcode must have already been
6839     done.
6840
6841     """
6842     instance = self.lu.cfg.GetInstanceInfo(self.name)
6843     if instance is None:
6844       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6845                                    " IAllocator" % self.name)
6846
6847     if instance.disk_template not in constants.DTS_NET_MIRROR:
6848       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6849
6850     if len(instance.secondary_nodes) != 1:
6851       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6852
6853     self.required_nodes = 1
6854     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6855     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6856
6857     request = {
6858       "type": "relocate",
6859       "name": self.name,
6860       "disk_space_total": disk_space,
6861       "required_nodes": self.required_nodes,
6862       "relocate_from": self.relocate_from,
6863       }
6864     self.in_data["request"] = request
6865
6866   def _BuildInputData(self):
6867     """Build input data structures.
6868
6869     """
6870     self._ComputeClusterData()
6871
6872     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6873       self._AddNewInstance()
6874     else:
6875       self._AddRelocateInstance()
6876
6877     self.in_text = serializer.Dump(self.in_data)
6878
6879   def Run(self, name, validate=True, call_fn=None):
6880     """Run an instance allocator and return the results.
6881
6882     """
6883     if call_fn is None:
6884       call_fn = self.lu.rpc.call_iallocator_runner
6885     data = self.in_text
6886
6887     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6888     result.Raise()
6889
6890     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6891       raise errors.OpExecError("Invalid result from master iallocator runner")
6892
6893     rcode, stdout, stderr, fail = result.data
6894
6895     if rcode == constants.IARUN_NOTFOUND:
6896       raise errors.OpExecError("Can't find allocator '%s'" % name)
6897     elif rcode == constants.IARUN_FAILURE:
6898       raise errors.OpExecError("Instance allocator call failed: %s,"
6899                                " output: %s" % (fail, stdout+stderr))
6900     self.out_text = stdout
6901     if validate:
6902       self._ValidateResult()
6903
6904   def _ValidateResult(self):
6905     """Process the allocator results.
6906
6907     This will process and if successful save the result in
6908     self.out_data and the other parameters.
6909
6910     """
6911     try:
6912       rdict = serializer.Load(self.out_text)
6913     except Exception, err:
6914       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6915
6916     if not isinstance(rdict, dict):
6917       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6918
6919     for key in "success", "info", "nodes":
6920       if key not in rdict:
6921         raise errors.OpExecError("Can't parse iallocator results:"
6922                                  " missing key '%s'" % key)
6923       setattr(self, key, rdict[key])
6924
6925     if not isinstance(rdict["nodes"], list):
6926       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6927                                " is not a list")
6928     self.out_data = rdict
6929
6930
6931 class LUTestAllocator(NoHooksLU):
6932   """Run allocator tests.
6933
6934   This LU runs the allocator tests
6935
6936   """
6937   _OP_REQP = ["direction", "mode", "name"]
6938
6939   def CheckPrereq(self):
6940     """Check prerequisites.
6941
6942     This checks the opcode parameters depending on the director and mode test.
6943
6944     """
6945     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6946       for attr in ["name", "mem_size", "disks", "disk_template",
6947                    "os", "tags", "nics", "vcpus"]:
6948         if not hasattr(self.op, attr):
6949           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6950                                      attr)
6951       iname = self.cfg.ExpandInstanceName(self.op.name)
6952       if iname is not None:
6953         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6954                                    iname)
6955       if not isinstance(self.op.nics, list):
6956         raise errors.OpPrereqError("Invalid parameter 'nics'")
6957       for row in self.op.nics:
6958         if (not isinstance(row, dict) or
6959             "mac" not in row or
6960             "ip" not in row or
6961             "bridge" not in row):
6962           raise errors.OpPrereqError("Invalid contents of the"
6963                                      " 'nics' parameter")
6964       if not isinstance(self.op.disks, list):
6965         raise errors.OpPrereqError("Invalid parameter 'disks'")
6966       for row in self.op.disks:
6967         if (not isinstance(row, dict) or
6968             "size" not in row or
6969             not isinstance(row["size"], int) or
6970             "mode" not in row or
6971             row["mode"] not in ['r', 'w']):
6972           raise errors.OpPrereqError("Invalid contents of the"
6973                                      " 'disks' parameter")
6974       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6975         self.op.hypervisor = self.cfg.GetHypervisorType()
6976     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6977       if not hasattr(self.op, "name"):
6978         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6979       fname = self.cfg.ExpandInstanceName(self.op.name)
6980       if fname is None:
6981         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6982                                    self.op.name)
6983       self.op.name = fname
6984       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6985     else:
6986       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6987                                  self.op.mode)
6988
6989     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6990       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6991         raise errors.OpPrereqError("Missing allocator name")
6992     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6993       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6994                                  self.op.direction)
6995
6996   def Exec(self, feedback_fn):
6997     """Run the allocator test.
6998
6999     """
7000     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7001       ial = IAllocator(self,
7002                        mode=self.op.mode,
7003                        name=self.op.name,
7004                        mem_size=self.op.mem_size,
7005                        disks=self.op.disks,
7006                        disk_template=self.op.disk_template,
7007                        os=self.op.os,
7008                        tags=self.op.tags,
7009                        nics=self.op.nics,
7010                        vcpus=self.op.vcpus,
7011                        hypervisor=self.op.hypervisor,
7012                        )
7013     else:
7014       ial = IAllocator(self,
7015                        mode=self.op.mode,
7016                        name=self.op.name,
7017                        relocate_from=list(self.relocate_from),
7018                        )
7019
7020     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7021       result = ial.in_text
7022     else:
7023       ial.Run(self.op.allocator, validate=False)
7024       result = ial.out_text
7025     return result