7c25d9555db1a549f0cecb1f26a1f3d5274937dc
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, bridge, mac) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509       env["INSTANCE_NIC%d_MAC" % idx] = mac
510   else:
511     nic_count = 0
512
513   env["INSTANCE_NIC_COUNT"] = nic_count
514
515   if disks:
516     disk_count = len(disks)
517     for idx, (size, mode) in enumerate(disks):
518       env["INSTANCE_DISK%d_SIZE" % idx] = size
519       env["INSTANCE_DISK%d_MODE" % idx] = mode
520   else:
521     disk_count = 0
522
523   env["INSTANCE_DISK_COUNT"] = disk_count
524
525   return env
526
527
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529   """Builds instance related env variables for hooks from an object.
530
531   @type lu: L{LogicalUnit}
532   @param lu: the logical unit on whose behalf we execute
533   @type instance: L{objects.Instance}
534   @param instance: the instance for which we should build the
535       environment
536   @type override: dict
537   @param override: dictionary with key/values that will override
538       our values
539   @rtype: dict
540   @return: the hook environment dictionary
541
542   """
543   bep = lu.cfg.GetClusterInfo().FillBE(instance)
544   args = {
545     'name': instance.name,
546     'primary_node': instance.primary_node,
547     'secondary_nodes': instance.secondary_nodes,
548     'os_type': instance.os,
549     'status': instance.admin_up,
550     'memory': bep[constants.BE_MEMORY],
551     'vcpus': bep[constants.BE_VCPUS],
552     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553     'disk_template': instance.disk_template,
554     'disks': [(disk.size, disk.mode) for disk in instance.disks],
555   }
556   if override:
557     args.update(override)
558   return _BuildInstanceHookEnv(**args)
559
560
561 def _AdjustCandidatePool(lu):
562   """Adjust the candidate pool after node operations.
563
564   """
565   mod_list = lu.cfg.MaintainCandidatePool()
566   if mod_list:
567     lu.LogInfo("Promoted nodes to master candidate role: %s",
568                ", ".join(node.name for node in mod_list))
569     for name in mod_list:
570       lu.context.ReaddNode(name)
571   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
572   if mc_now > mc_max:
573     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
574                (mc_now, mc_max))
575
576
577 def _CheckInstanceBridgesExist(lu, instance):
578   """Check that the brigdes needed by an instance exist.
579
580   """
581   # check bridges existance
582   brlist = [nic.bridge for nic in instance.nics]
583   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
584   result.Raise()
585   if not result.data:
586     raise errors.OpPrereqError("One or more target bridges %s does not"
587                                " exist on destination node '%s'" %
588                                (brlist, instance.primary_node))
589
590
591 class LUDestroyCluster(NoHooksLU):
592   """Logical unit for destroying the cluster.
593
594   """
595   _OP_REQP = []
596
597   def CheckPrereq(self):
598     """Check prerequisites.
599
600     This checks whether the cluster is empty.
601
602     Any errors are signalled by raising errors.OpPrereqError.
603
604     """
605     master = self.cfg.GetMasterNode()
606
607     nodelist = self.cfg.GetNodeList()
608     if len(nodelist) != 1 or nodelist[0] != master:
609       raise errors.OpPrereqError("There are still %d node(s) in"
610                                  " this cluster." % (len(nodelist) - 1))
611     instancelist = self.cfg.GetInstanceList()
612     if instancelist:
613       raise errors.OpPrereqError("There are still %d instance(s) in"
614                                  " this cluster." % len(instancelist))
615
616   def Exec(self, feedback_fn):
617     """Destroys the cluster.
618
619     """
620     master = self.cfg.GetMasterNode()
621     result = self.rpc.call_node_stop_master(master, False)
622     result.Raise()
623     if not result.data:
624       raise errors.OpExecError("Could not disable the master role")
625     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626     utils.CreateBackup(priv_key)
627     utils.CreateBackup(pub_key)
628     return master
629
630
631 class LUVerifyCluster(LogicalUnit):
632   """Verifies the cluster status.
633
634   """
635   HPATH = "cluster-verify"
636   HTYPE = constants.HTYPE_CLUSTER
637   _OP_REQP = ["skip_checks"]
638   REQ_BGL = False
639
640   def ExpandNames(self):
641     self.needed_locks = {
642       locking.LEVEL_NODE: locking.ALL_SET,
643       locking.LEVEL_INSTANCE: locking.ALL_SET,
644     }
645     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
646
647   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648                   node_result, feedback_fn, master_files,
649                   drbd_map, vg_name):
650     """Run multiple tests against a node.
651
652     Test list:
653
654       - compares ganeti version
655       - checks vg existance and size > 20G
656       - checks config file checksum
657       - checks ssh to other nodes
658
659     @type nodeinfo: L{objects.Node}
660     @param nodeinfo: the node to check
661     @param file_list: required list of files
662     @param local_cksum: dictionary of local files and their checksums
663     @param node_result: the results from the node
664     @param feedback_fn: function used to accumulate results
665     @param master_files: list of files that only masters should have
666     @param drbd_map: the useddrbd minors for this node, in
667         form of minor: (instance, must_exist) which correspond to instances
668         and their running status
669     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
670
671     """
672     node = nodeinfo.name
673
674     # main result, node_result should be a non-empty dict
675     if not node_result or not isinstance(node_result, dict):
676       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677       return True
678
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     remote_version = node_result.get('version', None)
682     if not (remote_version and isinstance(remote_version, (list, tuple)) and
683             len(remote_version) == 2):
684       feedback_fn("  - ERROR: connection to %s failed" % (node))
685       return True
686
687     if local_version != remote_version[0]:
688       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689                   " node %s %s" % (local_version, node, remote_version[0]))
690       return True
691
692     # node seems compatible, we can actually try to look into its results
693
694     bad = False
695
696     # full package version
697     if constants.RELEASE_VERSION != remote_version[1]:
698       feedback_fn("  - WARNING: software version mismatch: master %s,"
699                   " node %s %s" %
700                   (constants.RELEASE_VERSION, node, remote_version[1]))
701
702     # checks vg existence and size > 20G
703     if vg_name is not None:
704       vglist = node_result.get(constants.NV_VGLIST, None)
705       if not vglist:
706         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707                         (node,))
708         bad = True
709       else:
710         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711                                               constants.MIN_VG_SIZE)
712         if vgstatus:
713           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714           bad = True
715
716     # checks config file checksum
717
718     remote_cksum = node_result.get(constants.NV_FILELIST, None)
719     if not isinstance(remote_cksum, dict):
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned file checksum data")
722     else:
723       for file_name in file_list:
724         node_is_mc = nodeinfo.master_candidate
725         must_have_file = file_name not in master_files
726         if file_name not in remote_cksum:
727           if node_is_mc or must_have_file:
728             bad = True
729             feedback_fn("  - ERROR: file '%s' missing" % file_name)
730         elif remote_cksum[file_name] != local_cksum[file_name]:
731           if node_is_mc or must_have_file:
732             bad = True
733             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734           else:
735             # not candidate and this is not a must-have file
736             bad = True
737             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738                         " '%s'" % file_name)
739         else:
740           # all good, except non-master/non-must have combination
741           if not node_is_mc and not must_have_file:
742             feedback_fn("  - ERROR: file '%s' should not exist on non master"
743                         " candidates" % file_name)
744
745     # checks ssh to any
746
747     if constants.NV_NODELIST not in node_result:
748       bad = True
749       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750     else:
751       if node_result[constants.NV_NODELIST]:
752         bad = True
753         for node in node_result[constants.NV_NODELIST]:
754           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755                           (node, node_result[constants.NV_NODELIST][node]))
756
757     if constants.NV_NODENETTEST not in node_result:
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760     else:
761       if node_result[constants.NV_NODENETTEST]:
762         bad = True
763         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764         for node in nlist:
765           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766                           (node, node_result[constants.NV_NODENETTEST][node]))
767
768     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769     if isinstance(hyp_result, dict):
770       for hv_name, hv_result in hyp_result.iteritems():
771         if hv_result is not None:
772           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773                       (hv_name, hv_result))
774
775     # check used drbd list
776     if vg_name is not None:
777       used_minors = node_result.get(constants.NV_DRBDLIST, [])
778       if not isinstance(used_minors, (tuple, list)):
779         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
780                     str(used_minors))
781       else:
782         for minor, (iname, must_exist) in drbd_map.items():
783           if minor not in used_minors and must_exist:
784             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
785                         " not active" % (minor, iname))
786             bad = True
787         for minor in used_minors:
788           if minor not in drbd_map:
789             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
790                         minor)
791             bad = True
792
793     return bad
794
795   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796                       node_instance, feedback_fn, n_offline):
797     """Verify an instance.
798
799     This function checks to see if the required block devices are
800     available on the instance's node.
801
802     """
803     bad = False
804
805     node_current = instanceconfig.primary_node
806
807     node_vol_should = {}
808     instanceconfig.MapLVsByNode(node_vol_should)
809
810     for node in node_vol_should:
811       if node in n_offline:
812         # ignore missing volumes on offline nodes
813         continue
814       for volume in node_vol_should[node]:
815         if node not in node_vol_is or volume not in node_vol_is[node]:
816           feedback_fn("  - ERROR: volume %s missing on node %s" %
817                           (volume, node))
818           bad = True
819
820     if instanceconfig.admin_up:
821       if ((node_current not in node_instance or
822           not instance in node_instance[node_current]) and
823           node_current not in n_offline):
824         feedback_fn("  - ERROR: instance %s not running on node %s" %
825                         (instance, node_current))
826         bad = True
827
828     for node in node_instance:
829       if (not node == node_current):
830         if instance in node_instance[node]:
831           feedback_fn("  - ERROR: instance %s should not run on node %s" %
832                           (instance, node))
833           bad = True
834
835     return bad
836
837   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838     """Verify if there are any unknown volumes in the cluster.
839
840     The .os, .swap and backup volumes are ignored. All other volumes are
841     reported as unknown.
842
843     """
844     bad = False
845
846     for node in node_vol_is:
847       for volume in node_vol_is[node]:
848         if node not in node_vol_should or volume not in node_vol_should[node]:
849           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850                       (volume, node))
851           bad = True
852     return bad
853
854   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855     """Verify the list of running instances.
856
857     This checks what instances are running but unknown to the cluster.
858
859     """
860     bad = False
861     for node in node_instance:
862       for runninginstance in node_instance[node]:
863         if runninginstance not in instancelist:
864           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865                           (runninginstance, node))
866           bad = True
867     return bad
868
869   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870     """Verify N+1 Memory Resilience.
871
872     Check that if one single node dies we can still start all the instances it
873     was primary for.
874
875     """
876     bad = False
877
878     for node, nodeinfo in node_info.iteritems():
879       # This code checks that every node which is now listed as secondary has
880       # enough memory to host all instances it is supposed to should a single
881       # other node in the cluster fail.
882       # FIXME: not ready for failover to an arbitrary node
883       # FIXME: does not support file-backed instances
884       # WARNING: we currently take into account down instances as well as up
885       # ones, considering that even if they're down someone might want to start
886       # them even in the event of a node failure.
887       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888         needed_mem = 0
889         for instance in instances:
890           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891           if bep[constants.BE_AUTO_BALANCE]:
892             needed_mem += bep[constants.BE_MEMORY]
893         if nodeinfo['mfree'] < needed_mem:
894           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895                       " failovers should node %s fail" % (node, prinode))
896           bad = True
897     return bad
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     Transform the list of checks we're going to skip into a set and check that
903     all its members are valid.
904
905     """
906     self.skip_set = frozenset(self.op.skip_checks)
907     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908       raise errors.OpPrereqError("Invalid checks to be skipped specified")
909
910   def BuildHooksEnv(self):
911     """Build hooks env.
912
913     Cluster-Verify hooks just rone in the post phase and their failure makes
914     the output be logged in the verify output and the verification to fail.
915
916     """
917     all_nodes = self.cfg.GetNodeList()
918     env = {
919       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
920       }
921     for node in self.cfg.GetAllNodesInfo().values():
922       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
923
924     return env, [], all_nodes
925
926   def Exec(self, feedback_fn):
927     """Verify integrity of cluster, performing various test on nodes.
928
929     """
930     bad = False
931     feedback_fn("* Verifying global settings")
932     for msg in self.cfg.VerifyConfig():
933       feedback_fn("  - ERROR: %s" % msg)
934
935     vg_name = self.cfg.GetVGName()
936     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937     nodelist = utils.NiceSort(self.cfg.GetNodeList())
938     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941                         for iname in instancelist)
942     i_non_redundant = [] # Non redundant instances
943     i_non_a_balanced = [] # Non auto-balanced instances
944     n_offline = [] # List of offline nodes
945     n_drained = [] # List of nodes being drained
946     node_volume = {}
947     node_instance = {}
948     node_info = {}
949     instance_cfg = {}
950
951     # FIXME: verify OS list
952     # do local checksums
953     master_files = [constants.CLUSTER_CONF_FILE]
954
955     file_names = ssconf.SimpleStore().GetFileList()
956     file_names.append(constants.SSL_CERT_FILE)
957     file_names.append(constants.RAPI_CERT_FILE)
958     file_names.extend(master_files)
959
960     local_checksums = utils.FingerprintFiles(file_names)
961
962     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963     node_verify_param = {
964       constants.NV_FILELIST: file_names,
965       constants.NV_NODELIST: [node.name for node in nodeinfo
966                               if not node.offline],
967       constants.NV_HYPERVISOR: hypervisors,
968       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969                                   node.secondary_ip) for node in nodeinfo
970                                  if not node.offline],
971       constants.NV_INSTANCELIST: hypervisors,
972       constants.NV_VERSION: None,
973       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
974       }
975     if vg_name is not None:
976       node_verify_param[constants.NV_VGLIST] = None
977       node_verify_param[constants.NV_LVLIST] = vg_name
978       node_verify_param[constants.NV_DRBDLIST] = None
979     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980                                            self.cfg.GetClusterName())
981
982     cluster = self.cfg.GetClusterInfo()
983     master_node = self.cfg.GetMasterNode()
984     all_drbd_map = self.cfg.ComputeDRBDMap()
985
986     for node_i in nodeinfo:
987       node = node_i.name
988       nresult = all_nvinfo[node].data
989
990       if node_i.offline:
991         feedback_fn("* Skipping offline node %s" % (node,))
992         n_offline.append(node)
993         continue
994
995       if node == master_node:
996         ntype = "master"
997       elif node_i.master_candidate:
998         ntype = "master candidate"
999       elif node_i.drained:
1000         ntype = "drained"
1001         n_drained.append(node)
1002       else:
1003         ntype = "regular"
1004       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1005
1006       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1008         bad = True
1009         continue
1010
1011       node_drbd = {}
1012       for minor, instance in all_drbd_map[node].items():
1013         if instance not in instanceinfo:
1014           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1015                       instance)
1016           # ghost instance should not be running, but otherwise we
1017           # don't give double warnings (both ghost instance and
1018           # unallocated minor in use)
1019           node_drbd[minor] = (instance, False)
1020         else:
1021           instance = instanceinfo[instance]
1022           node_drbd[minor] = (instance.name, instance.admin_up)
1023       result = self._VerifyNode(node_i, file_names, local_checksums,
1024                                 nresult, feedback_fn, master_files,
1025                                 node_drbd, vg_name)
1026       bad = bad or result
1027
1028       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1029       if vg_name is None:
1030         node_volume[node] = {}
1031       elif isinstance(lvdata, basestring):
1032         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1033                     (node, utils.SafeEncode(lvdata)))
1034         bad = True
1035         node_volume[node] = {}
1036       elif not isinstance(lvdata, dict):
1037         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1038         bad = True
1039         continue
1040       else:
1041         node_volume[node] = lvdata
1042
1043       # node_instance
1044       idata = nresult.get(constants.NV_INSTANCELIST, None)
1045       if not isinstance(idata, list):
1046         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1047                     (node,))
1048         bad = True
1049         continue
1050
1051       node_instance[node] = idata
1052
1053       # node_info
1054       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055       if not isinstance(nodeinfo, dict):
1056         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1057         bad = True
1058         continue
1059
1060       try:
1061         node_info[node] = {
1062           "mfree": int(nodeinfo['memory_free']),
1063           "pinst": [],
1064           "sinst": [],
1065           # dictionary holding all instances this node is secondary for,
1066           # grouped by their primary node. Each key is a cluster node, and each
1067           # value is a list of instances which have the key as primary and the
1068           # current node as secondary.  this is handy to calculate N+1 memory
1069           # availability if you can only failover from a primary to its
1070           # secondary.
1071           "sinst-by-pnode": {},
1072         }
1073         # FIXME: devise a free space model for file based instances as well
1074         if vg_name is not None:
1075           if (constants.NV_VGLIST not in nresult or
1076               vg_name not in nresult[constants.NV_VGLIST]):
1077             feedback_fn("  - ERROR: node %s didn't return data for the"
1078                         " volume group '%s' - it is either missing or broken" %
1079                         (node, vg_name))
1080             bad = True
1081             continue
1082           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083       except (ValueError, KeyError):
1084         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1085                     " from node %s" % (node,))
1086         bad = True
1087         continue
1088
1089     node_vol_should = {}
1090
1091     for instance in instancelist:
1092       feedback_fn("* Verifying instance %s" % instance)
1093       inst_config = instanceinfo[instance]
1094       result =  self._VerifyInstance(instance, inst_config, node_volume,
1095                                      node_instance, feedback_fn, n_offline)
1096       bad = bad or result
1097       inst_nodes_offline = []
1098
1099       inst_config.MapLVsByNode(node_vol_should)
1100
1101       instance_cfg[instance] = inst_config
1102
1103       pnode = inst_config.primary_node
1104       if pnode in node_info:
1105         node_info[pnode]['pinst'].append(instance)
1106       elif pnode not in n_offline:
1107         feedback_fn("  - ERROR: instance %s, connection to primary node"
1108                     " %s failed" % (instance, pnode))
1109         bad = True
1110
1111       if pnode in n_offline:
1112         inst_nodes_offline.append(pnode)
1113
1114       # If the instance is non-redundant we cannot survive losing its primary
1115       # node, so we are not N+1 compliant. On the other hand we have no disk
1116       # templates with more than one secondary so that situation is not well
1117       # supported either.
1118       # FIXME: does not support file-backed instances
1119       if len(inst_config.secondary_nodes) == 0:
1120         i_non_redundant.append(instance)
1121       elif len(inst_config.secondary_nodes) > 1:
1122         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1123                     % instance)
1124
1125       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126         i_non_a_balanced.append(instance)
1127
1128       for snode in inst_config.secondary_nodes:
1129         if snode in node_info:
1130           node_info[snode]['sinst'].append(instance)
1131           if pnode not in node_info[snode]['sinst-by-pnode']:
1132             node_info[snode]['sinst-by-pnode'][pnode] = []
1133           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134         elif snode not in n_offline:
1135           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1136                       " %s failed" % (instance, snode))
1137           bad = True
1138         if snode in n_offline:
1139           inst_nodes_offline.append(snode)
1140
1141       if inst_nodes_offline:
1142         # warn that the instance lives on offline nodes, and set bad=True
1143         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1144                     ", ".join(inst_nodes_offline))
1145         bad = True
1146
1147     feedback_fn("* Verifying orphan volumes")
1148     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1149                                        feedback_fn)
1150     bad = bad or result
1151
1152     feedback_fn("* Verifying remaining instances")
1153     result = self._VerifyOrphanInstances(instancelist, node_instance,
1154                                          feedback_fn)
1155     bad = bad or result
1156
1157     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158       feedback_fn("* Verifying N+1 Memory redundancy")
1159       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1160       bad = bad or result
1161
1162     feedback_fn("* Other Notes")
1163     if i_non_redundant:
1164       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1165                   % len(i_non_redundant))
1166
1167     if i_non_a_balanced:
1168       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1169                   % len(i_non_a_balanced))
1170
1171     if n_offline:
1172       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1173
1174     if n_drained:
1175       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1176
1177     return not bad
1178
1179   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1180     """Analize the post-hooks' result
1181
1182     This method analyses the hook result, handles it, and sends some
1183     nicely-formatted feedback back to the user.
1184
1185     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1186         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1187     @param hooks_results: the results of the multi-node hooks rpc call
1188     @param feedback_fn: function used send feedback back to the caller
1189     @param lu_result: previous Exec result
1190     @return: the new Exec result, based on the previous result
1191         and hook results
1192
1193     """
1194     # We only really run POST phase hooks, and are only interested in
1195     # their results
1196     if phase == constants.HOOKS_PHASE_POST:
1197       # Used to change hooks' output to proper indentation
1198       indent_re = re.compile('^', re.M)
1199       feedback_fn("* Hooks Results")
1200       if not hooks_results:
1201         feedback_fn("  - ERROR: general communication failure")
1202         lu_result = 1
1203       else:
1204         for node_name in hooks_results:
1205           show_node_header = True
1206           res = hooks_results[node_name]
1207           if res.failed or res.data is False or not isinstance(res.data, list):
1208             if res.offline:
1209               # no need to warn or set fail return value
1210               continue
1211             feedback_fn("    Communication failure in hooks execution")
1212             lu_result = 1
1213             continue
1214           for script, hkr, output in res.data:
1215             if hkr == constants.HKR_FAIL:
1216               # The node header is only shown once, if there are
1217               # failing hooks on that node
1218               if show_node_header:
1219                 feedback_fn("  Node %s:" % node_name)
1220                 show_node_header = False
1221               feedback_fn("    ERROR: Script %s failed, output:" % script)
1222               output = indent_re.sub('      ', output)
1223               feedback_fn("%s" % output)
1224               lu_result = 1
1225
1226       return lu_result
1227
1228
1229 class LUVerifyDisks(NoHooksLU):
1230   """Verifies the cluster disks status.
1231
1232   """
1233   _OP_REQP = []
1234   REQ_BGL = False
1235
1236   def ExpandNames(self):
1237     self.needed_locks = {
1238       locking.LEVEL_NODE: locking.ALL_SET,
1239       locking.LEVEL_INSTANCE: locking.ALL_SET,
1240     }
1241     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1242
1243   def CheckPrereq(self):
1244     """Check prerequisites.
1245
1246     This has no prerequisites.
1247
1248     """
1249     pass
1250
1251   def Exec(self, feedback_fn):
1252     """Verify integrity of cluster disks.
1253
1254     """
1255     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1256
1257     vg_name = self.cfg.GetVGName()
1258     nodes = utils.NiceSort(self.cfg.GetNodeList())
1259     instances = [self.cfg.GetInstanceInfo(name)
1260                  for name in self.cfg.GetInstanceList()]
1261
1262     nv_dict = {}
1263     for inst in instances:
1264       inst_lvs = {}
1265       if (not inst.admin_up or
1266           inst.disk_template not in constants.DTS_NET_MIRROR):
1267         continue
1268       inst.MapLVsByNode(inst_lvs)
1269       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1270       for node, vol_list in inst_lvs.iteritems():
1271         for vol in vol_list:
1272           nv_dict[(node, vol)] = inst
1273
1274     if not nv_dict:
1275       return result
1276
1277     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1278
1279     to_act = set()
1280     for node in nodes:
1281       # node_volume
1282       lvs = node_lvs[node]
1283       if lvs.failed:
1284         if not lvs.offline:
1285           self.LogWarning("Connection to node %s failed: %s" %
1286                           (node, lvs.data))
1287         continue
1288       lvs = lvs.data
1289       if isinstance(lvs, basestring):
1290         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1291         res_nlvm[node] = lvs
1292         continue
1293       elif not isinstance(lvs, dict):
1294         logging.warning("Connection to node %s failed or invalid data"
1295                         " returned", node)
1296         res_nodes.append(node)
1297         continue
1298
1299       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1300         inst = nv_dict.pop((node, lv_name), None)
1301         if (not lv_online and inst is not None
1302             and inst.name not in res_instances):
1303           res_instances.append(inst.name)
1304
1305     # any leftover items in nv_dict are missing LVs, let's arrange the
1306     # data better
1307     for key, inst in nv_dict.iteritems():
1308       if inst.name not in res_missing:
1309         res_missing[inst.name] = []
1310       res_missing[inst.name].append(key)
1311
1312     return result
1313
1314
1315 class LURenameCluster(LogicalUnit):
1316   """Rename the cluster.
1317
1318   """
1319   HPATH = "cluster-rename"
1320   HTYPE = constants.HTYPE_CLUSTER
1321   _OP_REQP = ["name"]
1322
1323   def BuildHooksEnv(self):
1324     """Build hooks env.
1325
1326     """
1327     env = {
1328       "OP_TARGET": self.cfg.GetClusterName(),
1329       "NEW_NAME": self.op.name,
1330       }
1331     mn = self.cfg.GetMasterNode()
1332     return env, [mn], [mn]
1333
1334   def CheckPrereq(self):
1335     """Verify that the passed name is a valid one.
1336
1337     """
1338     hostname = utils.HostInfo(self.op.name)
1339
1340     new_name = hostname.name
1341     self.ip = new_ip = hostname.ip
1342     old_name = self.cfg.GetClusterName()
1343     old_ip = self.cfg.GetMasterIP()
1344     if new_name == old_name and new_ip == old_ip:
1345       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1346                                  " cluster has changed")
1347     if new_ip != old_ip:
1348       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1349         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1350                                    " reachable on the network. Aborting." %
1351                                    new_ip)
1352
1353     self.op.name = new_name
1354
1355   def Exec(self, feedback_fn):
1356     """Rename the cluster.
1357
1358     """
1359     clustername = self.op.name
1360     ip = self.ip
1361
1362     # shutdown the master IP
1363     master = self.cfg.GetMasterNode()
1364     result = self.rpc.call_node_stop_master(master, False)
1365     if result.failed or not result.data:
1366       raise errors.OpExecError("Could not disable the master role")
1367
1368     try:
1369       cluster = self.cfg.GetClusterInfo()
1370       cluster.cluster_name = clustername
1371       cluster.master_ip = ip
1372       self.cfg.Update(cluster)
1373
1374       # update the known hosts file
1375       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1376       node_list = self.cfg.GetNodeList()
1377       try:
1378         node_list.remove(master)
1379       except ValueError:
1380         pass
1381       result = self.rpc.call_upload_file(node_list,
1382                                          constants.SSH_KNOWN_HOSTS_FILE)
1383       for to_node, to_result in result.iteritems():
1384         if to_result.failed or not to_result.data:
1385           logging.error("Copy of file %s to node %s failed",
1386                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1387
1388     finally:
1389       result = self.rpc.call_node_start_master(master, False)
1390       if result.failed or not result.data:
1391         self.LogWarning("Could not re-enable the master role on"
1392                         " the master, please restart manually.")
1393
1394
1395 def _RecursiveCheckIfLVMBased(disk):
1396   """Check if the given disk or its children are lvm-based.
1397
1398   @type disk: L{objects.Disk}
1399   @param disk: the disk to check
1400   @rtype: booleean
1401   @return: boolean indicating whether a LD_LV dev_type was found or not
1402
1403   """
1404   if disk.children:
1405     for chdisk in disk.children:
1406       if _RecursiveCheckIfLVMBased(chdisk):
1407         return True
1408   return disk.dev_type == constants.LD_LV
1409
1410
1411 class LUSetClusterParams(LogicalUnit):
1412   """Change the parameters of the cluster.
1413
1414   """
1415   HPATH = "cluster-modify"
1416   HTYPE = constants.HTYPE_CLUSTER
1417   _OP_REQP = []
1418   REQ_BGL = False
1419
1420   def CheckArguments(self):
1421     """Check parameters
1422
1423     """
1424     if not hasattr(self.op, "candidate_pool_size"):
1425       self.op.candidate_pool_size = None
1426     if self.op.candidate_pool_size is not None:
1427       try:
1428         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1429       except (ValueError, TypeError), err:
1430         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1431                                    str(err))
1432       if self.op.candidate_pool_size < 1:
1433         raise errors.OpPrereqError("At least one master candidate needed")
1434
1435   def ExpandNames(self):
1436     # FIXME: in the future maybe other cluster params won't require checking on
1437     # all nodes to be modified.
1438     self.needed_locks = {
1439       locking.LEVEL_NODE: locking.ALL_SET,
1440     }
1441     self.share_locks[locking.LEVEL_NODE] = 1
1442
1443   def BuildHooksEnv(self):
1444     """Build hooks env.
1445
1446     """
1447     env = {
1448       "OP_TARGET": self.cfg.GetClusterName(),
1449       "NEW_VG_NAME": self.op.vg_name,
1450       }
1451     mn = self.cfg.GetMasterNode()
1452     return env, [mn], [mn]
1453
1454   def CheckPrereq(self):
1455     """Check prerequisites.
1456
1457     This checks whether the given params don't conflict and
1458     if the given volume group is valid.
1459
1460     """
1461     if self.op.vg_name is not None and not self.op.vg_name:
1462       instances = self.cfg.GetAllInstancesInfo().values()
1463       for inst in instances:
1464         for disk in inst.disks:
1465           if _RecursiveCheckIfLVMBased(disk):
1466             raise errors.OpPrereqError("Cannot disable lvm storage while"
1467                                        " lvm-based instances exist")
1468
1469     node_list = self.acquired_locks[locking.LEVEL_NODE]
1470
1471     # if vg_name not None, checks given volume group on all nodes
1472     if self.op.vg_name:
1473       vglist = self.rpc.call_vg_list(node_list)
1474       for node in node_list:
1475         if vglist[node].failed:
1476           # ignoring down node
1477           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1478           continue
1479         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1480                                               self.op.vg_name,
1481                                               constants.MIN_VG_SIZE)
1482         if vgstatus:
1483           raise errors.OpPrereqError("Error on node '%s': %s" %
1484                                      (node, vgstatus))
1485
1486     self.cluster = cluster = self.cfg.GetClusterInfo()
1487     # validate beparams changes
1488     if self.op.beparams:
1489       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1490       self.new_beparams = cluster.FillDict(
1491         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1492
1493     # hypervisor list/parameters
1494     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1495     if self.op.hvparams:
1496       if not isinstance(self.op.hvparams, dict):
1497         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1498       for hv_name, hv_dict in self.op.hvparams.items():
1499         if hv_name not in self.new_hvparams:
1500           self.new_hvparams[hv_name] = hv_dict
1501         else:
1502           self.new_hvparams[hv_name].update(hv_dict)
1503
1504     if self.op.enabled_hypervisors is not None:
1505       self.hv_list = self.op.enabled_hypervisors
1506     else:
1507       self.hv_list = cluster.enabled_hypervisors
1508
1509     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1510       # either the enabled list has changed, or the parameters have, validate
1511       for hv_name, hv_params in self.new_hvparams.items():
1512         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1513             (self.op.enabled_hypervisors and
1514              hv_name in self.op.enabled_hypervisors)):
1515           # either this is a new hypervisor, or its parameters have changed
1516           hv_class = hypervisor.GetHypervisor(hv_name)
1517           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1518           hv_class.CheckParameterSyntax(hv_params)
1519           _CheckHVParams(self, node_list, hv_name, hv_params)
1520
1521   def Exec(self, feedback_fn):
1522     """Change the parameters of the cluster.
1523
1524     """
1525     if self.op.vg_name is not None:
1526       new_volume = self.op.vg_name
1527       if not new_volume:
1528         new_volume = None
1529       if new_volume != self.cfg.GetVGName():
1530         self.cfg.SetVGName(new_volume)
1531       else:
1532         feedback_fn("Cluster LVM configuration already in desired"
1533                     " state, not changing")
1534     if self.op.hvparams:
1535       self.cluster.hvparams = self.new_hvparams
1536     if self.op.enabled_hypervisors is not None:
1537       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1538     if self.op.beparams:
1539       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1540     if self.op.candidate_pool_size is not None:
1541       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1542
1543     self.cfg.Update(self.cluster)
1544
1545     # we want to update nodes after the cluster so that if any errors
1546     # happen, we have recorded and saved the cluster info
1547     if self.op.candidate_pool_size is not None:
1548       _AdjustCandidatePool(self)
1549
1550
1551 class LURedistributeConfig(NoHooksLU):
1552   """Force the redistribution of cluster configuration.
1553
1554   This is a very simple LU.
1555
1556   """
1557   _OP_REQP = []
1558   REQ_BGL = False
1559
1560   def ExpandNames(self):
1561     self.needed_locks = {
1562       locking.LEVEL_NODE: locking.ALL_SET,
1563     }
1564     self.share_locks[locking.LEVEL_NODE] = 1
1565
1566   def CheckPrereq(self):
1567     """Check prerequisites.
1568
1569     """
1570
1571   def Exec(self, feedback_fn):
1572     """Redistribute the configuration.
1573
1574     """
1575     self.cfg.Update(self.cfg.GetClusterInfo())
1576
1577
1578 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1579   """Sleep and poll for an instance's disk to sync.
1580
1581   """
1582   if not instance.disks:
1583     return True
1584
1585   if not oneshot:
1586     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1587
1588   node = instance.primary_node
1589
1590   for dev in instance.disks:
1591     lu.cfg.SetDiskID(dev, node)
1592
1593   retries = 0
1594   while True:
1595     max_time = 0
1596     done = True
1597     cumul_degraded = False
1598     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1599     if rstats.failed or not rstats.data:
1600       lu.LogWarning("Can't get any data from node %s", node)
1601       retries += 1
1602       if retries >= 10:
1603         raise errors.RemoteError("Can't contact node %s for mirror data,"
1604                                  " aborting." % node)
1605       time.sleep(6)
1606       continue
1607     rstats = rstats.data
1608     retries = 0
1609     for i, mstat in enumerate(rstats):
1610       if mstat is None:
1611         lu.LogWarning("Can't compute data for node %s/%s",
1612                            node, instance.disks[i].iv_name)
1613         continue
1614       # we ignore the ldisk parameter
1615       perc_done, est_time, is_degraded, _ = mstat
1616       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1617       if perc_done is not None:
1618         done = False
1619         if est_time is not None:
1620           rem_time = "%d estimated seconds remaining" % est_time
1621           max_time = est_time
1622         else:
1623           rem_time = "no time estimate"
1624         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1625                         (instance.disks[i].iv_name, perc_done, rem_time))
1626     if done or oneshot:
1627       break
1628
1629     time.sleep(min(60, max_time))
1630
1631   if done:
1632     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1633   return not cumul_degraded
1634
1635
1636 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1637   """Check that mirrors are not degraded.
1638
1639   The ldisk parameter, if True, will change the test from the
1640   is_degraded attribute (which represents overall non-ok status for
1641   the device(s)) to the ldisk (representing the local storage status).
1642
1643   """
1644   lu.cfg.SetDiskID(dev, node)
1645   if ldisk:
1646     idx = 6
1647   else:
1648     idx = 5
1649
1650   result = True
1651   if on_primary or dev.AssembleOnSecondary():
1652     rstats = lu.rpc.call_blockdev_find(node, dev)
1653     msg = rstats.RemoteFailMsg()
1654     if msg:
1655       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1656       result = False
1657     elif not rstats.payload:
1658       lu.LogWarning("Can't find disk on node %s", node)
1659       result = False
1660     else:
1661       result = result and (not rstats.payload[idx])
1662   if dev.children:
1663     for child in dev.children:
1664       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1665
1666   return result
1667
1668
1669 class LUDiagnoseOS(NoHooksLU):
1670   """Logical unit for OS diagnose/query.
1671
1672   """
1673   _OP_REQP = ["output_fields", "names"]
1674   REQ_BGL = False
1675   _FIELDS_STATIC = utils.FieldSet()
1676   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1677
1678   def ExpandNames(self):
1679     if self.op.names:
1680       raise errors.OpPrereqError("Selective OS query not supported")
1681
1682     _CheckOutputFields(static=self._FIELDS_STATIC,
1683                        dynamic=self._FIELDS_DYNAMIC,
1684                        selected=self.op.output_fields)
1685
1686     # Lock all nodes, in shared mode
1687     # Temporary removal of locks, should be reverted later
1688     # TODO: reintroduce locks when they are lighter-weight
1689     self.needed_locks = {}
1690     #self.share_locks[locking.LEVEL_NODE] = 1
1691     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1692
1693   def CheckPrereq(self):
1694     """Check prerequisites.
1695
1696     """
1697
1698   @staticmethod
1699   def _DiagnoseByOS(node_list, rlist):
1700     """Remaps a per-node return list into an a per-os per-node dictionary
1701
1702     @param node_list: a list with the names of all nodes
1703     @param rlist: a map with node names as keys and OS objects as values
1704
1705     @rtype: dict
1706     @return: a dictionary with osnames as keys and as value another map, with
1707         nodes as keys and list of OS objects as values, eg::
1708
1709           {"debian-etch": {"node1": [<object>,...],
1710                            "node2": [<object>,]}
1711           }
1712
1713     """
1714     all_os = {}
1715     # we build here the list of nodes that didn't fail the RPC (at RPC
1716     # level), so that nodes with a non-responding node daemon don't
1717     # make all OSes invalid
1718     good_nodes = [node_name for node_name in rlist
1719                   if not rlist[node_name].failed]
1720     for node_name, nr in rlist.iteritems():
1721       if nr.failed or not nr.data:
1722         continue
1723       for os_obj in nr.data:
1724         if os_obj.name not in all_os:
1725           # build a list of nodes for this os containing empty lists
1726           # for each node in node_list
1727           all_os[os_obj.name] = {}
1728           for nname in good_nodes:
1729             all_os[os_obj.name][nname] = []
1730         all_os[os_obj.name][node_name].append(os_obj)
1731     return all_os
1732
1733   def Exec(self, feedback_fn):
1734     """Compute the list of OSes.
1735
1736     """
1737     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1738     node_data = self.rpc.call_os_diagnose(valid_nodes)
1739     if node_data == False:
1740       raise errors.OpExecError("Can't gather the list of OSes")
1741     pol = self._DiagnoseByOS(valid_nodes, node_data)
1742     output = []
1743     for os_name, os_data in pol.iteritems():
1744       row = []
1745       for field in self.op.output_fields:
1746         if field == "name":
1747           val = os_name
1748         elif field == "valid":
1749           val = utils.all([osl and osl[0] for osl in os_data.values()])
1750         elif field == "node_status":
1751           val = {}
1752           for node_name, nos_list in os_data.iteritems():
1753             val[node_name] = [(v.status, v.path) for v in nos_list]
1754         else:
1755           raise errors.ParameterError(field)
1756         row.append(val)
1757       output.append(row)
1758
1759     return output
1760
1761
1762 class LURemoveNode(LogicalUnit):
1763   """Logical unit for removing a node.
1764
1765   """
1766   HPATH = "node-remove"
1767   HTYPE = constants.HTYPE_NODE
1768   _OP_REQP = ["node_name"]
1769
1770   def BuildHooksEnv(self):
1771     """Build hooks env.
1772
1773     This doesn't run on the target node in the pre phase as a failed
1774     node would then be impossible to remove.
1775
1776     """
1777     env = {
1778       "OP_TARGET": self.op.node_name,
1779       "NODE_NAME": self.op.node_name,
1780       }
1781     all_nodes = self.cfg.GetNodeList()
1782     all_nodes.remove(self.op.node_name)
1783     return env, all_nodes, all_nodes
1784
1785   def CheckPrereq(self):
1786     """Check prerequisites.
1787
1788     This checks:
1789      - the node exists in the configuration
1790      - it does not have primary or secondary instances
1791      - it's not the master
1792
1793     Any errors are signalled by raising errors.OpPrereqError.
1794
1795     """
1796     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1797     if node is None:
1798       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1799
1800     instance_list = self.cfg.GetInstanceList()
1801
1802     masternode = self.cfg.GetMasterNode()
1803     if node.name == masternode:
1804       raise errors.OpPrereqError("Node is the master node,"
1805                                  " you need to failover first.")
1806
1807     for instance_name in instance_list:
1808       instance = self.cfg.GetInstanceInfo(instance_name)
1809       if node.name in instance.all_nodes:
1810         raise errors.OpPrereqError("Instance %s is still running on the node,"
1811                                    " please remove first." % instance_name)
1812     self.op.node_name = node.name
1813     self.node = node
1814
1815   def Exec(self, feedback_fn):
1816     """Removes the node from the cluster.
1817
1818     """
1819     node = self.node
1820     logging.info("Stopping the node daemon and removing configs from node %s",
1821                  node.name)
1822
1823     self.context.RemoveNode(node.name)
1824
1825     self.rpc.call_node_leave_cluster(node.name)
1826
1827     # Promote nodes to master candidate as needed
1828     _AdjustCandidatePool(self)
1829
1830
1831 class LUQueryNodes(NoHooksLU):
1832   """Logical unit for querying nodes.
1833
1834   """
1835   _OP_REQP = ["output_fields", "names", "use_locking"]
1836   REQ_BGL = False
1837   _FIELDS_DYNAMIC = utils.FieldSet(
1838     "dtotal", "dfree",
1839     "mtotal", "mnode", "mfree",
1840     "bootid",
1841     "ctotal", "cnodes", "csockets",
1842     )
1843
1844   _FIELDS_STATIC = utils.FieldSet(
1845     "name", "pinst_cnt", "sinst_cnt",
1846     "pinst_list", "sinst_list",
1847     "pip", "sip", "tags",
1848     "serial_no",
1849     "master_candidate",
1850     "master",
1851     "offline",
1852     "drained",
1853     )
1854
1855   def ExpandNames(self):
1856     _CheckOutputFields(static=self._FIELDS_STATIC,
1857                        dynamic=self._FIELDS_DYNAMIC,
1858                        selected=self.op.output_fields)
1859
1860     self.needed_locks = {}
1861     self.share_locks[locking.LEVEL_NODE] = 1
1862
1863     if self.op.names:
1864       self.wanted = _GetWantedNodes(self, self.op.names)
1865     else:
1866       self.wanted = locking.ALL_SET
1867
1868     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1869     self.do_locking = self.do_node_query and self.op.use_locking
1870     if self.do_locking:
1871       # if we don't request only static fields, we need to lock the nodes
1872       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1873
1874
1875   def CheckPrereq(self):
1876     """Check prerequisites.
1877
1878     """
1879     # The validation of the node list is done in the _GetWantedNodes,
1880     # if non empty, and if empty, there's no validation to do
1881     pass
1882
1883   def Exec(self, feedback_fn):
1884     """Computes the list of nodes and their attributes.
1885
1886     """
1887     all_info = self.cfg.GetAllNodesInfo()
1888     if self.do_locking:
1889       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1890     elif self.wanted != locking.ALL_SET:
1891       nodenames = self.wanted
1892       missing = set(nodenames).difference(all_info.keys())
1893       if missing:
1894         raise errors.OpExecError(
1895           "Some nodes were removed before retrieving their data: %s" % missing)
1896     else:
1897       nodenames = all_info.keys()
1898
1899     nodenames = utils.NiceSort(nodenames)
1900     nodelist = [all_info[name] for name in nodenames]
1901
1902     # begin data gathering
1903
1904     if self.do_node_query:
1905       live_data = {}
1906       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1907                                           self.cfg.GetHypervisorType())
1908       for name in nodenames:
1909         nodeinfo = node_data[name]
1910         if not nodeinfo.failed and nodeinfo.data:
1911           nodeinfo = nodeinfo.data
1912           fn = utils.TryConvert
1913           live_data[name] = {
1914             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1915             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1916             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1917             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1918             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1919             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1920             "bootid": nodeinfo.get('bootid', None),
1921             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1922             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1923             }
1924         else:
1925           live_data[name] = {}
1926     else:
1927       live_data = dict.fromkeys(nodenames, {})
1928
1929     node_to_primary = dict([(name, set()) for name in nodenames])
1930     node_to_secondary = dict([(name, set()) for name in nodenames])
1931
1932     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1933                              "sinst_cnt", "sinst_list"))
1934     if inst_fields & frozenset(self.op.output_fields):
1935       instancelist = self.cfg.GetInstanceList()
1936
1937       for instance_name in instancelist:
1938         inst = self.cfg.GetInstanceInfo(instance_name)
1939         if inst.primary_node in node_to_primary:
1940           node_to_primary[inst.primary_node].add(inst.name)
1941         for secnode in inst.secondary_nodes:
1942           if secnode in node_to_secondary:
1943             node_to_secondary[secnode].add(inst.name)
1944
1945     master_node = self.cfg.GetMasterNode()
1946
1947     # end data gathering
1948
1949     output = []
1950     for node in nodelist:
1951       node_output = []
1952       for field in self.op.output_fields:
1953         if field == "name":
1954           val = node.name
1955         elif field == "pinst_list":
1956           val = list(node_to_primary[node.name])
1957         elif field == "sinst_list":
1958           val = list(node_to_secondary[node.name])
1959         elif field == "pinst_cnt":
1960           val = len(node_to_primary[node.name])
1961         elif field == "sinst_cnt":
1962           val = len(node_to_secondary[node.name])
1963         elif field == "pip":
1964           val = node.primary_ip
1965         elif field == "sip":
1966           val = node.secondary_ip
1967         elif field == "tags":
1968           val = list(node.GetTags())
1969         elif field == "serial_no":
1970           val = node.serial_no
1971         elif field == "master_candidate":
1972           val = node.master_candidate
1973         elif field == "master":
1974           val = node.name == master_node
1975         elif field == "offline":
1976           val = node.offline
1977         elif field == "drained":
1978           val = node.drained
1979         elif self._FIELDS_DYNAMIC.Matches(field):
1980           val = live_data[node.name].get(field, None)
1981         else:
1982           raise errors.ParameterError(field)
1983         node_output.append(val)
1984       output.append(node_output)
1985
1986     return output
1987
1988
1989 class LUQueryNodeVolumes(NoHooksLU):
1990   """Logical unit for getting volumes on node(s).
1991
1992   """
1993   _OP_REQP = ["nodes", "output_fields"]
1994   REQ_BGL = False
1995   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1996   _FIELDS_STATIC = utils.FieldSet("node")
1997
1998   def ExpandNames(self):
1999     _CheckOutputFields(static=self._FIELDS_STATIC,
2000                        dynamic=self._FIELDS_DYNAMIC,
2001                        selected=self.op.output_fields)
2002
2003     self.needed_locks = {}
2004     self.share_locks[locking.LEVEL_NODE] = 1
2005     if not self.op.nodes:
2006       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2007     else:
2008       self.needed_locks[locking.LEVEL_NODE] = \
2009         _GetWantedNodes(self, self.op.nodes)
2010
2011   def CheckPrereq(self):
2012     """Check prerequisites.
2013
2014     This checks that the fields required are valid output fields.
2015
2016     """
2017     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2018
2019   def Exec(self, feedback_fn):
2020     """Computes the list of nodes and their attributes.
2021
2022     """
2023     nodenames = self.nodes
2024     volumes = self.rpc.call_node_volumes(nodenames)
2025
2026     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2027              in self.cfg.GetInstanceList()]
2028
2029     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2030
2031     output = []
2032     for node in nodenames:
2033       if node not in volumes or volumes[node].failed or not volumes[node].data:
2034         continue
2035
2036       node_vols = volumes[node].data[:]
2037       node_vols.sort(key=lambda vol: vol['dev'])
2038
2039       for vol in node_vols:
2040         node_output = []
2041         for field in self.op.output_fields:
2042           if field == "node":
2043             val = node
2044           elif field == "phys":
2045             val = vol['dev']
2046           elif field == "vg":
2047             val = vol['vg']
2048           elif field == "name":
2049             val = vol['name']
2050           elif field == "size":
2051             val = int(float(vol['size']))
2052           elif field == "instance":
2053             for inst in ilist:
2054               if node not in lv_by_node[inst]:
2055                 continue
2056               if vol['name'] in lv_by_node[inst][node]:
2057                 val = inst.name
2058                 break
2059             else:
2060               val = '-'
2061           else:
2062             raise errors.ParameterError(field)
2063           node_output.append(str(val))
2064
2065         output.append(node_output)
2066
2067     return output
2068
2069
2070 class LUAddNode(LogicalUnit):
2071   """Logical unit for adding node to the cluster.
2072
2073   """
2074   HPATH = "node-add"
2075   HTYPE = constants.HTYPE_NODE
2076   _OP_REQP = ["node_name"]
2077
2078   def BuildHooksEnv(self):
2079     """Build hooks env.
2080
2081     This will run on all nodes before, and on all nodes + the new node after.
2082
2083     """
2084     env = {
2085       "OP_TARGET": self.op.node_name,
2086       "NODE_NAME": self.op.node_name,
2087       "NODE_PIP": self.op.primary_ip,
2088       "NODE_SIP": self.op.secondary_ip,
2089       }
2090     nodes_0 = self.cfg.GetNodeList()
2091     nodes_1 = nodes_0 + [self.op.node_name, ]
2092     return env, nodes_0, nodes_1
2093
2094   def CheckPrereq(self):
2095     """Check prerequisites.
2096
2097     This checks:
2098      - the new node is not already in the config
2099      - it is resolvable
2100      - its parameters (single/dual homed) matches the cluster
2101
2102     Any errors are signalled by raising errors.OpPrereqError.
2103
2104     """
2105     node_name = self.op.node_name
2106     cfg = self.cfg
2107
2108     dns_data = utils.HostInfo(node_name)
2109
2110     node = dns_data.name
2111     primary_ip = self.op.primary_ip = dns_data.ip
2112     secondary_ip = getattr(self.op, "secondary_ip", None)
2113     if secondary_ip is None:
2114       secondary_ip = primary_ip
2115     if not utils.IsValidIP(secondary_ip):
2116       raise errors.OpPrereqError("Invalid secondary IP given")
2117     self.op.secondary_ip = secondary_ip
2118
2119     node_list = cfg.GetNodeList()
2120     if not self.op.readd and node in node_list:
2121       raise errors.OpPrereqError("Node %s is already in the configuration" %
2122                                  node)
2123     elif self.op.readd and node not in node_list:
2124       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2125
2126     for existing_node_name in node_list:
2127       existing_node = cfg.GetNodeInfo(existing_node_name)
2128
2129       if self.op.readd and node == existing_node_name:
2130         if (existing_node.primary_ip != primary_ip or
2131             existing_node.secondary_ip != secondary_ip):
2132           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2133                                      " address configuration as before")
2134         continue
2135
2136       if (existing_node.primary_ip == primary_ip or
2137           existing_node.secondary_ip == primary_ip or
2138           existing_node.primary_ip == secondary_ip or
2139           existing_node.secondary_ip == secondary_ip):
2140         raise errors.OpPrereqError("New node ip address(es) conflict with"
2141                                    " existing node %s" % existing_node.name)
2142
2143     # check that the type of the node (single versus dual homed) is the
2144     # same as for the master
2145     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2146     master_singlehomed = myself.secondary_ip == myself.primary_ip
2147     newbie_singlehomed = secondary_ip == primary_ip
2148     if master_singlehomed != newbie_singlehomed:
2149       if master_singlehomed:
2150         raise errors.OpPrereqError("The master has no private ip but the"
2151                                    " new node has one")
2152       else:
2153         raise errors.OpPrereqError("The master has a private ip but the"
2154                                    " new node doesn't have one")
2155
2156     # checks reachablity
2157     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2158       raise errors.OpPrereqError("Node not reachable by ping")
2159
2160     if not newbie_singlehomed:
2161       # check reachability from my secondary ip to newbie's secondary ip
2162       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2163                            source=myself.secondary_ip):
2164         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2165                                    " based ping to noded port")
2166
2167     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2168     mc_now, _ = self.cfg.GetMasterCandidateStats()
2169     master_candidate = mc_now < cp_size
2170
2171     self.new_node = objects.Node(name=node,
2172                                  primary_ip=primary_ip,
2173                                  secondary_ip=secondary_ip,
2174                                  master_candidate=master_candidate,
2175                                  offline=False, drained=False)
2176
2177   def Exec(self, feedback_fn):
2178     """Adds the new node to the cluster.
2179
2180     """
2181     new_node = self.new_node
2182     node = new_node.name
2183
2184     # check connectivity
2185     result = self.rpc.call_version([node])[node]
2186     result.Raise()
2187     if result.data:
2188       if constants.PROTOCOL_VERSION == result.data:
2189         logging.info("Communication to node %s fine, sw version %s match",
2190                      node, result.data)
2191       else:
2192         raise errors.OpExecError("Version mismatch master version %s,"
2193                                  " node version %s" %
2194                                  (constants.PROTOCOL_VERSION, result.data))
2195     else:
2196       raise errors.OpExecError("Cannot get version from the new node")
2197
2198     # setup ssh on node
2199     logging.info("Copy ssh key to node %s", node)
2200     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2201     keyarray = []
2202     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2203                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2204                 priv_key, pub_key]
2205
2206     for i in keyfiles:
2207       f = open(i, 'r')
2208       try:
2209         keyarray.append(f.read())
2210       finally:
2211         f.close()
2212
2213     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2214                                     keyarray[2],
2215                                     keyarray[3], keyarray[4], keyarray[5])
2216
2217     msg = result.RemoteFailMsg()
2218     if msg:
2219       raise errors.OpExecError("Cannot transfer ssh keys to the"
2220                                " new node: %s" % msg)
2221
2222     # Add node to our /etc/hosts, and add key to known_hosts
2223     utils.AddHostToEtcHosts(new_node.name)
2224
2225     if new_node.secondary_ip != new_node.primary_ip:
2226       result = self.rpc.call_node_has_ip_address(new_node.name,
2227                                                  new_node.secondary_ip)
2228       if result.failed or not result.data:
2229         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2230                                  " you gave (%s). Please fix and re-run this"
2231                                  " command." % new_node.secondary_ip)
2232
2233     node_verify_list = [self.cfg.GetMasterNode()]
2234     node_verify_param = {
2235       'nodelist': [node],
2236       # TODO: do a node-net-test as well?
2237     }
2238
2239     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2240                                        self.cfg.GetClusterName())
2241     for verifier in node_verify_list:
2242       if result[verifier].failed or not result[verifier].data:
2243         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2244                                  " for remote verification" % verifier)
2245       if result[verifier].data['nodelist']:
2246         for failed in result[verifier].data['nodelist']:
2247           feedback_fn("ssh/hostname verification failed %s -> %s" %
2248                       (verifier, result[verifier].data['nodelist'][failed]))
2249         raise errors.OpExecError("ssh/hostname verification failed.")
2250
2251     # Distribute updated /etc/hosts and known_hosts to all nodes,
2252     # including the node just added
2253     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2254     dist_nodes = self.cfg.GetNodeList()
2255     if not self.op.readd:
2256       dist_nodes.append(node)
2257     if myself.name in dist_nodes:
2258       dist_nodes.remove(myself.name)
2259
2260     logging.debug("Copying hosts and known_hosts to all nodes")
2261     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2262       result = self.rpc.call_upload_file(dist_nodes, fname)
2263       for to_node, to_result in result.iteritems():
2264         if to_result.failed or not to_result.data:
2265           logging.error("Copy of file %s to node %s failed", fname, to_node)
2266
2267     if self.op.readd:
2268       self.context.ReaddNode(new_node)
2269     else:
2270       self.context.AddNode(new_node)
2271
2272
2273 class LUSetNodeParams(LogicalUnit):
2274   """Modifies the parameters of a node.
2275
2276   """
2277   HPATH = "node-modify"
2278   HTYPE = constants.HTYPE_NODE
2279   _OP_REQP = ["node_name"]
2280   REQ_BGL = False
2281
2282   def CheckArguments(self):
2283     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2284     if node_name is None:
2285       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2286     self.op.node_name = node_name
2287     _CheckBooleanOpField(self.op, 'master_candidate')
2288     _CheckBooleanOpField(self.op, 'offline')
2289     _CheckBooleanOpField(self.op, 'drained')
2290     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2291     if all_mods.count(None) == 3:
2292       raise errors.OpPrereqError("Please pass at least one modification")
2293     if all_mods.count(True) > 1:
2294       raise errors.OpPrereqError("Can't set the node into more than one"
2295                                  " state at the same time")
2296
2297   def ExpandNames(self):
2298     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2299
2300   def BuildHooksEnv(self):
2301     """Build hooks env.
2302
2303     This runs on the master node.
2304
2305     """
2306     env = {
2307       "OP_TARGET": self.op.node_name,
2308       "MASTER_CANDIDATE": str(self.op.master_candidate),
2309       "OFFLINE": str(self.op.offline),
2310       "DRAINED": str(self.op.drained),
2311       }
2312     nl = [self.cfg.GetMasterNode(),
2313           self.op.node_name]
2314     return env, nl, nl
2315
2316   def CheckPrereq(self):
2317     """Check prerequisites.
2318
2319     This only checks the instance list against the existing names.
2320
2321     """
2322     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2323
2324     if ((self.op.master_candidate == False or self.op.offline == True or
2325          self.op.drained == True) and node.master_candidate):
2326       # we will demote the node from master_candidate
2327       if self.op.node_name == self.cfg.GetMasterNode():
2328         raise errors.OpPrereqError("The master node has to be a"
2329                                    " master candidate, online and not drained")
2330       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2331       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2332       if num_candidates <= cp_size:
2333         msg = ("Not enough master candidates (desired"
2334                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2335         if self.op.force:
2336           self.LogWarning(msg)
2337         else:
2338           raise errors.OpPrereqError(msg)
2339
2340     if (self.op.master_candidate == True and
2341         ((node.offline and not self.op.offline == False) or
2342          (node.drained and not self.op.drained == False))):
2343       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2344                                  " to master_candidate" % node.name)
2345
2346     return
2347
2348   def Exec(self, feedback_fn):
2349     """Modifies a node.
2350
2351     """
2352     node = self.node
2353
2354     result = []
2355     changed_mc = False
2356
2357     if self.op.offline is not None:
2358       node.offline = self.op.offline
2359       result.append(("offline", str(self.op.offline)))
2360       if self.op.offline == True:
2361         if node.master_candidate:
2362           node.master_candidate = False
2363           changed_mc = True
2364           result.append(("master_candidate", "auto-demotion due to offline"))
2365         if node.drained:
2366           node.drained = False
2367           result.append(("drained", "clear drained status due to offline"))
2368
2369     if self.op.master_candidate is not None:
2370       node.master_candidate = self.op.master_candidate
2371       changed_mc = True
2372       result.append(("master_candidate", str(self.op.master_candidate)))
2373       if self.op.master_candidate == False:
2374         rrc = self.rpc.call_node_demote_from_mc(node.name)
2375         msg = rrc.RemoteFailMsg()
2376         if msg:
2377           self.LogWarning("Node failed to demote itself: %s" % msg)
2378
2379     if self.op.drained is not None:
2380       node.drained = self.op.drained
2381       result.append(("drained", str(self.op.drained)))
2382       if self.op.drained == True:
2383         if node.master_candidate:
2384           node.master_candidate = False
2385           changed_mc = True
2386           result.append(("master_candidate", "auto-demotion due to drain"))
2387         if node.offline:
2388           node.offline = False
2389           result.append(("offline", "clear offline status due to drain"))
2390
2391     # this will trigger configuration file update, if needed
2392     self.cfg.Update(node)
2393     # this will trigger job queue propagation or cleanup
2394     if changed_mc:
2395       self.context.ReaddNode(node)
2396
2397     return result
2398
2399
2400 class LUQueryClusterInfo(NoHooksLU):
2401   """Query cluster configuration.
2402
2403   """
2404   _OP_REQP = []
2405   REQ_BGL = False
2406
2407   def ExpandNames(self):
2408     self.needed_locks = {}
2409
2410   def CheckPrereq(self):
2411     """No prerequsites needed for this LU.
2412
2413     """
2414     pass
2415
2416   def Exec(self, feedback_fn):
2417     """Return cluster config.
2418
2419     """
2420     cluster = self.cfg.GetClusterInfo()
2421     result = {
2422       "software_version": constants.RELEASE_VERSION,
2423       "protocol_version": constants.PROTOCOL_VERSION,
2424       "config_version": constants.CONFIG_VERSION,
2425       "os_api_version": constants.OS_API_VERSION,
2426       "export_version": constants.EXPORT_VERSION,
2427       "architecture": (platform.architecture()[0], platform.machine()),
2428       "name": cluster.cluster_name,
2429       "master": cluster.master_node,
2430       "default_hypervisor": cluster.default_hypervisor,
2431       "enabled_hypervisors": cluster.enabled_hypervisors,
2432       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2433                         for hypervisor in cluster.enabled_hypervisors]),
2434       "beparams": cluster.beparams,
2435       "candidate_pool_size": cluster.candidate_pool_size,
2436       "default_bridge": cluster.default_bridge,
2437       "master_netdev": cluster.master_netdev,
2438       "volume_group_name": cluster.volume_group_name,
2439       "file_storage_dir": cluster.file_storage_dir,
2440       }
2441
2442     return result
2443
2444
2445 class LUQueryConfigValues(NoHooksLU):
2446   """Return configuration values.
2447
2448   """
2449   _OP_REQP = []
2450   REQ_BGL = False
2451   _FIELDS_DYNAMIC = utils.FieldSet()
2452   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2453
2454   def ExpandNames(self):
2455     self.needed_locks = {}
2456
2457     _CheckOutputFields(static=self._FIELDS_STATIC,
2458                        dynamic=self._FIELDS_DYNAMIC,
2459                        selected=self.op.output_fields)
2460
2461   def CheckPrereq(self):
2462     """No prerequisites.
2463
2464     """
2465     pass
2466
2467   def Exec(self, feedback_fn):
2468     """Dump a representation of the cluster config to the standard output.
2469
2470     """
2471     values = []
2472     for field in self.op.output_fields:
2473       if field == "cluster_name":
2474         entry = self.cfg.GetClusterName()
2475       elif field == "master_node":
2476         entry = self.cfg.GetMasterNode()
2477       elif field == "drain_flag":
2478         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2479       else:
2480         raise errors.ParameterError(field)
2481       values.append(entry)
2482     return values
2483
2484
2485 class LUActivateInstanceDisks(NoHooksLU):
2486   """Bring up an instance's disks.
2487
2488   """
2489   _OP_REQP = ["instance_name"]
2490   REQ_BGL = False
2491
2492   def ExpandNames(self):
2493     self._ExpandAndLockInstance()
2494     self.needed_locks[locking.LEVEL_NODE] = []
2495     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2496
2497   def DeclareLocks(self, level):
2498     if level == locking.LEVEL_NODE:
2499       self._LockInstancesNodes()
2500
2501   def CheckPrereq(self):
2502     """Check prerequisites.
2503
2504     This checks that the instance is in the cluster.
2505
2506     """
2507     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2508     assert self.instance is not None, \
2509       "Cannot retrieve locked instance %s" % self.op.instance_name
2510     _CheckNodeOnline(self, self.instance.primary_node)
2511
2512   def Exec(self, feedback_fn):
2513     """Activate the disks.
2514
2515     """
2516     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2517     if not disks_ok:
2518       raise errors.OpExecError("Cannot activate block devices")
2519
2520     return disks_info
2521
2522
2523 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2524   """Prepare the block devices for an instance.
2525
2526   This sets up the block devices on all nodes.
2527
2528   @type lu: L{LogicalUnit}
2529   @param lu: the logical unit on whose behalf we execute
2530   @type instance: L{objects.Instance}
2531   @param instance: the instance for whose disks we assemble
2532   @type ignore_secondaries: boolean
2533   @param ignore_secondaries: if true, errors on secondary nodes
2534       won't result in an error return from the function
2535   @return: False if the operation failed, otherwise a list of
2536       (host, instance_visible_name, node_visible_name)
2537       with the mapping from node devices to instance devices
2538
2539   """
2540   device_info = []
2541   disks_ok = True
2542   iname = instance.name
2543   # With the two passes mechanism we try to reduce the window of
2544   # opportunity for the race condition of switching DRBD to primary
2545   # before handshaking occured, but we do not eliminate it
2546
2547   # The proper fix would be to wait (with some limits) until the
2548   # connection has been made and drbd transitions from WFConnection
2549   # into any other network-connected state (Connected, SyncTarget,
2550   # SyncSource, etc.)
2551
2552   # 1st pass, assemble on all nodes in secondary mode
2553   for inst_disk in instance.disks:
2554     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2555       lu.cfg.SetDiskID(node_disk, node)
2556       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2557       msg = result.RemoteFailMsg()
2558       if msg:
2559         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2560                            " (is_primary=False, pass=1): %s",
2561                            inst_disk.iv_name, node, msg)
2562         if not ignore_secondaries:
2563           disks_ok = False
2564
2565   # FIXME: race condition on drbd migration to primary
2566
2567   # 2nd pass, do only the primary node
2568   for inst_disk in instance.disks:
2569     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2570       if node != instance.primary_node:
2571         continue
2572       lu.cfg.SetDiskID(node_disk, node)
2573       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2574       msg = result.RemoteFailMsg()
2575       if msg:
2576         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2577                            " (is_primary=True, pass=2): %s",
2578                            inst_disk.iv_name, node, msg)
2579         disks_ok = False
2580     device_info.append((instance.primary_node, inst_disk.iv_name,
2581                         result.payload))
2582
2583   # leave the disks configured for the primary node
2584   # this is a workaround that would be fixed better by
2585   # improving the logical/physical id handling
2586   for disk in instance.disks:
2587     lu.cfg.SetDiskID(disk, instance.primary_node)
2588
2589   return disks_ok, device_info
2590
2591
2592 def _StartInstanceDisks(lu, instance, force):
2593   """Start the disks of an instance.
2594
2595   """
2596   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2597                                            ignore_secondaries=force)
2598   if not disks_ok:
2599     _ShutdownInstanceDisks(lu, instance)
2600     if force is not None and not force:
2601       lu.proc.LogWarning("", hint="If the message above refers to a"
2602                          " secondary node,"
2603                          " you can retry the operation using '--force'.")
2604     raise errors.OpExecError("Disk consistency error")
2605
2606
2607 class LUDeactivateInstanceDisks(NoHooksLU):
2608   """Shutdown an instance's disks.
2609
2610   """
2611   _OP_REQP = ["instance_name"]
2612   REQ_BGL = False
2613
2614   def ExpandNames(self):
2615     self._ExpandAndLockInstance()
2616     self.needed_locks[locking.LEVEL_NODE] = []
2617     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2618
2619   def DeclareLocks(self, level):
2620     if level == locking.LEVEL_NODE:
2621       self._LockInstancesNodes()
2622
2623   def CheckPrereq(self):
2624     """Check prerequisites.
2625
2626     This checks that the instance is in the cluster.
2627
2628     """
2629     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2630     assert self.instance is not None, \
2631       "Cannot retrieve locked instance %s" % self.op.instance_name
2632
2633   def Exec(self, feedback_fn):
2634     """Deactivate the disks
2635
2636     """
2637     instance = self.instance
2638     _SafeShutdownInstanceDisks(self, instance)
2639
2640
2641 def _SafeShutdownInstanceDisks(lu, instance):
2642   """Shutdown block devices of an instance.
2643
2644   This function checks if an instance is running, before calling
2645   _ShutdownInstanceDisks.
2646
2647   """
2648   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2649                                       [instance.hypervisor])
2650   ins_l = ins_l[instance.primary_node]
2651   if ins_l.failed or not isinstance(ins_l.data, list):
2652     raise errors.OpExecError("Can't contact node '%s'" %
2653                              instance.primary_node)
2654
2655   if instance.name in ins_l.data:
2656     raise errors.OpExecError("Instance is running, can't shutdown"
2657                              " block devices.")
2658
2659   _ShutdownInstanceDisks(lu, instance)
2660
2661
2662 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2663   """Shutdown block devices of an instance.
2664
2665   This does the shutdown on all nodes of the instance.
2666
2667   If the ignore_primary is false, errors on the primary node are
2668   ignored.
2669
2670   """
2671   all_result = True
2672   for disk in instance.disks:
2673     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2674       lu.cfg.SetDiskID(top_disk, node)
2675       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2676       msg = result.RemoteFailMsg()
2677       if msg:
2678         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2679                       disk.iv_name, node, msg)
2680         if not ignore_primary or node != instance.primary_node:
2681           all_result = False
2682   return all_result
2683
2684
2685 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2686   """Checks if a node has enough free memory.
2687
2688   This function check if a given node has the needed amount of free
2689   memory. In case the node has less memory or we cannot get the
2690   information from the node, this function raise an OpPrereqError
2691   exception.
2692
2693   @type lu: C{LogicalUnit}
2694   @param lu: a logical unit from which we get configuration data
2695   @type node: C{str}
2696   @param node: the node to check
2697   @type reason: C{str}
2698   @param reason: string to use in the error message
2699   @type requested: C{int}
2700   @param requested: the amount of memory in MiB to check for
2701   @type hypervisor_name: C{str}
2702   @param hypervisor_name: the hypervisor to ask for memory stats
2703   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2704       we cannot check the node
2705
2706   """
2707   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2708   nodeinfo[node].Raise()
2709   free_mem = nodeinfo[node].data.get('memory_free')
2710   if not isinstance(free_mem, int):
2711     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2712                              " was '%s'" % (node, free_mem))
2713   if requested > free_mem:
2714     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2715                              " needed %s MiB, available %s MiB" %
2716                              (node, reason, requested, free_mem))
2717
2718
2719 class LUStartupInstance(LogicalUnit):
2720   """Starts an instance.
2721
2722   """
2723   HPATH = "instance-start"
2724   HTYPE = constants.HTYPE_INSTANCE
2725   _OP_REQP = ["instance_name", "force"]
2726   REQ_BGL = False
2727
2728   def ExpandNames(self):
2729     self._ExpandAndLockInstance()
2730
2731   def BuildHooksEnv(self):
2732     """Build hooks env.
2733
2734     This runs on master, primary and secondary nodes of the instance.
2735
2736     """
2737     env = {
2738       "FORCE": self.op.force,
2739       }
2740     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2741     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2742     return env, nl, nl
2743
2744   def CheckPrereq(self):
2745     """Check prerequisites.
2746
2747     This checks that the instance is in the cluster.
2748
2749     """
2750     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2751     assert self.instance is not None, \
2752       "Cannot retrieve locked instance %s" % self.op.instance_name
2753
2754     # extra beparams
2755     self.beparams = getattr(self.op, "beparams", {})
2756     if self.beparams:
2757       if not isinstance(self.beparams, dict):
2758         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2759                                    " dict" % (type(self.beparams), ))
2760       # fill the beparams dict
2761       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2762       self.op.beparams = self.beparams
2763
2764     # extra hvparams
2765     self.hvparams = getattr(self.op, "hvparams", {})
2766     if self.hvparams:
2767       if not isinstance(self.hvparams, dict):
2768         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2769                                    " dict" % (type(self.hvparams), ))
2770
2771       # check hypervisor parameter syntax (locally)
2772       cluster = self.cfg.GetClusterInfo()
2773       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2774       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2775                                     instance.hvparams)
2776       filled_hvp.update(self.hvparams)
2777       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2778       hv_type.CheckParameterSyntax(filled_hvp)
2779       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2780       self.op.hvparams = self.hvparams
2781
2782     _CheckNodeOnline(self, instance.primary_node)
2783
2784     bep = self.cfg.GetClusterInfo().FillBE(instance)
2785     # check bridges existance
2786     _CheckInstanceBridgesExist(self, instance)
2787
2788     remote_info = self.rpc.call_instance_info(instance.primary_node,
2789                                               instance.name,
2790                                               instance.hypervisor)
2791     remote_info.Raise()
2792     if not remote_info.data:
2793       _CheckNodeFreeMemory(self, instance.primary_node,
2794                            "starting instance %s" % instance.name,
2795                            bep[constants.BE_MEMORY], instance.hypervisor)
2796
2797   def Exec(self, feedback_fn):
2798     """Start the instance.
2799
2800     """
2801     instance = self.instance
2802     force = self.op.force
2803
2804     self.cfg.MarkInstanceUp(instance.name)
2805
2806     node_current = instance.primary_node
2807
2808     _StartInstanceDisks(self, instance, force)
2809
2810     result = self.rpc.call_instance_start(node_current, instance,
2811                                           self.hvparams, self.beparams)
2812     msg = result.RemoteFailMsg()
2813     if msg:
2814       _ShutdownInstanceDisks(self, instance)
2815       raise errors.OpExecError("Could not start instance: %s" % msg)
2816
2817
2818 class LURebootInstance(LogicalUnit):
2819   """Reboot an instance.
2820
2821   """
2822   HPATH = "instance-reboot"
2823   HTYPE = constants.HTYPE_INSTANCE
2824   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2825   REQ_BGL = False
2826
2827   def ExpandNames(self):
2828     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2829                                    constants.INSTANCE_REBOOT_HARD,
2830                                    constants.INSTANCE_REBOOT_FULL]:
2831       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2832                                   (constants.INSTANCE_REBOOT_SOFT,
2833                                    constants.INSTANCE_REBOOT_HARD,
2834                                    constants.INSTANCE_REBOOT_FULL))
2835     self._ExpandAndLockInstance()
2836
2837   def BuildHooksEnv(self):
2838     """Build hooks env.
2839
2840     This runs on master, primary and secondary nodes of the instance.
2841
2842     """
2843     env = {
2844       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2845       "REBOOT_TYPE": self.op.reboot_type,
2846       }
2847     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2848     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2849     return env, nl, nl
2850
2851   def CheckPrereq(self):
2852     """Check prerequisites.
2853
2854     This checks that the instance is in the cluster.
2855
2856     """
2857     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2858     assert self.instance is not None, \
2859       "Cannot retrieve locked instance %s" % self.op.instance_name
2860
2861     _CheckNodeOnline(self, instance.primary_node)
2862
2863     # check bridges existance
2864     _CheckInstanceBridgesExist(self, instance)
2865
2866   def Exec(self, feedback_fn):
2867     """Reboot the instance.
2868
2869     """
2870     instance = self.instance
2871     ignore_secondaries = self.op.ignore_secondaries
2872     reboot_type = self.op.reboot_type
2873
2874     node_current = instance.primary_node
2875
2876     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2877                        constants.INSTANCE_REBOOT_HARD]:
2878       for disk in instance.disks:
2879         self.cfg.SetDiskID(disk, node_current)
2880       result = self.rpc.call_instance_reboot(node_current, instance,
2881                                              reboot_type)
2882       msg = result.RemoteFailMsg()
2883       if msg:
2884         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2885     else:
2886       result = self.rpc.call_instance_shutdown(node_current, instance)
2887       msg = result.RemoteFailMsg()
2888       if msg:
2889         raise errors.OpExecError("Could not shutdown instance for"
2890                                  " full reboot: %s" % msg)
2891       _ShutdownInstanceDisks(self, instance)
2892       _StartInstanceDisks(self, instance, ignore_secondaries)
2893       result = self.rpc.call_instance_start(node_current, instance, None, None)
2894       msg = result.RemoteFailMsg()
2895       if msg:
2896         _ShutdownInstanceDisks(self, instance)
2897         raise errors.OpExecError("Could not start instance for"
2898                                  " full reboot: %s" % msg)
2899
2900     self.cfg.MarkInstanceUp(instance.name)
2901
2902
2903 class LUShutdownInstance(LogicalUnit):
2904   """Shutdown an instance.
2905
2906   """
2907   HPATH = "instance-stop"
2908   HTYPE = constants.HTYPE_INSTANCE
2909   _OP_REQP = ["instance_name"]
2910   REQ_BGL = False
2911
2912   def ExpandNames(self):
2913     self._ExpandAndLockInstance()
2914
2915   def BuildHooksEnv(self):
2916     """Build hooks env.
2917
2918     This runs on master, primary and secondary nodes of the instance.
2919
2920     """
2921     env = _BuildInstanceHookEnvByObject(self, self.instance)
2922     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2923     return env, nl, nl
2924
2925   def CheckPrereq(self):
2926     """Check prerequisites.
2927
2928     This checks that the instance is in the cluster.
2929
2930     """
2931     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2932     assert self.instance is not None, \
2933       "Cannot retrieve locked instance %s" % self.op.instance_name
2934     _CheckNodeOnline(self, self.instance.primary_node)
2935
2936   def Exec(self, feedback_fn):
2937     """Shutdown the instance.
2938
2939     """
2940     instance = self.instance
2941     node_current = instance.primary_node
2942     self.cfg.MarkInstanceDown(instance.name)
2943     result = self.rpc.call_instance_shutdown(node_current, instance)
2944     msg = result.RemoteFailMsg()
2945     if msg:
2946       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2947
2948     _ShutdownInstanceDisks(self, instance)
2949
2950
2951 class LUReinstallInstance(LogicalUnit):
2952   """Reinstall an instance.
2953
2954   """
2955   HPATH = "instance-reinstall"
2956   HTYPE = constants.HTYPE_INSTANCE
2957   _OP_REQP = ["instance_name"]
2958   REQ_BGL = False
2959
2960   def ExpandNames(self):
2961     self._ExpandAndLockInstance()
2962
2963   def BuildHooksEnv(self):
2964     """Build hooks env.
2965
2966     This runs on master, primary and secondary nodes of the instance.
2967
2968     """
2969     env = _BuildInstanceHookEnvByObject(self, self.instance)
2970     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2971     return env, nl, nl
2972
2973   def CheckPrereq(self):
2974     """Check prerequisites.
2975
2976     This checks that the instance is in the cluster and is not running.
2977
2978     """
2979     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2980     assert instance is not None, \
2981       "Cannot retrieve locked instance %s" % self.op.instance_name
2982     _CheckNodeOnline(self, instance.primary_node)
2983
2984     if instance.disk_template == constants.DT_DISKLESS:
2985       raise errors.OpPrereqError("Instance '%s' has no disks" %
2986                                  self.op.instance_name)
2987     if instance.admin_up:
2988       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2989                                  self.op.instance_name)
2990     remote_info = self.rpc.call_instance_info(instance.primary_node,
2991                                               instance.name,
2992                                               instance.hypervisor)
2993     remote_info.Raise()
2994     if remote_info.data:
2995       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2996                                  (self.op.instance_name,
2997                                   instance.primary_node))
2998
2999     self.op.os_type = getattr(self.op, "os_type", None)
3000     if self.op.os_type is not None:
3001       # OS verification
3002       pnode = self.cfg.GetNodeInfo(
3003         self.cfg.ExpandNodeName(instance.primary_node))
3004       if pnode is None:
3005         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3006                                    self.op.pnode)
3007       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3008       result.Raise()
3009       if not isinstance(result.data, objects.OS):
3010         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3011                                    " primary node"  % self.op.os_type)
3012
3013     self.instance = instance
3014
3015   def Exec(self, feedback_fn):
3016     """Reinstall the instance.
3017
3018     """
3019     inst = self.instance
3020
3021     if self.op.os_type is not None:
3022       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3023       inst.os = self.op.os_type
3024       self.cfg.Update(inst)
3025
3026     _StartInstanceDisks(self, inst, None)
3027     try:
3028       feedback_fn("Running the instance OS create scripts...")
3029       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3030       msg = result.RemoteFailMsg()
3031       if msg:
3032         raise errors.OpExecError("Could not install OS for instance %s"
3033                                  " on node %s: %s" %
3034                                  (inst.name, inst.primary_node, msg))
3035     finally:
3036       _ShutdownInstanceDisks(self, inst)
3037
3038
3039 class LURenameInstance(LogicalUnit):
3040   """Rename an instance.
3041
3042   """
3043   HPATH = "instance-rename"
3044   HTYPE = constants.HTYPE_INSTANCE
3045   _OP_REQP = ["instance_name", "new_name"]
3046
3047   def BuildHooksEnv(self):
3048     """Build hooks env.
3049
3050     This runs on master, primary and secondary nodes of the instance.
3051
3052     """
3053     env = _BuildInstanceHookEnvByObject(self, self.instance)
3054     env["INSTANCE_NEW_NAME"] = self.op.new_name
3055     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3056     return env, nl, nl
3057
3058   def CheckPrereq(self):
3059     """Check prerequisites.
3060
3061     This checks that the instance is in the cluster and is not running.
3062
3063     """
3064     instance = self.cfg.GetInstanceInfo(
3065       self.cfg.ExpandInstanceName(self.op.instance_name))
3066     if instance is None:
3067       raise errors.OpPrereqError("Instance '%s' not known" %
3068                                  self.op.instance_name)
3069     _CheckNodeOnline(self, instance.primary_node)
3070
3071     if instance.admin_up:
3072       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3073                                  self.op.instance_name)
3074     remote_info = self.rpc.call_instance_info(instance.primary_node,
3075                                               instance.name,
3076                                               instance.hypervisor)
3077     remote_info.Raise()
3078     if remote_info.data:
3079       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3080                                  (self.op.instance_name,
3081                                   instance.primary_node))
3082     self.instance = instance
3083
3084     # new name verification
3085     name_info = utils.HostInfo(self.op.new_name)
3086
3087     self.op.new_name = new_name = name_info.name
3088     instance_list = self.cfg.GetInstanceList()
3089     if new_name in instance_list:
3090       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3091                                  new_name)
3092
3093     if not getattr(self.op, "ignore_ip", False):
3094       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3095         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3096                                    (name_info.ip, new_name))
3097
3098
3099   def Exec(self, feedback_fn):
3100     """Reinstall the instance.
3101
3102     """
3103     inst = self.instance
3104     old_name = inst.name
3105
3106     if inst.disk_template == constants.DT_FILE:
3107       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3108
3109     self.cfg.RenameInstance(inst.name, self.op.new_name)
3110     # Change the instance lock. This is definitely safe while we hold the BGL
3111     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3112     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3113
3114     # re-read the instance from the configuration after rename
3115     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3116
3117     if inst.disk_template == constants.DT_FILE:
3118       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3119       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3120                                                      old_file_storage_dir,
3121                                                      new_file_storage_dir)
3122       result.Raise()
3123       if not result.data:
3124         raise errors.OpExecError("Could not connect to node '%s' to rename"
3125                                  " directory '%s' to '%s' (but the instance"
3126                                  " has been renamed in Ganeti)" % (
3127                                  inst.primary_node, old_file_storage_dir,
3128                                  new_file_storage_dir))
3129
3130       if not result.data[0]:
3131         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3132                                  " (but the instance has been renamed in"
3133                                  " Ganeti)" % (old_file_storage_dir,
3134                                                new_file_storage_dir))
3135
3136     _StartInstanceDisks(self, inst, None)
3137     try:
3138       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3139                                                  old_name)
3140       msg = result.RemoteFailMsg()
3141       if msg:
3142         msg = ("Could not run OS rename script for instance %s on node %s"
3143                " (but the instance has been renamed in Ganeti): %s" %
3144                (inst.name, inst.primary_node, msg))
3145         self.proc.LogWarning(msg)
3146     finally:
3147       _ShutdownInstanceDisks(self, inst)
3148
3149
3150 class LURemoveInstance(LogicalUnit):
3151   """Remove an instance.
3152
3153   """
3154   HPATH = "instance-remove"
3155   HTYPE = constants.HTYPE_INSTANCE
3156   _OP_REQP = ["instance_name", "ignore_failures"]
3157   REQ_BGL = False
3158
3159   def ExpandNames(self):
3160     self._ExpandAndLockInstance()
3161     self.needed_locks[locking.LEVEL_NODE] = []
3162     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3163
3164   def DeclareLocks(self, level):
3165     if level == locking.LEVEL_NODE:
3166       self._LockInstancesNodes()
3167
3168   def BuildHooksEnv(self):
3169     """Build hooks env.
3170
3171     This runs on master, primary and secondary nodes of the instance.
3172
3173     """
3174     env = _BuildInstanceHookEnvByObject(self, self.instance)
3175     nl = [self.cfg.GetMasterNode()]
3176     return env, nl, nl
3177
3178   def CheckPrereq(self):
3179     """Check prerequisites.
3180
3181     This checks that the instance is in the cluster.
3182
3183     """
3184     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3185     assert self.instance is not None, \
3186       "Cannot retrieve locked instance %s" % self.op.instance_name
3187
3188   def Exec(self, feedback_fn):
3189     """Remove the instance.
3190
3191     """
3192     instance = self.instance
3193     logging.info("Shutting down instance %s on node %s",
3194                  instance.name, instance.primary_node)
3195
3196     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3197     msg = result.RemoteFailMsg()
3198     if msg:
3199       if self.op.ignore_failures:
3200         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3201       else:
3202         raise errors.OpExecError("Could not shutdown instance %s on"
3203                                  " node %s: %s" %
3204                                  (instance.name, instance.primary_node, msg))
3205
3206     logging.info("Removing block devices for instance %s", instance.name)
3207
3208     if not _RemoveDisks(self, instance):
3209       if self.op.ignore_failures:
3210         feedback_fn("Warning: can't remove instance's disks")
3211       else:
3212         raise errors.OpExecError("Can't remove instance's disks")
3213
3214     logging.info("Removing instance %s out of cluster config", instance.name)
3215
3216     self.cfg.RemoveInstance(instance.name)
3217     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3218
3219
3220 class LUQueryInstances(NoHooksLU):
3221   """Logical unit for querying instances.
3222
3223   """
3224   _OP_REQP = ["output_fields", "names", "use_locking"]
3225   REQ_BGL = False
3226   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3227                                     "admin_state",
3228                                     "disk_template", "ip", "mac", "bridge",
3229                                     "sda_size", "sdb_size", "vcpus", "tags",
3230                                     "network_port", "beparams",
3231                                     r"(disk)\.(size)/([0-9]+)",
3232                                     r"(disk)\.(sizes)", "disk_usage",
3233                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3234                                     r"(nic)\.(macs|ips|bridges)",
3235                                     r"(disk|nic)\.(count)",
3236                                     "serial_no", "hypervisor", "hvparams",] +
3237                                   ["hv/%s" % name
3238                                    for name in constants.HVS_PARAMETERS] +
3239                                   ["be/%s" % name
3240                                    for name in constants.BES_PARAMETERS])
3241   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3242
3243
3244   def ExpandNames(self):
3245     _CheckOutputFields(static=self._FIELDS_STATIC,
3246                        dynamic=self._FIELDS_DYNAMIC,
3247                        selected=self.op.output_fields)
3248
3249     self.needed_locks = {}
3250     self.share_locks[locking.LEVEL_INSTANCE] = 1
3251     self.share_locks[locking.LEVEL_NODE] = 1
3252
3253     if self.op.names:
3254       self.wanted = _GetWantedInstances(self, self.op.names)
3255     else:
3256       self.wanted = locking.ALL_SET
3257
3258     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3259     self.do_locking = self.do_node_query and self.op.use_locking
3260     if self.do_locking:
3261       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3262       self.needed_locks[locking.LEVEL_NODE] = []
3263       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3264
3265   def DeclareLocks(self, level):
3266     if level == locking.LEVEL_NODE and self.do_locking:
3267       self._LockInstancesNodes()
3268
3269   def CheckPrereq(self):
3270     """Check prerequisites.
3271
3272     """
3273     pass
3274
3275   def Exec(self, feedback_fn):
3276     """Computes the list of nodes and their attributes.
3277
3278     """
3279     all_info = self.cfg.GetAllInstancesInfo()
3280     if self.wanted == locking.ALL_SET:
3281       # caller didn't specify instance names, so ordering is not important
3282       if self.do_locking:
3283         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3284       else:
3285         instance_names = all_info.keys()
3286       instance_names = utils.NiceSort(instance_names)
3287     else:
3288       # caller did specify names, so we must keep the ordering
3289       if self.do_locking:
3290         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3291       else:
3292         tgt_set = all_info.keys()
3293       missing = set(self.wanted).difference(tgt_set)
3294       if missing:
3295         raise errors.OpExecError("Some instances were removed before"
3296                                  " retrieving their data: %s" % missing)
3297       instance_names = self.wanted
3298
3299     instance_list = [all_info[iname] for iname in instance_names]
3300
3301     # begin data gathering
3302
3303     nodes = frozenset([inst.primary_node for inst in instance_list])
3304     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3305
3306     bad_nodes = []
3307     off_nodes = []
3308     if self.do_node_query:
3309       live_data = {}
3310       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3311       for name in nodes:
3312         result = node_data[name]
3313         if result.offline:
3314           # offline nodes will be in both lists
3315           off_nodes.append(name)
3316         if result.failed:
3317           bad_nodes.append(name)
3318         else:
3319           if result.data:
3320             live_data.update(result.data)
3321             # else no instance is alive
3322     else:
3323       live_data = dict([(name, {}) for name in instance_names])
3324
3325     # end data gathering
3326
3327     HVPREFIX = "hv/"
3328     BEPREFIX = "be/"
3329     output = []
3330     for instance in instance_list:
3331       iout = []
3332       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3333       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3334       for field in self.op.output_fields:
3335         st_match = self._FIELDS_STATIC.Matches(field)
3336         if field == "name":
3337           val = instance.name
3338         elif field == "os":
3339           val = instance.os
3340         elif field == "pnode":
3341           val = instance.primary_node
3342         elif field == "snodes":
3343           val = list(instance.secondary_nodes)
3344         elif field == "admin_state":
3345           val = instance.admin_up
3346         elif field == "oper_state":
3347           if instance.primary_node in bad_nodes:
3348             val = None
3349           else:
3350             val = bool(live_data.get(instance.name))
3351         elif field == "status":
3352           if instance.primary_node in off_nodes:
3353             val = "ERROR_nodeoffline"
3354           elif instance.primary_node in bad_nodes:
3355             val = "ERROR_nodedown"
3356           else:
3357             running = bool(live_data.get(instance.name))
3358             if running:
3359               if instance.admin_up:
3360                 val = "running"
3361               else:
3362                 val = "ERROR_up"
3363             else:
3364               if instance.admin_up:
3365                 val = "ERROR_down"
3366               else:
3367                 val = "ADMIN_down"
3368         elif field == "oper_ram":
3369           if instance.primary_node in bad_nodes:
3370             val = None
3371           elif instance.name in live_data:
3372             val = live_data[instance.name].get("memory", "?")
3373           else:
3374             val = "-"
3375         elif field == "disk_template":
3376           val = instance.disk_template
3377         elif field == "ip":
3378           val = instance.nics[0].ip
3379         elif field == "bridge":
3380           val = instance.nics[0].bridge
3381         elif field == "mac":
3382           val = instance.nics[0].mac
3383         elif field == "sda_size" or field == "sdb_size":
3384           idx = ord(field[2]) - ord('a')
3385           try:
3386             val = instance.FindDisk(idx).size
3387           except errors.OpPrereqError:
3388             val = None
3389         elif field == "disk_usage": # total disk usage per node
3390           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3391           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3392         elif field == "tags":
3393           val = list(instance.GetTags())
3394         elif field == "serial_no":
3395           val = instance.serial_no
3396         elif field == "network_port":
3397           val = instance.network_port
3398         elif field == "hypervisor":
3399           val = instance.hypervisor
3400         elif field == "hvparams":
3401           val = i_hv
3402         elif (field.startswith(HVPREFIX) and
3403               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3404           val = i_hv.get(field[len(HVPREFIX):], None)
3405         elif field == "beparams":
3406           val = i_be
3407         elif (field.startswith(BEPREFIX) and
3408               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3409           val = i_be.get(field[len(BEPREFIX):], None)
3410         elif st_match and st_match.groups():
3411           # matches a variable list
3412           st_groups = st_match.groups()
3413           if st_groups and st_groups[0] == "disk":
3414             if st_groups[1] == "count":
3415               val = len(instance.disks)
3416             elif st_groups[1] == "sizes":
3417               val = [disk.size for disk in instance.disks]
3418             elif st_groups[1] == "size":
3419               try:
3420                 val = instance.FindDisk(st_groups[2]).size
3421               except errors.OpPrereqError:
3422                 val = None
3423             else:
3424               assert False, "Unhandled disk parameter"
3425           elif st_groups[0] == "nic":
3426             if st_groups[1] == "count":
3427               val = len(instance.nics)
3428             elif st_groups[1] == "macs":
3429               val = [nic.mac for nic in instance.nics]
3430             elif st_groups[1] == "ips":
3431               val = [nic.ip for nic in instance.nics]
3432             elif st_groups[1] == "bridges":
3433               val = [nic.bridge for nic in instance.nics]
3434             else:
3435               # index-based item
3436               nic_idx = int(st_groups[2])
3437               if nic_idx >= len(instance.nics):
3438                 val = None
3439               else:
3440                 if st_groups[1] == "mac":
3441                   val = instance.nics[nic_idx].mac
3442                 elif st_groups[1] == "ip":
3443                   val = instance.nics[nic_idx].ip
3444                 elif st_groups[1] == "bridge":
3445                   val = instance.nics[nic_idx].bridge
3446                 else:
3447                   assert False, "Unhandled NIC parameter"
3448           else:
3449             assert False, "Unhandled variable parameter"
3450         else:
3451           raise errors.ParameterError(field)
3452         iout.append(val)
3453       output.append(iout)
3454
3455     return output
3456
3457
3458 class LUFailoverInstance(LogicalUnit):
3459   """Failover an instance.
3460
3461   """
3462   HPATH = "instance-failover"
3463   HTYPE = constants.HTYPE_INSTANCE
3464   _OP_REQP = ["instance_name", "ignore_consistency"]
3465   REQ_BGL = False
3466
3467   def ExpandNames(self):
3468     self._ExpandAndLockInstance()
3469     self.needed_locks[locking.LEVEL_NODE] = []
3470     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3471
3472   def DeclareLocks(self, level):
3473     if level == locking.LEVEL_NODE:
3474       self._LockInstancesNodes()
3475
3476   def BuildHooksEnv(self):
3477     """Build hooks env.
3478
3479     This runs on master, primary and secondary nodes of the instance.
3480
3481     """
3482     env = {
3483       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3484       }
3485     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3486     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3487     return env, nl, nl
3488
3489   def CheckPrereq(self):
3490     """Check prerequisites.
3491
3492     This checks that the instance is in the cluster.
3493
3494     """
3495     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3496     assert self.instance is not None, \
3497       "Cannot retrieve locked instance %s" % self.op.instance_name
3498
3499     bep = self.cfg.GetClusterInfo().FillBE(instance)
3500     if instance.disk_template not in constants.DTS_NET_MIRROR:
3501       raise errors.OpPrereqError("Instance's disk layout is not"
3502                                  " network mirrored, cannot failover.")
3503
3504     secondary_nodes = instance.secondary_nodes
3505     if not secondary_nodes:
3506       raise errors.ProgrammerError("no secondary node but using "
3507                                    "a mirrored disk template")
3508
3509     target_node = secondary_nodes[0]
3510     _CheckNodeOnline(self, target_node)
3511     _CheckNodeNotDrained(self, target_node)
3512     # check memory requirements on the secondary node
3513     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3514                          instance.name, bep[constants.BE_MEMORY],
3515                          instance.hypervisor)
3516
3517     # check bridge existance
3518     brlist = [nic.bridge for nic in instance.nics]
3519     result = self.rpc.call_bridges_exist(target_node, brlist)
3520     result.Raise()
3521     if not result.data:
3522       raise errors.OpPrereqError("One or more target bridges %s does not"
3523                                  " exist on destination node '%s'" %
3524                                  (brlist, target_node))
3525
3526   def Exec(self, feedback_fn):
3527     """Failover an instance.
3528
3529     The failover is done by shutting it down on its present node and
3530     starting it on the secondary.
3531
3532     """
3533     instance = self.instance
3534
3535     source_node = instance.primary_node
3536     target_node = instance.secondary_nodes[0]
3537
3538     feedback_fn("* checking disk consistency between source and target")
3539     for dev in instance.disks:
3540       # for drbd, these are drbd over lvm
3541       if not _CheckDiskConsistency(self, dev, target_node, False):
3542         if instance.admin_up and not self.op.ignore_consistency:
3543           raise errors.OpExecError("Disk %s is degraded on target node,"
3544                                    " aborting failover." % dev.iv_name)
3545
3546     feedback_fn("* shutting down instance on source node")
3547     logging.info("Shutting down instance %s on node %s",
3548                  instance.name, source_node)
3549
3550     result = self.rpc.call_instance_shutdown(source_node, instance)
3551     msg = result.RemoteFailMsg()
3552     if msg:
3553       if self.op.ignore_consistency:
3554         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3555                              " Proceeding anyway. Please make sure node"
3556                              " %s is down. Error details: %s",
3557                              instance.name, source_node, source_node, msg)
3558       else:
3559         raise errors.OpExecError("Could not shutdown instance %s on"
3560                                  " node %s: %s" %
3561                                  (instance.name, source_node, msg))
3562
3563     feedback_fn("* deactivating the instance's disks on source node")
3564     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3565       raise errors.OpExecError("Can't shut down the instance's disks.")
3566
3567     instance.primary_node = target_node
3568     # distribute new instance config to the other nodes
3569     self.cfg.Update(instance)
3570
3571     # Only start the instance if it's marked as up
3572     if instance.admin_up:
3573       feedback_fn("* activating the instance's disks on target node")
3574       logging.info("Starting instance %s on node %s",
3575                    instance.name, target_node)
3576
3577       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3578                                                ignore_secondaries=True)
3579       if not disks_ok:
3580         _ShutdownInstanceDisks(self, instance)
3581         raise errors.OpExecError("Can't activate the instance's disks")
3582
3583       feedback_fn("* starting the instance on the target node")
3584       result = self.rpc.call_instance_start(target_node, instance, None, None)
3585       msg = result.RemoteFailMsg()
3586       if msg:
3587         _ShutdownInstanceDisks(self, instance)
3588         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3589                                  (instance.name, target_node, msg))
3590
3591
3592 class LUMigrateInstance(LogicalUnit):
3593   """Migrate an instance.
3594
3595   This is migration without shutting down, compared to the failover,
3596   which is done with shutdown.
3597
3598   """
3599   HPATH = "instance-migrate"
3600   HTYPE = constants.HTYPE_INSTANCE
3601   _OP_REQP = ["instance_name", "live", "cleanup"]
3602
3603   REQ_BGL = False
3604
3605   def ExpandNames(self):
3606     self._ExpandAndLockInstance()
3607     self.needed_locks[locking.LEVEL_NODE] = []
3608     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3609
3610   def DeclareLocks(self, level):
3611     if level == locking.LEVEL_NODE:
3612       self._LockInstancesNodes()
3613
3614   def BuildHooksEnv(self):
3615     """Build hooks env.
3616
3617     This runs on master, primary and secondary nodes of the instance.
3618
3619     """
3620     env = _BuildInstanceHookEnvByObject(self, self.instance)
3621     env["MIGRATE_LIVE"] = self.op.live
3622     env["MIGRATE_CLEANUP"] = self.op.cleanup
3623     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3624     return env, nl, nl
3625
3626   def CheckPrereq(self):
3627     """Check prerequisites.
3628
3629     This checks that the instance is in the cluster.
3630
3631     """
3632     instance = self.cfg.GetInstanceInfo(
3633       self.cfg.ExpandInstanceName(self.op.instance_name))
3634     if instance is None:
3635       raise errors.OpPrereqError("Instance '%s' not known" %
3636                                  self.op.instance_name)
3637
3638     if instance.disk_template != constants.DT_DRBD8:
3639       raise errors.OpPrereqError("Instance's disk layout is not"
3640                                  " drbd8, cannot migrate.")
3641
3642     secondary_nodes = instance.secondary_nodes
3643     if not secondary_nodes:
3644       raise errors.ConfigurationError("No secondary node but using"
3645                                       " drbd8 disk template")
3646
3647     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3648
3649     target_node = secondary_nodes[0]
3650     # check memory requirements on the secondary node
3651     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3652                          instance.name, i_be[constants.BE_MEMORY],
3653                          instance.hypervisor)
3654
3655     # check bridge existance
3656     brlist = [nic.bridge for nic in instance.nics]
3657     result = self.rpc.call_bridges_exist(target_node, brlist)
3658     if result.failed or not result.data:
3659       raise errors.OpPrereqError("One or more target bridges %s does not"
3660                                  " exist on destination node '%s'" %
3661                                  (brlist, target_node))
3662
3663     if not self.op.cleanup:
3664       _CheckNodeNotDrained(self, target_node)
3665       result = self.rpc.call_instance_migratable(instance.primary_node,
3666                                                  instance)
3667       msg = result.RemoteFailMsg()
3668       if msg:
3669         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3670                                    msg)
3671
3672     self.instance = instance
3673
3674   def _WaitUntilSync(self):
3675     """Poll with custom rpc for disk sync.
3676
3677     This uses our own step-based rpc call.
3678
3679     """
3680     self.feedback_fn("* wait until resync is done")
3681     all_done = False
3682     while not all_done:
3683       all_done = True
3684       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3685                                             self.nodes_ip,
3686                                             self.instance.disks)
3687       min_percent = 100
3688       for node, nres in result.items():
3689         msg = nres.RemoteFailMsg()
3690         if msg:
3691           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3692                                    (node, msg))
3693         node_done, node_percent = nres.payload
3694         all_done = all_done and node_done
3695         if node_percent is not None:
3696           min_percent = min(min_percent, node_percent)
3697       if not all_done:
3698         if min_percent < 100:
3699           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3700         time.sleep(2)
3701
3702   def _EnsureSecondary(self, node):
3703     """Demote a node to secondary.
3704
3705     """
3706     self.feedback_fn("* switching node %s to secondary mode" % node)
3707
3708     for dev in self.instance.disks:
3709       self.cfg.SetDiskID(dev, node)
3710
3711     result = self.rpc.call_blockdev_close(node, self.instance.name,
3712                                           self.instance.disks)
3713     msg = result.RemoteFailMsg()
3714     if msg:
3715       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3716                                " error %s" % (node, msg))
3717
3718   def _GoStandalone(self):
3719     """Disconnect from the network.
3720
3721     """
3722     self.feedback_fn("* changing into standalone mode")
3723     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3724                                                self.instance.disks)
3725     for node, nres in result.items():
3726       msg = nres.RemoteFailMsg()
3727       if msg:
3728         raise errors.OpExecError("Cannot disconnect disks node %s,"
3729                                  " error %s" % (node, msg))
3730
3731   def _GoReconnect(self, multimaster):
3732     """Reconnect to the network.
3733
3734     """
3735     if multimaster:
3736       msg = "dual-master"
3737     else:
3738       msg = "single-master"
3739     self.feedback_fn("* changing disks into %s mode" % msg)
3740     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3741                                            self.instance.disks,
3742                                            self.instance.name, multimaster)
3743     for node, nres in result.items():
3744       msg = nres.RemoteFailMsg()
3745       if msg:
3746         raise errors.OpExecError("Cannot change disks config on node %s,"
3747                                  " error: %s" % (node, msg))
3748
3749   def _ExecCleanup(self):
3750     """Try to cleanup after a failed migration.
3751
3752     The cleanup is done by:
3753       - check that the instance is running only on one node
3754         (and update the config if needed)
3755       - change disks on its secondary node to secondary
3756       - wait until disks are fully synchronized
3757       - disconnect from the network
3758       - change disks into single-master mode
3759       - wait again until disks are fully synchronized
3760
3761     """
3762     instance = self.instance
3763     target_node = self.target_node
3764     source_node = self.source_node
3765
3766     # check running on only one node
3767     self.feedback_fn("* checking where the instance actually runs"
3768                      " (if this hangs, the hypervisor might be in"
3769                      " a bad state)")
3770     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3771     for node, result in ins_l.items():
3772       result.Raise()
3773       if not isinstance(result.data, list):
3774         raise errors.OpExecError("Can't contact node '%s'" % node)
3775
3776     runningon_source = instance.name in ins_l[source_node].data
3777     runningon_target = instance.name in ins_l[target_node].data
3778
3779     if runningon_source and runningon_target:
3780       raise errors.OpExecError("Instance seems to be running on two nodes,"
3781                                " or the hypervisor is confused. You will have"
3782                                " to ensure manually that it runs only on one"
3783                                " and restart this operation.")
3784
3785     if not (runningon_source or runningon_target):
3786       raise errors.OpExecError("Instance does not seem to be running at all."
3787                                " In this case, it's safer to repair by"
3788                                " running 'gnt-instance stop' to ensure disk"
3789                                " shutdown, and then restarting it.")
3790
3791     if runningon_target:
3792       # the migration has actually succeeded, we need to update the config
3793       self.feedback_fn("* instance running on secondary node (%s),"
3794                        " updating config" % target_node)
3795       instance.primary_node = target_node
3796       self.cfg.Update(instance)
3797       demoted_node = source_node
3798     else:
3799       self.feedback_fn("* instance confirmed to be running on its"
3800                        " primary node (%s)" % source_node)
3801       demoted_node = target_node
3802
3803     self._EnsureSecondary(demoted_node)
3804     try:
3805       self._WaitUntilSync()
3806     except errors.OpExecError:
3807       # we ignore here errors, since if the device is standalone, it
3808       # won't be able to sync
3809       pass
3810     self._GoStandalone()
3811     self._GoReconnect(False)
3812     self._WaitUntilSync()
3813
3814     self.feedback_fn("* done")
3815
3816   def _RevertDiskStatus(self):
3817     """Try to revert the disk status after a failed migration.
3818
3819     """
3820     target_node = self.target_node
3821     try:
3822       self._EnsureSecondary(target_node)
3823       self._GoStandalone()
3824       self._GoReconnect(False)
3825       self._WaitUntilSync()
3826     except errors.OpExecError, err:
3827       self.LogWarning("Migration failed and I can't reconnect the"
3828                       " drives: error '%s'\n"
3829                       "Please look and recover the instance status" %
3830                       str(err))
3831
3832   def _AbortMigration(self):
3833     """Call the hypervisor code to abort a started migration.
3834
3835     """
3836     instance = self.instance
3837     target_node = self.target_node
3838     migration_info = self.migration_info
3839
3840     abort_result = self.rpc.call_finalize_migration(target_node,
3841                                                     instance,
3842                                                     migration_info,
3843                                                     False)
3844     abort_msg = abort_result.RemoteFailMsg()
3845     if abort_msg:
3846       logging.error("Aborting migration failed on target node %s: %s" %
3847                     (target_node, abort_msg))
3848       # Don't raise an exception here, as we stil have to try to revert the
3849       # disk status, even if this step failed.
3850
3851   def _ExecMigration(self):
3852     """Migrate an instance.
3853
3854     The migrate is done by:
3855       - change the disks into dual-master mode
3856       - wait until disks are fully synchronized again
3857       - migrate the instance
3858       - change disks on the new secondary node (the old primary) to secondary
3859       - wait until disks are fully synchronized
3860       - change disks into single-master mode
3861
3862     """
3863     instance = self.instance
3864     target_node = self.target_node
3865     source_node = self.source_node
3866
3867     self.feedback_fn("* checking disk consistency between source and target")
3868     for dev in instance.disks:
3869       if not _CheckDiskConsistency(self, dev, target_node, False):
3870         raise errors.OpExecError("Disk %s is degraded or not fully"
3871                                  " synchronized on target node,"
3872                                  " aborting migrate." % dev.iv_name)
3873
3874     # First get the migration information from the remote node
3875     result = self.rpc.call_migration_info(source_node, instance)
3876     msg = result.RemoteFailMsg()
3877     if msg:
3878       log_err = ("Failed fetching source migration information from %s: %s" %
3879                  (source_node, msg))
3880       logging.error(log_err)
3881       raise errors.OpExecError(log_err)
3882
3883     self.migration_info = migration_info = result.payload
3884
3885     # Then switch the disks to master/master mode
3886     self._EnsureSecondary(target_node)
3887     self._GoStandalone()
3888     self._GoReconnect(True)
3889     self._WaitUntilSync()
3890
3891     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3892     result = self.rpc.call_accept_instance(target_node,
3893                                            instance,
3894                                            migration_info,
3895                                            self.nodes_ip[target_node])
3896
3897     msg = result.RemoteFailMsg()
3898     if msg:
3899       logging.error("Instance pre-migration failed, trying to revert"
3900                     " disk status: %s", msg)
3901       self._AbortMigration()
3902       self._RevertDiskStatus()
3903       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3904                                (instance.name, msg))
3905
3906     self.feedback_fn("* migrating instance to %s" % target_node)
3907     time.sleep(10)
3908     result = self.rpc.call_instance_migrate(source_node, instance,
3909                                             self.nodes_ip[target_node],
3910                                             self.op.live)
3911     msg = result.RemoteFailMsg()
3912     if msg:
3913       logging.error("Instance migration failed, trying to revert"
3914                     " disk status: %s", msg)
3915       self._AbortMigration()
3916       self._RevertDiskStatus()
3917       raise errors.OpExecError("Could not migrate instance %s: %s" %
3918                                (instance.name, msg))
3919     time.sleep(10)
3920
3921     instance.primary_node = target_node
3922     # distribute new instance config to the other nodes
3923     self.cfg.Update(instance)
3924
3925     result = self.rpc.call_finalize_migration(target_node,
3926                                               instance,
3927                                               migration_info,
3928                                               True)
3929     msg = result.RemoteFailMsg()
3930     if msg:
3931       logging.error("Instance migration succeeded, but finalization failed:"
3932                     " %s" % msg)
3933       raise errors.OpExecError("Could not finalize instance migration: %s" %
3934                                msg)
3935
3936     self._EnsureSecondary(source_node)
3937     self._WaitUntilSync()
3938     self._GoStandalone()
3939     self._GoReconnect(False)
3940     self._WaitUntilSync()
3941
3942     self.feedback_fn("* done")
3943
3944   def Exec(self, feedback_fn):
3945     """Perform the migration.
3946
3947     """
3948     self.feedback_fn = feedback_fn
3949
3950     self.source_node = self.instance.primary_node
3951     self.target_node = self.instance.secondary_nodes[0]
3952     self.all_nodes = [self.source_node, self.target_node]
3953     self.nodes_ip = {
3954       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3955       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3956       }
3957     if self.op.cleanup:
3958       return self._ExecCleanup()
3959     else:
3960       return self._ExecMigration()
3961
3962
3963 def _CreateBlockDev(lu, node, instance, device, force_create,
3964                     info, force_open):
3965   """Create a tree of block devices on a given node.
3966
3967   If this device type has to be created on secondaries, create it and
3968   all its children.
3969
3970   If not, just recurse to children keeping the same 'force' value.
3971
3972   @param lu: the lu on whose behalf we execute
3973   @param node: the node on which to create the device
3974   @type instance: L{objects.Instance}
3975   @param instance: the instance which owns the device
3976   @type device: L{objects.Disk}
3977   @param device: the device to create
3978   @type force_create: boolean
3979   @param force_create: whether to force creation of this device; this
3980       will be change to True whenever we find a device which has
3981       CreateOnSecondary() attribute
3982   @param info: the extra 'metadata' we should attach to the device
3983       (this will be represented as a LVM tag)
3984   @type force_open: boolean
3985   @param force_open: this parameter will be passes to the
3986       L{backend.BlockdevCreate} function where it specifies
3987       whether we run on primary or not, and it affects both
3988       the child assembly and the device own Open() execution
3989
3990   """
3991   if device.CreateOnSecondary():
3992     force_create = True
3993
3994   if device.children:
3995     for child in device.children:
3996       _CreateBlockDev(lu, node, instance, child, force_create,
3997                       info, force_open)
3998
3999   if not force_create:
4000     return
4001
4002   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4003
4004
4005 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4006   """Create a single block device on a given node.
4007
4008   This will not recurse over children of the device, so they must be
4009   created in advance.
4010
4011   @param lu: the lu on whose behalf we execute
4012   @param node: the node on which to create the device
4013   @type instance: L{objects.Instance}
4014   @param instance: the instance which owns the device
4015   @type device: L{objects.Disk}
4016   @param device: the device to create
4017   @param info: the extra 'metadata' we should attach to the device
4018       (this will be represented as a LVM tag)
4019   @type force_open: boolean
4020   @param force_open: this parameter will be passes to the
4021       L{backend.BlockdevCreate} function where it specifies
4022       whether we run on primary or not, and it affects both
4023       the child assembly and the device own Open() execution
4024
4025   """
4026   lu.cfg.SetDiskID(device, node)
4027   result = lu.rpc.call_blockdev_create(node, device, device.size,
4028                                        instance.name, force_open, info)
4029   msg = result.RemoteFailMsg()
4030   if msg:
4031     raise errors.OpExecError("Can't create block device %s on"
4032                              " node %s for instance %s: %s" %
4033                              (device, node, instance.name, msg))
4034   if device.physical_id is None:
4035     device.physical_id = result.payload
4036
4037
4038 def _GenerateUniqueNames(lu, exts):
4039   """Generate a suitable LV name.
4040
4041   This will generate a logical volume name for the given instance.
4042
4043   """
4044   results = []
4045   for val in exts:
4046     new_id = lu.cfg.GenerateUniqueID()
4047     results.append("%s%s" % (new_id, val))
4048   return results
4049
4050
4051 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4052                          p_minor, s_minor):
4053   """Generate a drbd8 device complete with its children.
4054
4055   """
4056   port = lu.cfg.AllocatePort()
4057   vgname = lu.cfg.GetVGName()
4058   shared_secret = lu.cfg.GenerateDRBDSecret()
4059   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4060                           logical_id=(vgname, names[0]))
4061   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4062                           logical_id=(vgname, names[1]))
4063   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4064                           logical_id=(primary, secondary, port,
4065                                       p_minor, s_minor,
4066                                       shared_secret),
4067                           children=[dev_data, dev_meta],
4068                           iv_name=iv_name)
4069   return drbd_dev
4070
4071
4072 def _GenerateDiskTemplate(lu, template_name,
4073                           instance_name, primary_node,
4074                           secondary_nodes, disk_info,
4075                           file_storage_dir, file_driver,
4076                           base_index):
4077   """Generate the entire disk layout for a given template type.
4078
4079   """
4080   #TODO: compute space requirements
4081
4082   vgname = lu.cfg.GetVGName()
4083   disk_count = len(disk_info)
4084   disks = []
4085   if template_name == constants.DT_DISKLESS:
4086     pass
4087   elif template_name == constants.DT_PLAIN:
4088     if len(secondary_nodes) != 0:
4089       raise errors.ProgrammerError("Wrong template configuration")
4090
4091     names = _GenerateUniqueNames(lu, [".disk%d" % i
4092                                       for i in range(disk_count)])
4093     for idx, disk in enumerate(disk_info):
4094       disk_index = idx + base_index
4095       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4096                               logical_id=(vgname, names[idx]),
4097                               iv_name="disk/%d" % disk_index,
4098                               mode=disk["mode"])
4099       disks.append(disk_dev)
4100   elif template_name == constants.DT_DRBD8:
4101     if len(secondary_nodes) != 1:
4102       raise errors.ProgrammerError("Wrong template configuration")
4103     remote_node = secondary_nodes[0]
4104     minors = lu.cfg.AllocateDRBDMinor(
4105       [primary_node, remote_node] * len(disk_info), instance_name)
4106
4107     names = []
4108     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4109                                                for i in range(disk_count)]):
4110       names.append(lv_prefix + "_data")
4111       names.append(lv_prefix + "_meta")
4112     for idx, disk in enumerate(disk_info):
4113       disk_index = idx + base_index
4114       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4115                                       disk["size"], names[idx*2:idx*2+2],
4116                                       "disk/%d" % disk_index,
4117                                       minors[idx*2], minors[idx*2+1])
4118       disk_dev.mode = disk["mode"]
4119       disks.append(disk_dev)
4120   elif template_name == constants.DT_FILE:
4121     if len(secondary_nodes) != 0:
4122       raise errors.ProgrammerError("Wrong template configuration")
4123
4124     for idx, disk in enumerate(disk_info):
4125       disk_index = idx + base_index
4126       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4127                               iv_name="disk/%d" % disk_index,
4128                               logical_id=(file_driver,
4129                                           "%s/disk%d" % (file_storage_dir,
4130                                                          disk_index)),
4131                               mode=disk["mode"])
4132       disks.append(disk_dev)
4133   else:
4134     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4135   return disks
4136
4137
4138 def _GetInstanceInfoText(instance):
4139   """Compute that text that should be added to the disk's metadata.
4140
4141   """
4142   return "originstname+%s" % instance.name
4143
4144
4145 def _CreateDisks(lu, instance):
4146   """Create all disks for an instance.
4147
4148   This abstracts away some work from AddInstance.
4149
4150   @type lu: L{LogicalUnit}
4151   @param lu: the logical unit on whose behalf we execute
4152   @type instance: L{objects.Instance}
4153   @param instance: the instance whose disks we should create
4154   @rtype: boolean
4155   @return: the success of the creation
4156
4157   """
4158   info = _GetInstanceInfoText(instance)
4159   pnode = instance.primary_node
4160
4161   if instance.disk_template == constants.DT_FILE:
4162     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4163     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4164
4165     if result.failed or not result.data:
4166       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4167
4168     if not result.data[0]:
4169       raise errors.OpExecError("Failed to create directory '%s'" %
4170                                file_storage_dir)
4171
4172   # Note: this needs to be kept in sync with adding of disks in
4173   # LUSetInstanceParams
4174   for device in instance.disks:
4175     logging.info("Creating volume %s for instance %s",
4176                  device.iv_name, instance.name)
4177     #HARDCODE
4178     for node in instance.all_nodes:
4179       f_create = node == pnode
4180       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4181
4182
4183 def _RemoveDisks(lu, instance):
4184   """Remove all disks for an instance.
4185
4186   This abstracts away some work from `AddInstance()` and
4187   `RemoveInstance()`. Note that in case some of the devices couldn't
4188   be removed, the removal will continue with the other ones (compare
4189   with `_CreateDisks()`).
4190
4191   @type lu: L{LogicalUnit}
4192   @param lu: the logical unit on whose behalf we execute
4193   @type instance: L{objects.Instance}
4194   @param instance: the instance whose disks we should remove
4195   @rtype: boolean
4196   @return: the success of the removal
4197
4198   """
4199   logging.info("Removing block devices for instance %s", instance.name)
4200
4201   all_result = True
4202   for device in instance.disks:
4203     for node, disk in device.ComputeNodeTree(instance.primary_node):
4204       lu.cfg.SetDiskID(disk, node)
4205       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4206       if msg:
4207         lu.LogWarning("Could not remove block device %s on node %s,"
4208                       " continuing anyway: %s", device.iv_name, node, msg)
4209         all_result = False
4210
4211   if instance.disk_template == constants.DT_FILE:
4212     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4213     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4214                                                  file_storage_dir)
4215     if result.failed or not result.data:
4216       logging.error("Could not remove directory '%s'", file_storage_dir)
4217       all_result = False
4218
4219   return all_result
4220
4221
4222 def _ComputeDiskSize(disk_template, disks):
4223   """Compute disk size requirements in the volume group
4224
4225   """
4226   # Required free disk space as a function of disk and swap space
4227   req_size_dict = {
4228     constants.DT_DISKLESS: None,
4229     constants.DT_PLAIN: sum(d["size"] for d in disks),
4230     # 128 MB are added for drbd metadata for each disk
4231     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4232     constants.DT_FILE: None,
4233   }
4234
4235   if disk_template not in req_size_dict:
4236     raise errors.ProgrammerError("Disk template '%s' size requirement"
4237                                  " is unknown" %  disk_template)
4238
4239   return req_size_dict[disk_template]
4240
4241
4242 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4243   """Hypervisor parameter validation.
4244
4245   This function abstract the hypervisor parameter validation to be
4246   used in both instance create and instance modify.
4247
4248   @type lu: L{LogicalUnit}
4249   @param lu: the logical unit for which we check
4250   @type nodenames: list
4251   @param nodenames: the list of nodes on which we should check
4252   @type hvname: string
4253   @param hvname: the name of the hypervisor we should use
4254   @type hvparams: dict
4255   @param hvparams: the parameters which we need to check
4256   @raise errors.OpPrereqError: if the parameters are not valid
4257
4258   """
4259   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4260                                                   hvname,
4261                                                   hvparams)
4262   for node in nodenames:
4263     info = hvinfo[node]
4264     if info.offline:
4265       continue
4266     msg = info.RemoteFailMsg()
4267     if msg:
4268       raise errors.OpPrereqError("Hypervisor parameter validation"
4269                                  " failed on node %s: %s" % (node, msg))
4270
4271
4272 class LUCreateInstance(LogicalUnit):
4273   """Create an instance.
4274
4275   """
4276   HPATH = "instance-add"
4277   HTYPE = constants.HTYPE_INSTANCE
4278   _OP_REQP = ["instance_name", "disks", "disk_template",
4279               "mode", "start",
4280               "wait_for_sync", "ip_check", "nics",
4281               "hvparams", "beparams"]
4282   REQ_BGL = False
4283
4284   def _ExpandNode(self, node):
4285     """Expands and checks one node name.
4286
4287     """
4288     node_full = self.cfg.ExpandNodeName(node)
4289     if node_full is None:
4290       raise errors.OpPrereqError("Unknown node %s" % node)
4291     return node_full
4292
4293   def ExpandNames(self):
4294     """ExpandNames for CreateInstance.
4295
4296     Figure out the right locks for instance creation.
4297
4298     """
4299     self.needed_locks = {}
4300
4301     # set optional parameters to none if they don't exist
4302     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4303       if not hasattr(self.op, attr):
4304         setattr(self.op, attr, None)
4305
4306     # cheap checks, mostly valid constants given
4307
4308     # verify creation mode
4309     if self.op.mode not in (constants.INSTANCE_CREATE,
4310                             constants.INSTANCE_IMPORT):
4311       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4312                                  self.op.mode)
4313
4314     # disk template and mirror node verification
4315     if self.op.disk_template not in constants.DISK_TEMPLATES:
4316       raise errors.OpPrereqError("Invalid disk template name")
4317
4318     if self.op.hypervisor is None:
4319       self.op.hypervisor = self.cfg.GetHypervisorType()
4320
4321     cluster = self.cfg.GetClusterInfo()
4322     enabled_hvs = cluster.enabled_hypervisors
4323     if self.op.hypervisor not in enabled_hvs:
4324       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4325                                  " cluster (%s)" % (self.op.hypervisor,
4326                                   ",".join(enabled_hvs)))
4327
4328     # check hypervisor parameter syntax (locally)
4329     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4330     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4331                                   self.op.hvparams)
4332     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4333     hv_type.CheckParameterSyntax(filled_hvp)
4334
4335     # fill and remember the beparams dict
4336     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4337     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4338                                     self.op.beparams)
4339
4340     #### instance parameters check
4341
4342     # instance name verification
4343     hostname1 = utils.HostInfo(self.op.instance_name)
4344     self.op.instance_name = instance_name = hostname1.name
4345
4346     # this is just a preventive check, but someone might still add this
4347     # instance in the meantime, and creation will fail at lock-add time
4348     if instance_name in self.cfg.GetInstanceList():
4349       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4350                                  instance_name)
4351
4352     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4353
4354     # NIC buildup
4355     self.nics = []
4356     for nic in self.op.nics:
4357       # ip validity checks
4358       ip = nic.get("ip", None)
4359       if ip is None or ip.lower() == "none":
4360         nic_ip = None
4361       elif ip.lower() == constants.VALUE_AUTO:
4362         nic_ip = hostname1.ip
4363       else:
4364         if not utils.IsValidIP(ip):
4365           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4366                                      " like a valid IP" % ip)
4367         nic_ip = ip
4368
4369       # MAC address verification
4370       mac = nic.get("mac", constants.VALUE_AUTO)
4371       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4372         if not utils.IsValidMac(mac.lower()):
4373           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4374                                      mac)
4375       # bridge verification
4376       bridge = nic.get("bridge", None)
4377       if bridge is None:
4378         bridge = self.cfg.GetDefBridge()
4379       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4380
4381     # disk checks/pre-build
4382     self.disks = []
4383     for disk in self.op.disks:
4384       mode = disk.get("mode", constants.DISK_RDWR)
4385       if mode not in constants.DISK_ACCESS_SET:
4386         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4387                                    mode)
4388       size = disk.get("size", None)
4389       if size is None:
4390         raise errors.OpPrereqError("Missing disk size")
4391       try:
4392         size = int(size)
4393       except ValueError:
4394         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4395       self.disks.append({"size": size, "mode": mode})
4396
4397     # used in CheckPrereq for ip ping check
4398     self.check_ip = hostname1.ip
4399
4400     # file storage checks
4401     if (self.op.file_driver and
4402         not self.op.file_driver in constants.FILE_DRIVER):
4403       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4404                                  self.op.file_driver)
4405
4406     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4407       raise errors.OpPrereqError("File storage directory path not absolute")
4408
4409     ### Node/iallocator related checks
4410     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4411       raise errors.OpPrereqError("One and only one of iallocator and primary"
4412                                  " node must be given")
4413
4414     if self.op.iallocator:
4415       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4416     else:
4417       self.op.pnode = self._ExpandNode(self.op.pnode)
4418       nodelist = [self.op.pnode]
4419       if self.op.snode is not None:
4420         self.op.snode = self._ExpandNode(self.op.snode)
4421         nodelist.append(self.op.snode)
4422       self.needed_locks[locking.LEVEL_NODE] = nodelist
4423
4424     # in case of import lock the source node too
4425     if self.op.mode == constants.INSTANCE_IMPORT:
4426       src_node = getattr(self.op, "src_node", None)
4427       src_path = getattr(self.op, "src_path", None)
4428
4429       if src_path is None:
4430         self.op.src_path = src_path = self.op.instance_name
4431
4432       if src_node is None:
4433         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4434         self.op.src_node = None
4435         if os.path.isabs(src_path):
4436           raise errors.OpPrereqError("Importing an instance from an absolute"
4437                                      " path requires a source node option.")
4438       else:
4439         self.op.src_node = src_node = self._ExpandNode(src_node)
4440         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4441           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4442         if not os.path.isabs(src_path):
4443           self.op.src_path = src_path = \
4444             os.path.join(constants.EXPORT_DIR, src_path)
4445
4446     else: # INSTANCE_CREATE
4447       if getattr(self.op, "os_type", None) is None:
4448         raise errors.OpPrereqError("No guest OS specified")
4449
4450   def _RunAllocator(self):
4451     """Run the allocator based on input opcode.
4452
4453     """
4454     nics = [n.ToDict() for n in self.nics]
4455     ial = IAllocator(self,
4456                      mode=constants.IALLOCATOR_MODE_ALLOC,
4457                      name=self.op.instance_name,
4458                      disk_template=self.op.disk_template,
4459                      tags=[],
4460                      os=self.op.os_type,
4461                      vcpus=self.be_full[constants.BE_VCPUS],
4462                      mem_size=self.be_full[constants.BE_MEMORY],
4463                      disks=self.disks,
4464                      nics=nics,
4465                      hypervisor=self.op.hypervisor,
4466                      )
4467
4468     ial.Run(self.op.iallocator)
4469
4470     if not ial.success:
4471       raise errors.OpPrereqError("Can't compute nodes using"
4472                                  " iallocator '%s': %s" % (self.op.iallocator,
4473                                                            ial.info))
4474     if len(ial.nodes) != ial.required_nodes:
4475       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4476                                  " of nodes (%s), required %s" %
4477                                  (self.op.iallocator, len(ial.nodes),
4478                                   ial.required_nodes))
4479     self.op.pnode = ial.nodes[0]
4480     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4481                  self.op.instance_name, self.op.iallocator,
4482                  ", ".join(ial.nodes))
4483     if ial.required_nodes == 2:
4484       self.op.snode = ial.nodes[1]
4485
4486   def BuildHooksEnv(self):
4487     """Build hooks env.
4488
4489     This runs on master, primary and secondary nodes of the instance.
4490
4491     """
4492     env = {
4493       "ADD_MODE": self.op.mode,
4494       }
4495     if self.op.mode == constants.INSTANCE_IMPORT:
4496       env["SRC_NODE"] = self.op.src_node
4497       env["SRC_PATH"] = self.op.src_path
4498       env["SRC_IMAGES"] = self.src_images
4499
4500     env.update(_BuildInstanceHookEnv(
4501       name=self.op.instance_name,
4502       primary_node=self.op.pnode,
4503       secondary_nodes=self.secondaries,
4504       status=self.op.start,
4505       os_type=self.op.os_type,
4506       memory=self.be_full[constants.BE_MEMORY],
4507       vcpus=self.be_full[constants.BE_VCPUS],
4508       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4509       disk_template=self.op.disk_template,
4510       disks=[(d["size"], d["mode"]) for d in self.disks],
4511     ))
4512
4513     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4514           self.secondaries)
4515     return env, nl, nl
4516
4517
4518   def CheckPrereq(self):
4519     """Check prerequisites.
4520
4521     """
4522     if (not self.cfg.GetVGName() and
4523         self.op.disk_template not in constants.DTS_NOT_LVM):
4524       raise errors.OpPrereqError("Cluster does not support lvm-based"
4525                                  " instances")
4526
4527     if self.op.mode == constants.INSTANCE_IMPORT:
4528       src_node = self.op.src_node
4529       src_path = self.op.src_path
4530
4531       if src_node is None:
4532         exp_list = self.rpc.call_export_list(
4533           self.acquired_locks[locking.LEVEL_NODE])
4534         found = False
4535         for node in exp_list:
4536           if not exp_list[node].failed and src_path in exp_list[node].data:
4537             found = True
4538             self.op.src_node = src_node = node
4539             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4540                                                        src_path)
4541             break
4542         if not found:
4543           raise errors.OpPrereqError("No export found for relative path %s" %
4544                                       src_path)
4545
4546       _CheckNodeOnline(self, src_node)
4547       result = self.rpc.call_export_info(src_node, src_path)
4548       result.Raise()
4549       if not result.data:
4550         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4551
4552       export_info = result.data
4553       if not export_info.has_section(constants.INISECT_EXP):
4554         raise errors.ProgrammerError("Corrupted export config")
4555
4556       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4557       if (int(ei_version) != constants.EXPORT_VERSION):
4558         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4559                                    (ei_version, constants.EXPORT_VERSION))
4560
4561       # Check that the new instance doesn't have less disks than the export
4562       instance_disks = len(self.disks)
4563       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4564       if instance_disks < export_disks:
4565         raise errors.OpPrereqError("Not enough disks to import."
4566                                    " (instance: %d, export: %d)" %
4567                                    (instance_disks, export_disks))
4568
4569       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4570       disk_images = []
4571       for idx in range(export_disks):
4572         option = 'disk%d_dump' % idx
4573         if export_info.has_option(constants.INISECT_INS, option):
4574           # FIXME: are the old os-es, disk sizes, etc. useful?
4575           export_name = export_info.get(constants.INISECT_INS, option)
4576           image = os.path.join(src_path, export_name)
4577           disk_images.append(image)
4578         else:
4579           disk_images.append(False)
4580
4581       self.src_images = disk_images
4582
4583       old_name = export_info.get(constants.INISECT_INS, 'name')
4584       # FIXME: int() here could throw a ValueError on broken exports
4585       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4586       if self.op.instance_name == old_name:
4587         for idx, nic in enumerate(self.nics):
4588           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4589             nic_mac_ini = 'nic%d_mac' % idx
4590             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4591
4592     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4593     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4594     if self.op.start and not self.op.ip_check:
4595       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4596                                  " adding an instance in start mode")
4597
4598     if self.op.ip_check:
4599       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4600         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4601                                    (self.check_ip, self.op.instance_name))
4602
4603     #### mac address generation
4604     # By generating here the mac address both the allocator and the hooks get
4605     # the real final mac address rather than the 'auto' or 'generate' value.
4606     # There is a race condition between the generation and the instance object
4607     # creation, which means that we know the mac is valid now, but we're not
4608     # sure it will be when we actually add the instance. If things go bad
4609     # adding the instance will abort because of a duplicate mac, and the
4610     # creation job will fail.
4611     for nic in self.nics:
4612       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4613         nic.mac = self.cfg.GenerateMAC()
4614
4615     #### allocator run
4616
4617     if self.op.iallocator is not None:
4618       self._RunAllocator()
4619
4620     #### node related checks
4621
4622     # check primary node
4623     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4624     assert self.pnode is not None, \
4625       "Cannot retrieve locked node %s" % self.op.pnode
4626     if pnode.offline:
4627       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4628                                  pnode.name)
4629     if pnode.drained:
4630       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4631                                  pnode.name)
4632
4633     self.secondaries = []
4634
4635     # mirror node verification
4636     if self.op.disk_template in constants.DTS_NET_MIRROR:
4637       if self.op.snode is None:
4638         raise errors.OpPrereqError("The networked disk templates need"
4639                                    " a mirror node")
4640       if self.op.snode == pnode.name:
4641         raise errors.OpPrereqError("The secondary node cannot be"
4642                                    " the primary node.")
4643       _CheckNodeOnline(self, self.op.snode)
4644       _CheckNodeNotDrained(self, self.op.snode)
4645       self.secondaries.append(self.op.snode)
4646
4647     nodenames = [pnode.name] + self.secondaries
4648
4649     req_size = _ComputeDiskSize(self.op.disk_template,
4650                                 self.disks)
4651
4652     # Check lv size requirements
4653     if req_size is not None:
4654       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4655                                          self.op.hypervisor)
4656       for node in nodenames:
4657         info = nodeinfo[node]
4658         info.Raise()
4659         info = info.data
4660         if not info:
4661           raise errors.OpPrereqError("Cannot get current information"
4662                                      " from node '%s'" % node)
4663         vg_free = info.get('vg_free', None)
4664         if not isinstance(vg_free, int):
4665           raise errors.OpPrereqError("Can't compute free disk space on"
4666                                      " node %s" % node)
4667         if req_size > info['vg_free']:
4668           raise errors.OpPrereqError("Not enough disk space on target node %s."
4669                                      " %d MB available, %d MB required" %
4670                                      (node, info['vg_free'], req_size))
4671
4672     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4673
4674     # os verification
4675     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4676     result.Raise()
4677     if not isinstance(result.data, objects.OS):
4678       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4679                                  " primary node"  % self.op.os_type)
4680
4681     # bridge check on primary node
4682     bridges = [n.bridge for n in self.nics]
4683     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4684     result.Raise()
4685     if not result.data:
4686       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4687                                  " exist on destination node '%s'" %
4688                                  (",".join(bridges), pnode.name))
4689
4690     # memory check on primary node
4691     if self.op.start:
4692       _CheckNodeFreeMemory(self, self.pnode.name,
4693                            "creating instance %s" % self.op.instance_name,
4694                            self.be_full[constants.BE_MEMORY],
4695                            self.op.hypervisor)
4696
4697   def Exec(self, feedback_fn):
4698     """Create and add the instance to the cluster.
4699
4700     """
4701     instance = self.op.instance_name
4702     pnode_name = self.pnode.name
4703
4704     ht_kind = self.op.hypervisor
4705     if ht_kind in constants.HTS_REQ_PORT:
4706       network_port = self.cfg.AllocatePort()
4707     else:
4708       network_port = None
4709
4710     ##if self.op.vnc_bind_address is None:
4711     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4712
4713     # this is needed because os.path.join does not accept None arguments
4714     if self.op.file_storage_dir is None:
4715       string_file_storage_dir = ""
4716     else:
4717       string_file_storage_dir = self.op.file_storage_dir
4718
4719     # build the full file storage dir path
4720     file_storage_dir = os.path.normpath(os.path.join(
4721                                         self.cfg.GetFileStorageDir(),
4722                                         string_file_storage_dir, instance))
4723
4724
4725     disks = _GenerateDiskTemplate(self,
4726                                   self.op.disk_template,
4727                                   instance, pnode_name,
4728                                   self.secondaries,
4729                                   self.disks,
4730                                   file_storage_dir,
4731                                   self.op.file_driver,
4732                                   0)
4733
4734     iobj = objects.Instance(name=instance, os=self.op.os_type,
4735                             primary_node=pnode_name,
4736                             nics=self.nics, disks=disks,
4737                             disk_template=self.op.disk_template,
4738                             admin_up=False,
4739                             network_port=network_port,
4740                             beparams=self.op.beparams,
4741                             hvparams=self.op.hvparams,
4742                             hypervisor=self.op.hypervisor,
4743                             )
4744
4745     feedback_fn("* creating instance disks...")
4746     try:
4747       _CreateDisks(self, iobj)
4748     except errors.OpExecError:
4749       self.LogWarning("Device creation failed, reverting...")
4750       try:
4751         _RemoveDisks(self, iobj)
4752       finally:
4753         self.cfg.ReleaseDRBDMinors(instance)
4754         raise
4755
4756     feedback_fn("adding instance %s to cluster config" % instance)
4757
4758     self.cfg.AddInstance(iobj)
4759     # Declare that we don't want to remove the instance lock anymore, as we've
4760     # added the instance to the config
4761     del self.remove_locks[locking.LEVEL_INSTANCE]
4762     # Unlock all the nodes
4763     if self.op.mode == constants.INSTANCE_IMPORT:
4764       nodes_keep = [self.op.src_node]
4765       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4766                        if node != self.op.src_node]
4767       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4768       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4769     else:
4770       self.context.glm.release(locking.LEVEL_NODE)
4771       del self.acquired_locks[locking.LEVEL_NODE]
4772
4773     if self.op.wait_for_sync:
4774       disk_abort = not _WaitForSync(self, iobj)
4775     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4776       # make sure the disks are not degraded (still sync-ing is ok)
4777       time.sleep(15)
4778       feedback_fn("* checking mirrors status")
4779       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4780     else:
4781       disk_abort = False
4782
4783     if disk_abort:
4784       _RemoveDisks(self, iobj)
4785       self.cfg.RemoveInstance(iobj.name)
4786       # Make sure the instance lock gets removed
4787       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4788       raise errors.OpExecError("There are some degraded disks for"
4789                                " this instance")
4790
4791     feedback_fn("creating os for instance %s on node %s" %
4792                 (instance, pnode_name))
4793
4794     if iobj.disk_template != constants.DT_DISKLESS:
4795       if self.op.mode == constants.INSTANCE_CREATE:
4796         feedback_fn("* running the instance OS create scripts...")
4797         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4798         msg = result.RemoteFailMsg()
4799         if msg:
4800           raise errors.OpExecError("Could not add os for instance %s"
4801                                    " on node %s: %s" %
4802                                    (instance, pnode_name, msg))
4803
4804       elif self.op.mode == constants.INSTANCE_IMPORT:
4805         feedback_fn("* running the instance OS import scripts...")
4806         src_node = self.op.src_node
4807         src_images = self.src_images
4808         cluster_name = self.cfg.GetClusterName()
4809         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4810                                                          src_node, src_images,
4811                                                          cluster_name)
4812         import_result.Raise()
4813         for idx, result in enumerate(import_result.data):
4814           if not result:
4815             self.LogWarning("Could not import the image %s for instance"
4816                             " %s, disk %d, on node %s" %
4817                             (src_images[idx], instance, idx, pnode_name))
4818       else:
4819         # also checked in the prereq part
4820         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4821                                      % self.op.mode)
4822
4823     if self.op.start:
4824       iobj.admin_up = True
4825       self.cfg.Update(iobj)
4826       logging.info("Starting instance %s on node %s", instance, pnode_name)
4827       feedback_fn("* starting instance...")
4828       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4829       msg = result.RemoteFailMsg()
4830       if msg:
4831         raise errors.OpExecError("Could not start instance: %s" % msg)
4832
4833
4834 class LUConnectConsole(NoHooksLU):
4835   """Connect to an instance's console.
4836
4837   This is somewhat special in that it returns the command line that
4838   you need to run on the master node in order to connect to the
4839   console.
4840
4841   """
4842   _OP_REQP = ["instance_name"]
4843   REQ_BGL = False
4844
4845   def ExpandNames(self):
4846     self._ExpandAndLockInstance()
4847
4848   def CheckPrereq(self):
4849     """Check prerequisites.
4850
4851     This checks that the instance is in the cluster.
4852
4853     """
4854     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4855     assert self.instance is not None, \
4856       "Cannot retrieve locked instance %s" % self.op.instance_name
4857     _CheckNodeOnline(self, self.instance.primary_node)
4858
4859   def Exec(self, feedback_fn):
4860     """Connect to the console of an instance
4861
4862     """
4863     instance = self.instance
4864     node = instance.primary_node
4865
4866     node_insts = self.rpc.call_instance_list([node],
4867                                              [instance.hypervisor])[node]
4868     node_insts.Raise()
4869
4870     if instance.name not in node_insts.data:
4871       raise errors.OpExecError("Instance %s is not running." % instance.name)
4872
4873     logging.debug("Connecting to console of %s on %s", instance.name, node)
4874
4875     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4876     cluster = self.cfg.GetClusterInfo()
4877     # beparams and hvparams are passed separately, to avoid editing the
4878     # instance and then saving the defaults in the instance itself.
4879     hvparams = cluster.FillHV(instance)
4880     beparams = cluster.FillBE(instance)
4881     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4882
4883     # build ssh cmdline
4884     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4885
4886
4887 class LUReplaceDisks(LogicalUnit):
4888   """Replace the disks of an instance.
4889
4890   """
4891   HPATH = "mirrors-replace"
4892   HTYPE = constants.HTYPE_INSTANCE
4893   _OP_REQP = ["instance_name", "mode", "disks"]
4894   REQ_BGL = False
4895
4896   def CheckArguments(self):
4897     if not hasattr(self.op, "remote_node"):
4898       self.op.remote_node = None
4899     if not hasattr(self.op, "iallocator"):
4900       self.op.iallocator = None
4901
4902     # check for valid parameter combination
4903     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4904     if self.op.mode == constants.REPLACE_DISK_CHG:
4905       if cnt == 2:
4906         raise errors.OpPrereqError("When changing the secondary either an"
4907                                    " iallocator script must be used or the"
4908                                    " new node given")
4909       elif cnt == 0:
4910         raise errors.OpPrereqError("Give either the iallocator or the new"
4911                                    " secondary, not both")
4912     else: # not replacing the secondary
4913       if cnt != 2:
4914         raise errors.OpPrereqError("The iallocator and new node options can"
4915                                    " be used only when changing the"
4916                                    " secondary node")
4917
4918   def ExpandNames(self):
4919     self._ExpandAndLockInstance()
4920
4921     if self.op.iallocator is not None:
4922       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4923     elif self.op.remote_node is not None:
4924       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4925       if remote_node is None:
4926         raise errors.OpPrereqError("Node '%s' not known" %
4927                                    self.op.remote_node)
4928       self.op.remote_node = remote_node
4929       # Warning: do not remove the locking of the new secondary here
4930       # unless DRBD8.AddChildren is changed to work in parallel;
4931       # currently it doesn't since parallel invocations of
4932       # FindUnusedMinor will conflict
4933       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4934       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4935     else:
4936       self.needed_locks[locking.LEVEL_NODE] = []
4937       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4938
4939   def DeclareLocks(self, level):
4940     # If we're not already locking all nodes in the set we have to declare the
4941     # instance's primary/secondary nodes.
4942     if (level == locking.LEVEL_NODE and
4943         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4944       self._LockInstancesNodes()
4945
4946   def _RunAllocator(self):
4947     """Compute a new secondary node using an IAllocator.
4948
4949     """
4950     ial = IAllocator(self,
4951                      mode=constants.IALLOCATOR_MODE_RELOC,
4952                      name=self.op.instance_name,
4953                      relocate_from=[self.sec_node])
4954
4955     ial.Run(self.op.iallocator)
4956
4957     if not ial.success:
4958       raise errors.OpPrereqError("Can't compute nodes using"
4959                                  " iallocator '%s': %s" % (self.op.iallocator,
4960                                                            ial.info))
4961     if len(ial.nodes) != ial.required_nodes:
4962       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4963                                  " of nodes (%s), required %s" %
4964                                  (len(ial.nodes), ial.required_nodes))
4965     self.op.remote_node = ial.nodes[0]
4966     self.LogInfo("Selected new secondary for the instance: %s",
4967                  self.op.remote_node)
4968
4969   def BuildHooksEnv(self):
4970     """Build hooks env.
4971
4972     This runs on the master, the primary and all the secondaries.
4973
4974     """
4975     env = {
4976       "MODE": self.op.mode,
4977       "NEW_SECONDARY": self.op.remote_node,
4978       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4979       }
4980     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4981     nl = [
4982       self.cfg.GetMasterNode(),
4983       self.instance.primary_node,
4984       ]
4985     if self.op.remote_node is not None:
4986       nl.append(self.op.remote_node)
4987     return env, nl, nl
4988
4989   def CheckPrereq(self):
4990     """Check prerequisites.
4991
4992     This checks that the instance is in the cluster.
4993
4994     """
4995     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4996     assert instance is not None, \
4997       "Cannot retrieve locked instance %s" % self.op.instance_name
4998     self.instance = instance
4999
5000     if instance.disk_template != constants.DT_DRBD8:
5001       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5002                                  " instances")
5003
5004     if len(instance.secondary_nodes) != 1:
5005       raise errors.OpPrereqError("The instance has a strange layout,"
5006                                  " expected one secondary but found %d" %
5007                                  len(instance.secondary_nodes))
5008
5009     self.sec_node = instance.secondary_nodes[0]
5010
5011     if self.op.iallocator is not None:
5012       self._RunAllocator()
5013
5014     remote_node = self.op.remote_node
5015     if remote_node is not None:
5016       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5017       assert self.remote_node_info is not None, \
5018         "Cannot retrieve locked node %s" % remote_node
5019     else:
5020       self.remote_node_info = None
5021     if remote_node == instance.primary_node:
5022       raise errors.OpPrereqError("The specified node is the primary node of"
5023                                  " the instance.")
5024     elif remote_node == self.sec_node:
5025       raise errors.OpPrereqError("The specified node is already the"
5026                                  " secondary node of the instance.")
5027
5028     if self.op.mode == constants.REPLACE_DISK_PRI:
5029       n1 = self.tgt_node = instance.primary_node
5030       n2 = self.oth_node = self.sec_node
5031     elif self.op.mode == constants.REPLACE_DISK_SEC:
5032       n1 = self.tgt_node = self.sec_node
5033       n2 = self.oth_node = instance.primary_node
5034     elif self.op.mode == constants.REPLACE_DISK_CHG:
5035       n1 = self.new_node = remote_node
5036       n2 = self.oth_node = instance.primary_node
5037       self.tgt_node = self.sec_node
5038       _CheckNodeNotDrained(self, remote_node)
5039     else:
5040       raise errors.ProgrammerError("Unhandled disk replace mode")
5041
5042     _CheckNodeOnline(self, n1)
5043     _CheckNodeOnline(self, n2)
5044
5045     if not self.op.disks:
5046       self.op.disks = range(len(instance.disks))
5047
5048     for disk_idx in self.op.disks:
5049       instance.FindDisk(disk_idx)
5050
5051   def _ExecD8DiskOnly(self, feedback_fn):
5052     """Replace a disk on the primary or secondary for dbrd8.
5053
5054     The algorithm for replace is quite complicated:
5055
5056       1. for each disk to be replaced:
5057
5058         1. create new LVs on the target node with unique names
5059         1. detach old LVs from the drbd device
5060         1. rename old LVs to name_replaced.<time_t>
5061         1. rename new LVs to old LVs
5062         1. attach the new LVs (with the old names now) to the drbd device
5063
5064       1. wait for sync across all devices
5065
5066       1. for each modified disk:
5067
5068         1. remove old LVs (which have the name name_replaces.<time_t>)
5069
5070     Failures are not very well handled.
5071
5072     """
5073     steps_total = 6
5074     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5075     instance = self.instance
5076     iv_names = {}
5077     vgname = self.cfg.GetVGName()
5078     # start of work
5079     cfg = self.cfg
5080     tgt_node = self.tgt_node
5081     oth_node = self.oth_node
5082
5083     # Step: check device activation
5084     self.proc.LogStep(1, steps_total, "check device existence")
5085     info("checking volume groups")
5086     my_vg = cfg.GetVGName()
5087     results = self.rpc.call_vg_list([oth_node, tgt_node])
5088     if not results:
5089       raise errors.OpExecError("Can't list volume groups on the nodes")
5090     for node in oth_node, tgt_node:
5091       res = results[node]
5092       if res.failed or not res.data or my_vg not in res.data:
5093         raise errors.OpExecError("Volume group '%s' not found on %s" %
5094                                  (my_vg, node))
5095     for idx, dev in enumerate(instance.disks):
5096       if idx not in self.op.disks:
5097         continue
5098       for node in tgt_node, oth_node:
5099         info("checking disk/%d on %s" % (idx, node))
5100         cfg.SetDiskID(dev, node)
5101         result = self.rpc.call_blockdev_find(node, dev)
5102         msg = result.RemoteFailMsg()
5103         if not msg and not result.payload:
5104           msg = "disk not found"
5105         if msg:
5106           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5107                                    (idx, node, msg))
5108
5109     # Step: check other node consistency
5110     self.proc.LogStep(2, steps_total, "check peer consistency")
5111     for idx, dev in enumerate(instance.disks):
5112       if idx not in self.op.disks:
5113         continue
5114       info("checking disk/%d consistency on %s" % (idx, oth_node))
5115       if not _CheckDiskConsistency(self, dev, oth_node,
5116                                    oth_node==instance.primary_node):
5117         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5118                                  " to replace disks on this node (%s)" %
5119                                  (oth_node, tgt_node))
5120
5121     # Step: create new storage
5122     self.proc.LogStep(3, steps_total, "allocate new storage")
5123     for idx, dev in enumerate(instance.disks):
5124       if idx not in self.op.disks:
5125         continue
5126       size = dev.size
5127       cfg.SetDiskID(dev, tgt_node)
5128       lv_names = [".disk%d_%s" % (idx, suf)
5129                   for suf in ["data", "meta"]]
5130       names = _GenerateUniqueNames(self, lv_names)
5131       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5132                              logical_id=(vgname, names[0]))
5133       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5134                              logical_id=(vgname, names[1]))
5135       new_lvs = [lv_data, lv_meta]
5136       old_lvs = dev.children
5137       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5138       info("creating new local storage on %s for %s" %
5139            (tgt_node, dev.iv_name))
5140       # we pass force_create=True to force the LVM creation
5141       for new_lv in new_lvs:
5142         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5143                         _GetInstanceInfoText(instance), False)
5144
5145     # Step: for each lv, detach+rename*2+attach
5146     self.proc.LogStep(4, steps_total, "change drbd configuration")
5147     for dev, old_lvs, new_lvs in iv_names.itervalues():
5148       info("detaching %s drbd from local storage" % dev.iv_name)
5149       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5150       result.Raise()
5151       if not result.data:
5152         raise errors.OpExecError("Can't detach drbd from local storage on node"
5153                                  " %s for device %s" % (tgt_node, dev.iv_name))
5154       #dev.children = []
5155       #cfg.Update(instance)
5156
5157       # ok, we created the new LVs, so now we know we have the needed
5158       # storage; as such, we proceed on the target node to rename
5159       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5160       # using the assumption that logical_id == physical_id (which in
5161       # turn is the unique_id on that node)
5162
5163       # FIXME(iustin): use a better name for the replaced LVs
5164       temp_suffix = int(time.time())
5165       ren_fn = lambda d, suff: (d.physical_id[0],
5166                                 d.physical_id[1] + "_replaced-%s" % suff)
5167       # build the rename list based on what LVs exist on the node
5168       rlist = []
5169       for to_ren in old_lvs:
5170         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5171         if not result.RemoteFailMsg() and result.payload:
5172           # device exists
5173           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5174
5175       info("renaming the old LVs on the target node")
5176       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5177       result.Raise()
5178       if not result.data:
5179         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5180       # now we rename the new LVs to the old LVs
5181       info("renaming the new LVs on the target node")
5182       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5183       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5184       result.Raise()
5185       if not result.data:
5186         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5187
5188       for old, new in zip(old_lvs, new_lvs):
5189         new.logical_id = old.logical_id
5190         cfg.SetDiskID(new, tgt_node)
5191
5192       for disk in old_lvs:
5193         disk.logical_id = ren_fn(disk, temp_suffix)
5194         cfg.SetDiskID(disk, tgt_node)
5195
5196       # now that the new lvs have the old name, we can add them to the device
5197       info("adding new mirror component on %s" % tgt_node)
5198       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5199       if result.failed or not result.data:
5200         for new_lv in new_lvs:
5201           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5202           if msg:
5203             warning("Can't rollback device %s: %s", dev, msg,
5204                     hint="cleanup manually the unused logical volumes")
5205         raise errors.OpExecError("Can't add local storage to drbd")
5206
5207       dev.children = new_lvs
5208       cfg.Update(instance)
5209
5210     # Step: wait for sync
5211
5212     # this can fail as the old devices are degraded and _WaitForSync
5213     # does a combined result over all disks, so we don't check its
5214     # return value
5215     self.proc.LogStep(5, steps_total, "sync devices")
5216     _WaitForSync(self, instance, unlock=True)
5217
5218     # so check manually all the devices
5219     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5220       cfg.SetDiskID(dev, instance.primary_node)
5221       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5222       msg = result.RemoteFailMsg()
5223       if not msg and not result.payload:
5224         msg = "disk not found"
5225       if msg:
5226         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5227                                  (name, msg))
5228       if result.payload[5]:
5229         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5230
5231     # Step: remove old storage
5232     self.proc.LogStep(6, steps_total, "removing old storage")
5233     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5234       info("remove logical volumes for %s" % name)
5235       for lv in old_lvs:
5236         cfg.SetDiskID(lv, tgt_node)
5237         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5238         if msg:
5239           warning("Can't remove old LV: %s" % msg,
5240                   hint="manually remove unused LVs")
5241           continue
5242
5243   def _ExecD8Secondary(self, feedback_fn):
5244     """Replace the secondary node for drbd8.
5245
5246     The algorithm for replace is quite complicated:
5247       - for all disks of the instance:
5248         - create new LVs on the new node with same names
5249         - shutdown the drbd device on the old secondary
5250         - disconnect the drbd network on the primary
5251         - create the drbd device on the new secondary
5252         - network attach the drbd on the primary, using an artifice:
5253           the drbd code for Attach() will connect to the network if it
5254           finds a device which is connected to the good local disks but
5255           not network enabled
5256       - wait for sync across all devices
5257       - remove all disks from the old secondary
5258
5259     Failures are not very well handled.
5260
5261     """
5262     steps_total = 6
5263     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5264     instance = self.instance
5265     iv_names = {}
5266     # start of work
5267     cfg = self.cfg
5268     old_node = self.tgt_node
5269     new_node = self.new_node
5270     pri_node = instance.primary_node
5271     nodes_ip = {
5272       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5273       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5274       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5275       }
5276
5277     # Step: check device activation
5278     self.proc.LogStep(1, steps_total, "check device existence")
5279     info("checking volume groups")
5280     my_vg = cfg.GetVGName()
5281     results = self.rpc.call_vg_list([pri_node, new_node])
5282     for node in pri_node, new_node:
5283       res = results[node]
5284       if res.failed or not res.data or my_vg not in res.data:
5285         raise errors.OpExecError("Volume group '%s' not found on %s" %
5286                                  (my_vg, node))
5287     for idx, dev in enumerate(instance.disks):
5288       if idx not in self.op.disks:
5289         continue
5290       info("checking disk/%d on %s" % (idx, pri_node))
5291       cfg.SetDiskID(dev, pri_node)
5292       result = self.rpc.call_blockdev_find(pri_node, dev)
5293       msg = result.RemoteFailMsg()
5294       if not msg and not result.payload:
5295         msg = "disk not found"
5296       if msg:
5297         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5298                                  (idx, pri_node, msg))
5299
5300     # Step: check other node consistency
5301     self.proc.LogStep(2, steps_total, "check peer consistency")
5302     for idx, dev in enumerate(instance.disks):
5303       if idx not in self.op.disks:
5304         continue
5305       info("checking disk/%d consistency on %s" % (idx, pri_node))
5306       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5307         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5308                                  " unsafe to replace the secondary" %
5309                                  pri_node)
5310
5311     # Step: create new storage
5312     self.proc.LogStep(3, steps_total, "allocate new storage")
5313     for idx, dev in enumerate(instance.disks):
5314       info("adding new local storage on %s for disk/%d" %
5315            (new_node, idx))
5316       # we pass force_create=True to force LVM creation
5317       for new_lv in dev.children:
5318         _CreateBlockDev(self, new_node, instance, new_lv, True,
5319                         _GetInstanceInfoText(instance), False)
5320
5321     # Step 4: dbrd minors and drbd setups changes
5322     # after this, we must manually remove the drbd minors on both the
5323     # error and the success paths
5324     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5325                                    instance.name)
5326     logging.debug("Allocated minors %s" % (minors,))
5327     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5328     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5329       size = dev.size
5330       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5331       # create new devices on new_node; note that we create two IDs:
5332       # one without port, so the drbd will be activated without
5333       # networking information on the new node at this stage, and one
5334       # with network, for the latter activation in step 4
5335       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5336       if pri_node == o_node1:
5337         p_minor = o_minor1
5338       else:
5339         p_minor = o_minor2
5340
5341       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5342       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5343
5344       iv_names[idx] = (dev, dev.children, new_net_id)
5345       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5346                     new_net_id)
5347       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5348                               logical_id=new_alone_id,
5349                               children=dev.children)
5350       try:
5351         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5352                               _GetInstanceInfoText(instance), False)
5353       except errors.GenericError:
5354         self.cfg.ReleaseDRBDMinors(instance.name)
5355         raise
5356
5357     for idx, dev in enumerate(instance.disks):
5358       # we have new devices, shutdown the drbd on the old secondary
5359       info("shutting down drbd for disk/%d on old node" % idx)
5360       cfg.SetDiskID(dev, old_node)
5361       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5362       if msg:
5363         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5364                 (idx, msg),
5365                 hint="Please cleanup this device manually as soon as possible")
5366
5367     info("detaching primary drbds from the network (=> standalone)")
5368     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5369                                                instance.disks)[pri_node]
5370
5371     msg = result.RemoteFailMsg()
5372     if msg:
5373       # detaches didn't succeed (unlikely)
5374       self.cfg.ReleaseDRBDMinors(instance.name)
5375       raise errors.OpExecError("Can't detach the disks from the network on"
5376                                " old node: %s" % (msg,))
5377
5378     # if we managed to detach at least one, we update all the disks of
5379     # the instance to point to the new secondary
5380     info("updating instance configuration")
5381     for dev, _, new_logical_id in iv_names.itervalues():
5382       dev.logical_id = new_logical_id
5383       cfg.SetDiskID(dev, pri_node)
5384     cfg.Update(instance)
5385
5386     # and now perform the drbd attach
5387     info("attaching primary drbds to new secondary (standalone => connected)")
5388     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5389                                            instance.disks, instance.name,
5390                                            False)
5391     for to_node, to_result in result.items():
5392       msg = to_result.RemoteFailMsg()
5393       if msg:
5394         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5395                 hint="please do a gnt-instance info to see the"
5396                 " status of disks")
5397
5398     # this can fail as the old devices are degraded and _WaitForSync
5399     # does a combined result over all disks, so we don't check its
5400     # return value
5401     self.proc.LogStep(5, steps_total, "sync devices")
5402     _WaitForSync(self, instance, unlock=True)
5403
5404     # so check manually all the devices
5405     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5406       cfg.SetDiskID(dev, pri_node)
5407       result = self.rpc.call_blockdev_find(pri_node, dev)
5408       msg = result.RemoteFailMsg()
5409       if not msg and not result.payload:
5410         msg = "disk not found"
5411       if msg:
5412         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5413                                  (idx, msg))
5414       if result.payload[5]:
5415         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5416
5417     self.proc.LogStep(6, steps_total, "removing old storage")
5418     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5419       info("remove logical volumes for disk/%d" % idx)
5420       for lv in old_lvs:
5421         cfg.SetDiskID(lv, old_node)
5422         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5423         if msg:
5424           warning("Can't remove LV on old secondary: %s", msg,
5425                   hint="Cleanup stale volumes by hand")
5426
5427   def Exec(self, feedback_fn):
5428     """Execute disk replacement.
5429
5430     This dispatches the disk replacement to the appropriate handler.
5431
5432     """
5433     instance = self.instance
5434
5435     # Activate the instance disks if we're replacing them on a down instance
5436     if not instance.admin_up:
5437       _StartInstanceDisks(self, instance, True)
5438
5439     if self.op.mode == constants.REPLACE_DISK_CHG:
5440       fn = self._ExecD8Secondary
5441     else:
5442       fn = self._ExecD8DiskOnly
5443
5444     ret = fn(feedback_fn)
5445
5446     # Deactivate the instance disks if we're replacing them on a down instance
5447     if not instance.admin_up:
5448       _SafeShutdownInstanceDisks(self, instance)
5449
5450     return ret
5451
5452
5453 class LUGrowDisk(LogicalUnit):
5454   """Grow a disk of an instance.
5455
5456   """
5457   HPATH = "disk-grow"
5458   HTYPE = constants.HTYPE_INSTANCE
5459   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5460   REQ_BGL = False
5461
5462   def ExpandNames(self):
5463     self._ExpandAndLockInstance()
5464     self.needed_locks[locking.LEVEL_NODE] = []
5465     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5466
5467   def DeclareLocks(self, level):
5468     if level == locking.LEVEL_NODE:
5469       self._LockInstancesNodes()
5470
5471   def BuildHooksEnv(self):
5472     """Build hooks env.
5473
5474     This runs on the master, the primary and all the secondaries.
5475
5476     """
5477     env = {
5478       "DISK": self.op.disk,
5479       "AMOUNT": self.op.amount,
5480       }
5481     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5482     nl = [
5483       self.cfg.GetMasterNode(),
5484       self.instance.primary_node,
5485       ]
5486     return env, nl, nl
5487
5488   def CheckPrereq(self):
5489     """Check prerequisites.
5490
5491     This checks that the instance is in the cluster.
5492
5493     """
5494     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5495     assert instance is not None, \
5496       "Cannot retrieve locked instance %s" % self.op.instance_name
5497     nodenames = list(instance.all_nodes)
5498     for node in nodenames:
5499       _CheckNodeOnline(self, node)
5500
5501
5502     self.instance = instance
5503
5504     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5505       raise errors.OpPrereqError("Instance's disk layout does not support"
5506                                  " growing.")
5507
5508     self.disk = instance.FindDisk(self.op.disk)
5509
5510     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5511                                        instance.hypervisor)
5512     for node in nodenames:
5513       info = nodeinfo[node]
5514       if info.failed or not info.data:
5515         raise errors.OpPrereqError("Cannot get current information"
5516                                    " from node '%s'" % node)
5517       vg_free = info.data.get('vg_free', None)
5518       if not isinstance(vg_free, int):
5519         raise errors.OpPrereqError("Can't compute free disk space on"
5520                                    " node %s" % node)
5521       if self.op.amount > vg_free:
5522         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5523                                    " %d MiB available, %d MiB required" %
5524                                    (node, vg_free, self.op.amount))
5525
5526   def Exec(self, feedback_fn):
5527     """Execute disk grow.
5528
5529     """
5530     instance = self.instance
5531     disk = self.disk
5532     for node in instance.all_nodes:
5533       self.cfg.SetDiskID(disk, node)
5534       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5535       msg = result.RemoteFailMsg()
5536       if msg:
5537         raise errors.OpExecError("Grow request failed to node %s: %s" %
5538                                  (node, msg))
5539     disk.RecordGrow(self.op.amount)
5540     self.cfg.Update(instance)
5541     if self.op.wait_for_sync:
5542       disk_abort = not _WaitForSync(self, instance)
5543       if disk_abort:
5544         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5545                              " status.\nPlease check the instance.")
5546
5547
5548 class LUQueryInstanceData(NoHooksLU):
5549   """Query runtime instance data.
5550
5551   """
5552   _OP_REQP = ["instances", "static"]
5553   REQ_BGL = False
5554
5555   def ExpandNames(self):
5556     self.needed_locks = {}
5557     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5558
5559     if not isinstance(self.op.instances, list):
5560       raise errors.OpPrereqError("Invalid argument type 'instances'")
5561
5562     if self.op.instances:
5563       self.wanted_names = []
5564       for name in self.op.instances:
5565         full_name = self.cfg.ExpandInstanceName(name)
5566         if full_name is None:
5567           raise errors.OpPrereqError("Instance '%s' not known" % name)
5568         self.wanted_names.append(full_name)
5569       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5570     else:
5571       self.wanted_names = None
5572       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5573
5574     self.needed_locks[locking.LEVEL_NODE] = []
5575     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5576
5577   def DeclareLocks(self, level):
5578     if level == locking.LEVEL_NODE:
5579       self._LockInstancesNodes()
5580
5581   def CheckPrereq(self):
5582     """Check prerequisites.
5583
5584     This only checks the optional instance list against the existing names.
5585
5586     """
5587     if self.wanted_names is None:
5588       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5589
5590     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5591                              in self.wanted_names]
5592     return
5593
5594   def _ComputeDiskStatus(self, instance, snode, dev):
5595     """Compute block device status.
5596
5597     """
5598     static = self.op.static
5599     if not static:
5600       self.cfg.SetDiskID(dev, instance.primary_node)
5601       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5602       if dev_pstatus.offline:
5603         dev_pstatus = None
5604       else:
5605         msg = dev_pstatus.RemoteFailMsg()
5606         if msg:
5607           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5608                                    (instance.name, msg))
5609         dev_pstatus = dev_pstatus.payload
5610     else:
5611       dev_pstatus = None
5612
5613     if dev.dev_type in constants.LDS_DRBD:
5614       # we change the snode then (otherwise we use the one passed in)
5615       if dev.logical_id[0] == instance.primary_node:
5616         snode = dev.logical_id[1]
5617       else:
5618         snode = dev.logical_id[0]
5619
5620     if snode and not static:
5621       self.cfg.SetDiskID(dev, snode)
5622       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5623       if dev_sstatus.offline:
5624         dev_sstatus = None
5625       else:
5626         msg = dev_sstatus.RemoteFailMsg()
5627         if msg:
5628           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5629                                    (instance.name, msg))
5630         dev_sstatus = dev_sstatus.payload
5631     else:
5632       dev_sstatus = None
5633
5634     if dev.children:
5635       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5636                       for child in dev.children]
5637     else:
5638       dev_children = []
5639
5640     data = {
5641       "iv_name": dev.iv_name,
5642       "dev_type": dev.dev_type,
5643       "logical_id": dev.logical_id,
5644       "physical_id": dev.physical_id,
5645       "pstatus": dev_pstatus,
5646       "sstatus": dev_sstatus,
5647       "children": dev_children,
5648       "mode": dev.mode,
5649       }
5650
5651     return data
5652
5653   def Exec(self, feedback_fn):
5654     """Gather and return data"""
5655     result = {}
5656
5657     cluster = self.cfg.GetClusterInfo()
5658
5659     for instance in self.wanted_instances:
5660       if not self.op.static:
5661         remote_info = self.rpc.call_instance_info(instance.primary_node,
5662                                                   instance.name,
5663                                                   instance.hypervisor)
5664         remote_info.Raise()
5665         remote_info = remote_info.data
5666         if remote_info and "state" in remote_info:
5667           remote_state = "up"
5668         else:
5669           remote_state = "down"
5670       else:
5671         remote_state = None
5672       if instance.admin_up:
5673         config_state = "up"
5674       else:
5675         config_state = "down"
5676
5677       disks = [self._ComputeDiskStatus(instance, None, device)
5678                for device in instance.disks]
5679
5680       idict = {
5681         "name": instance.name,
5682         "config_state": config_state,
5683         "run_state": remote_state,
5684         "pnode": instance.primary_node,
5685         "snodes": instance.secondary_nodes,
5686         "os": instance.os,
5687         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5688         "disks": disks,
5689         "hypervisor": instance.hypervisor,
5690         "network_port": instance.network_port,
5691         "hv_instance": instance.hvparams,
5692         "hv_actual": cluster.FillHV(instance),
5693         "be_instance": instance.beparams,
5694         "be_actual": cluster.FillBE(instance),
5695         }
5696
5697       result[instance.name] = idict
5698
5699     return result
5700
5701
5702 class LUSetInstanceParams(LogicalUnit):
5703   """Modifies an instances's parameters.
5704
5705   """
5706   HPATH = "instance-modify"
5707   HTYPE = constants.HTYPE_INSTANCE
5708   _OP_REQP = ["instance_name"]
5709   REQ_BGL = False
5710
5711   def CheckArguments(self):
5712     if not hasattr(self.op, 'nics'):
5713       self.op.nics = []
5714     if not hasattr(self.op, 'disks'):
5715       self.op.disks = []
5716     if not hasattr(self.op, 'beparams'):
5717       self.op.beparams = {}
5718     if not hasattr(self.op, 'hvparams'):
5719       self.op.hvparams = {}
5720     self.op.force = getattr(self.op, "force", False)
5721     if not (self.op.nics or self.op.disks or
5722             self.op.hvparams or self.op.beparams):
5723       raise errors.OpPrereqError("No changes submitted")
5724
5725     # Disk validation
5726     disk_addremove = 0
5727     for disk_op, disk_dict in self.op.disks:
5728       if disk_op == constants.DDM_REMOVE:
5729         disk_addremove += 1
5730         continue
5731       elif disk_op == constants.DDM_ADD:
5732         disk_addremove += 1
5733       else:
5734         if not isinstance(disk_op, int):
5735           raise errors.OpPrereqError("Invalid disk index")
5736       if disk_op == constants.DDM_ADD:
5737         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5738         if mode not in constants.DISK_ACCESS_SET:
5739           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5740         size = disk_dict.get('size', None)
5741         if size is None:
5742           raise errors.OpPrereqError("Required disk parameter size missing")
5743         try:
5744           size = int(size)
5745         except ValueError, err:
5746           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5747                                      str(err))
5748         disk_dict['size'] = size
5749       else:
5750         # modification of disk
5751         if 'size' in disk_dict:
5752           raise errors.OpPrereqError("Disk size change not possible, use"
5753                                      " grow-disk")
5754
5755     if disk_addremove > 1:
5756       raise errors.OpPrereqError("Only one disk add or remove operation"
5757                                  " supported at a time")
5758
5759     # NIC validation
5760     nic_addremove = 0
5761     for nic_op, nic_dict in self.op.nics:
5762       if nic_op == constants.DDM_REMOVE:
5763         nic_addremove += 1
5764         continue
5765       elif nic_op == constants.DDM_ADD:
5766         nic_addremove += 1
5767       else:
5768         if not isinstance(nic_op, int):
5769           raise errors.OpPrereqError("Invalid nic index")
5770
5771       # nic_dict should be a dict
5772       nic_ip = nic_dict.get('ip', None)
5773       if nic_ip is not None:
5774         if nic_ip.lower() == constants.VALUE_NONE:
5775           nic_dict['ip'] = None
5776         else:
5777           if not utils.IsValidIP(nic_ip):
5778             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5779
5780       if nic_op == constants.DDM_ADD:
5781         nic_bridge = nic_dict.get('bridge', None)
5782         if nic_bridge is None:
5783           nic_dict['bridge'] = self.cfg.GetDefBridge()
5784         nic_mac = nic_dict.get('mac', None)
5785         if nic_mac is None:
5786           nic_dict['mac'] = constants.VALUE_AUTO
5787
5788       if 'mac' in nic_dict:
5789         nic_mac = nic_dict['mac']
5790         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5791           if not utils.IsValidMac(nic_mac):
5792             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5793         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5794           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5795                                      " modifying an existing nic")
5796
5797     if nic_addremove > 1:
5798       raise errors.OpPrereqError("Only one NIC add or remove operation"
5799                                  " supported at a time")
5800
5801   def ExpandNames(self):
5802     self._ExpandAndLockInstance()
5803     self.needed_locks[locking.LEVEL_NODE] = []
5804     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5805
5806   def DeclareLocks(self, level):
5807     if level == locking.LEVEL_NODE:
5808       self._LockInstancesNodes()
5809
5810   def BuildHooksEnv(self):
5811     """Build hooks env.
5812
5813     This runs on the master, primary and secondaries.
5814
5815     """
5816     args = dict()
5817     if constants.BE_MEMORY in self.be_new:
5818       args['memory'] = self.be_new[constants.BE_MEMORY]
5819     if constants.BE_VCPUS in self.be_new:
5820       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5821     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5822     # information at all.
5823     if self.op.nics:
5824       args['nics'] = []
5825       nic_override = dict(self.op.nics)
5826       for idx, nic in enumerate(self.instance.nics):
5827         if idx in nic_override:
5828           this_nic_override = nic_override[idx]
5829         else:
5830           this_nic_override = {}
5831         if 'ip' in this_nic_override:
5832           ip = this_nic_override['ip']
5833         else:
5834           ip = nic.ip
5835         if 'bridge' in this_nic_override:
5836           bridge = this_nic_override['bridge']
5837         else:
5838           bridge = nic.bridge
5839         if 'mac' in this_nic_override:
5840           mac = this_nic_override['mac']
5841         else:
5842           mac = nic.mac
5843         args['nics'].append((ip, bridge, mac))
5844       if constants.DDM_ADD in nic_override:
5845         ip = nic_override[constants.DDM_ADD].get('ip', None)
5846         bridge = nic_override[constants.DDM_ADD]['bridge']
5847         mac = nic_override[constants.DDM_ADD]['mac']
5848         args['nics'].append((ip, bridge, mac))
5849       elif constants.DDM_REMOVE in nic_override:
5850         del args['nics'][-1]
5851
5852     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5853     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5854     return env, nl, nl
5855
5856   def CheckPrereq(self):
5857     """Check prerequisites.
5858
5859     This only checks the instance list against the existing names.
5860
5861     """
5862     force = self.force = self.op.force
5863
5864     # checking the new params on the primary/secondary nodes
5865
5866     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5867     assert self.instance is not None, \
5868       "Cannot retrieve locked instance %s" % self.op.instance_name
5869     pnode = instance.primary_node
5870     nodelist = list(instance.all_nodes)
5871
5872     # hvparams processing
5873     if self.op.hvparams:
5874       i_hvdict = copy.deepcopy(instance.hvparams)
5875       for key, val in self.op.hvparams.iteritems():
5876         if val == constants.VALUE_DEFAULT:
5877           try:
5878             del i_hvdict[key]
5879           except KeyError:
5880             pass
5881         else:
5882           i_hvdict[key] = val
5883       cluster = self.cfg.GetClusterInfo()
5884       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5885       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5886                                 i_hvdict)
5887       # local check
5888       hypervisor.GetHypervisor(
5889         instance.hypervisor).CheckParameterSyntax(hv_new)
5890       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5891       self.hv_new = hv_new # the new actual values
5892       self.hv_inst = i_hvdict # the new dict (without defaults)
5893     else:
5894       self.hv_new = self.hv_inst = {}
5895
5896     # beparams processing
5897     if self.op.beparams:
5898       i_bedict = copy.deepcopy(instance.beparams)
5899       for key, val in self.op.beparams.iteritems():
5900         if val == constants.VALUE_DEFAULT:
5901           try:
5902             del i_bedict[key]
5903           except KeyError:
5904             pass
5905         else:
5906           i_bedict[key] = val
5907       cluster = self.cfg.GetClusterInfo()
5908       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5909       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5910                                 i_bedict)
5911       self.be_new = be_new # the new actual values
5912       self.be_inst = i_bedict # the new dict (without defaults)
5913     else:
5914       self.be_new = self.be_inst = {}
5915
5916     self.warn = []
5917
5918     if constants.BE_MEMORY in self.op.beparams and not self.force:
5919       mem_check_list = [pnode]
5920       if be_new[constants.BE_AUTO_BALANCE]:
5921         # either we changed auto_balance to yes or it was from before
5922         mem_check_list.extend(instance.secondary_nodes)
5923       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5924                                                   instance.hypervisor)
5925       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5926                                          instance.hypervisor)
5927       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5928         # Assume the primary node is unreachable and go ahead
5929         self.warn.append("Can't get info from primary node %s" % pnode)
5930       else:
5931         if not instance_info.failed and instance_info.data:
5932           current_mem = int(instance_info.data['memory'])
5933         else:
5934           # Assume instance not running
5935           # (there is a slight race condition here, but it's not very probable,
5936           # and we have no other way to check)
5937           current_mem = 0
5938         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5939                     nodeinfo[pnode].data['memory_free'])
5940         if miss_mem > 0:
5941           raise errors.OpPrereqError("This change will prevent the instance"
5942                                      " from starting, due to %d MB of memory"
5943                                      " missing on its primary node" % miss_mem)
5944
5945       if be_new[constants.BE_AUTO_BALANCE]:
5946         for node, nres in nodeinfo.iteritems():
5947           if node not in instance.secondary_nodes:
5948             continue
5949           if nres.failed or not isinstance(nres.data, dict):
5950             self.warn.append("Can't get info from secondary node %s" % node)
5951           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5952             self.warn.append("Not enough memory to failover instance to"
5953                              " secondary node %s" % node)
5954
5955     # NIC processing
5956     for nic_op, nic_dict in self.op.nics:
5957       if nic_op == constants.DDM_REMOVE:
5958         if not instance.nics:
5959           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5960         continue
5961       if nic_op != constants.DDM_ADD:
5962         # an existing nic
5963         if nic_op < 0 or nic_op >= len(instance.nics):
5964           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5965                                      " are 0 to %d" %
5966                                      (nic_op, len(instance.nics)))
5967       if 'bridge' in nic_dict:
5968         nic_bridge = nic_dict['bridge']
5969         if nic_bridge is None:
5970           raise errors.OpPrereqError('Cannot set the nic bridge to None')
5971         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5972           msg = ("Bridge '%s' doesn't exist on one of"
5973                  " the instance nodes" % nic_bridge)
5974           if self.force:
5975             self.warn.append(msg)
5976           else:
5977             raise errors.OpPrereqError(msg)
5978       if 'mac' in nic_dict:
5979         nic_mac = nic_dict['mac']
5980         if nic_mac is None:
5981           raise errors.OpPrereqError('Cannot set the nic mac to None')
5982         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5983           # otherwise generate the mac
5984           nic_dict['mac'] = self.cfg.GenerateMAC()
5985         else:
5986           # or validate/reserve the current one
5987           if self.cfg.IsMacInUse(nic_mac):
5988             raise errors.OpPrereqError("MAC address %s already in use"
5989                                        " in cluster" % nic_mac)
5990
5991     # DISK processing
5992     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5993       raise errors.OpPrereqError("Disk operations not supported for"
5994                                  " diskless instances")
5995     for disk_op, disk_dict in self.op.disks:
5996       if disk_op == constants.DDM_REMOVE:
5997         if len(instance.disks) == 1:
5998           raise errors.OpPrereqError("Cannot remove the last disk of"
5999                                      " an instance")
6000         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6001         ins_l = ins_l[pnode]
6002         if ins_l.failed or not isinstance(ins_l.data, list):
6003           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6004         if instance.name in ins_l.data:
6005           raise errors.OpPrereqError("Instance is running, can't remove"
6006                                      " disks.")
6007
6008       if (disk_op == constants.DDM_ADD and
6009           len(instance.nics) >= constants.MAX_DISKS):
6010         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6011                                    " add more" % constants.MAX_DISKS)
6012       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6013         # an existing disk
6014         if disk_op < 0 or disk_op >= len(instance.disks):
6015           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6016                                      " are 0 to %d" %
6017                                      (disk_op, len(instance.disks)))
6018
6019     return
6020
6021   def Exec(self, feedback_fn):
6022     """Modifies an instance.
6023
6024     All parameters take effect only at the next restart of the instance.
6025
6026     """
6027     # Process here the warnings from CheckPrereq, as we don't have a
6028     # feedback_fn there.
6029     for warn in self.warn:
6030       feedback_fn("WARNING: %s" % warn)
6031
6032     result = []
6033     instance = self.instance
6034     # disk changes
6035     for disk_op, disk_dict in self.op.disks:
6036       if disk_op == constants.DDM_REMOVE:
6037         # remove the last disk
6038         device = instance.disks.pop()
6039         device_idx = len(instance.disks)
6040         for node, disk in device.ComputeNodeTree(instance.primary_node):
6041           self.cfg.SetDiskID(disk, node)
6042           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6043           if msg:
6044             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6045                             " continuing anyway", device_idx, node, msg)
6046         result.append(("disk/%d" % device_idx, "remove"))
6047       elif disk_op == constants.DDM_ADD:
6048         # add a new disk
6049         if instance.disk_template == constants.DT_FILE:
6050           file_driver, file_path = instance.disks[0].logical_id
6051           file_path = os.path.dirname(file_path)
6052         else:
6053           file_driver = file_path = None
6054         disk_idx_base = len(instance.disks)
6055         new_disk = _GenerateDiskTemplate(self,
6056                                          instance.disk_template,
6057                                          instance.name, instance.primary_node,
6058                                          instance.secondary_nodes,
6059                                          [disk_dict],
6060                                          file_path,
6061                                          file_driver,
6062                                          disk_idx_base)[0]
6063         instance.disks.append(new_disk)
6064         info = _GetInstanceInfoText(instance)
6065
6066         logging.info("Creating volume %s for instance %s",
6067                      new_disk.iv_name, instance.name)
6068         # Note: this needs to be kept in sync with _CreateDisks
6069         #HARDCODE
6070         for node in instance.all_nodes:
6071           f_create = node == instance.primary_node
6072           try:
6073             _CreateBlockDev(self, node, instance, new_disk,
6074                             f_create, info, f_create)
6075           except errors.OpExecError, err:
6076             self.LogWarning("Failed to create volume %s (%s) on"
6077                             " node %s: %s",
6078                             new_disk.iv_name, new_disk, node, err)
6079         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6080                        (new_disk.size, new_disk.mode)))
6081       else:
6082         # change a given disk
6083         instance.disks[disk_op].mode = disk_dict['mode']
6084         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6085     # NIC changes
6086     for nic_op, nic_dict in self.op.nics:
6087       if nic_op == constants.DDM_REMOVE:
6088         # remove the last nic
6089         del instance.nics[-1]
6090         result.append(("nic.%d" % len(instance.nics), "remove"))
6091       elif nic_op == constants.DDM_ADD:
6092         # mac and bridge should be set, by now
6093         mac = nic_dict['mac']
6094         bridge = nic_dict['bridge']
6095         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6096                               bridge=bridge)
6097         instance.nics.append(new_nic)
6098         result.append(("nic.%d" % (len(instance.nics) - 1),
6099                        "add:mac=%s,ip=%s,bridge=%s" %
6100                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6101       else:
6102         # change a given nic
6103         for key in 'mac', 'ip', 'bridge':
6104           if key in nic_dict:
6105             setattr(instance.nics[nic_op], key, nic_dict[key])
6106             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6107
6108     # hvparams changes
6109     if self.op.hvparams:
6110       instance.hvparams = self.hv_inst
6111       for key, val in self.op.hvparams.iteritems():
6112         result.append(("hv/%s" % key, val))
6113
6114     # beparams changes
6115     if self.op.beparams:
6116       instance.beparams = self.be_inst
6117       for key, val in self.op.beparams.iteritems():
6118         result.append(("be/%s" % key, val))
6119
6120     self.cfg.Update(instance)
6121
6122     return result
6123
6124
6125 class LUQueryExports(NoHooksLU):
6126   """Query the exports list
6127
6128   """
6129   _OP_REQP = ['nodes']
6130   REQ_BGL = False
6131
6132   def ExpandNames(self):
6133     self.needed_locks = {}
6134     self.share_locks[locking.LEVEL_NODE] = 1
6135     if not self.op.nodes:
6136       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6137     else:
6138       self.needed_locks[locking.LEVEL_NODE] = \
6139         _GetWantedNodes(self, self.op.nodes)
6140
6141   def CheckPrereq(self):
6142     """Check prerequisites.
6143
6144     """
6145     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6146
6147   def Exec(self, feedback_fn):
6148     """Compute the list of all the exported system images.
6149
6150     @rtype: dict
6151     @return: a dictionary with the structure node->(export-list)
6152         where export-list is a list of the instances exported on
6153         that node.
6154
6155     """
6156     rpcresult = self.rpc.call_export_list(self.nodes)
6157     result = {}
6158     for node in rpcresult:
6159       if rpcresult[node].failed:
6160         result[node] = False
6161       else:
6162         result[node] = rpcresult[node].data
6163
6164     return result
6165
6166
6167 class LUExportInstance(LogicalUnit):
6168   """Export an instance to an image in the cluster.
6169
6170   """
6171   HPATH = "instance-export"
6172   HTYPE = constants.HTYPE_INSTANCE
6173   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6174   REQ_BGL = False
6175
6176   def ExpandNames(self):
6177     self._ExpandAndLockInstance()
6178     # FIXME: lock only instance primary and destination node
6179     #
6180     # Sad but true, for now we have do lock all nodes, as we don't know where
6181     # the previous export might be, and and in this LU we search for it and
6182     # remove it from its current node. In the future we could fix this by:
6183     #  - making a tasklet to search (share-lock all), then create the new one,
6184     #    then one to remove, after
6185     #  - removing the removal operation altoghether
6186     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6187
6188   def DeclareLocks(self, level):
6189     """Last minute lock declaration."""
6190     # All nodes are locked anyway, so nothing to do here.
6191
6192   def BuildHooksEnv(self):
6193     """Build hooks env.
6194
6195     This will run on the master, primary node and target node.
6196
6197     """
6198     env = {
6199       "EXPORT_NODE": self.op.target_node,
6200       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6201       }
6202     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6203     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6204           self.op.target_node]
6205     return env, nl, nl
6206
6207   def CheckPrereq(self):
6208     """Check prerequisites.
6209
6210     This checks that the instance and node names are valid.
6211
6212     """
6213     instance_name = self.op.instance_name
6214     self.instance = self.cfg.GetInstanceInfo(instance_name)
6215     assert self.instance is not None, \
6216           "Cannot retrieve locked instance %s" % self.op.instance_name
6217     _CheckNodeOnline(self, self.instance.primary_node)
6218
6219     self.dst_node = self.cfg.GetNodeInfo(
6220       self.cfg.ExpandNodeName(self.op.target_node))
6221
6222     if self.dst_node is None:
6223       # This is wrong node name, not a non-locked node
6224       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6225     _CheckNodeOnline(self, self.dst_node.name)
6226     _CheckNodeNotDrained(self, self.dst_node.name)
6227
6228     # instance disk type verification
6229     for disk in self.instance.disks:
6230       if disk.dev_type == constants.LD_FILE:
6231         raise errors.OpPrereqError("Export not supported for instances with"
6232                                    " file-based disks")
6233
6234   def Exec(self, feedback_fn):
6235     """Export an instance to an image in the cluster.
6236
6237     """
6238     instance = self.instance
6239     dst_node = self.dst_node
6240     src_node = instance.primary_node
6241     if self.op.shutdown:
6242       # shutdown the instance, but not the disks
6243       result = self.rpc.call_instance_shutdown(src_node, instance)
6244       msg = result.RemoteFailMsg()
6245       if msg:
6246         raise errors.OpExecError("Could not shutdown instance %s on"
6247                                  " node %s: %s" %
6248                                  (instance.name, src_node, msg))
6249
6250     vgname = self.cfg.GetVGName()
6251
6252     snap_disks = []
6253
6254     # set the disks ID correctly since call_instance_start needs the
6255     # correct drbd minor to create the symlinks
6256     for disk in instance.disks:
6257       self.cfg.SetDiskID(disk, src_node)
6258
6259     try:
6260       for disk in instance.disks:
6261         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6262         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6263         if new_dev_name.failed or not new_dev_name.data:
6264           self.LogWarning("Could not snapshot block device %s on node %s",
6265                           disk.logical_id[1], src_node)
6266           snap_disks.append(False)
6267         else:
6268           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6269                                  logical_id=(vgname, new_dev_name.data),
6270                                  physical_id=(vgname, new_dev_name.data),
6271                                  iv_name=disk.iv_name)
6272           snap_disks.append(new_dev)
6273
6274     finally:
6275       if self.op.shutdown and instance.admin_up:
6276         result = self.rpc.call_instance_start(src_node, instance, None, None)
6277         msg = result.RemoteFailMsg()
6278         if msg:
6279           _ShutdownInstanceDisks(self, instance)
6280           raise errors.OpExecError("Could not start instance: %s" % msg)
6281
6282     # TODO: check for size
6283
6284     cluster_name = self.cfg.GetClusterName()
6285     for idx, dev in enumerate(snap_disks):
6286       if dev:
6287         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6288                                                instance, cluster_name, idx)
6289         if result.failed or not result.data:
6290           self.LogWarning("Could not export block device %s from node %s to"
6291                           " node %s", dev.logical_id[1], src_node,
6292                           dst_node.name)
6293         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6294         if msg:
6295           self.LogWarning("Could not remove snapshot block device %s from node"
6296                           " %s: %s", dev.logical_id[1], src_node, msg)
6297
6298     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6299     if result.failed or not result.data:
6300       self.LogWarning("Could not finalize export for instance %s on node %s",
6301                       instance.name, dst_node.name)
6302
6303     nodelist = self.cfg.GetNodeList()
6304     nodelist.remove(dst_node.name)
6305
6306     # on one-node clusters nodelist will be empty after the removal
6307     # if we proceed the backup would be removed because OpQueryExports
6308     # substitutes an empty list with the full cluster node list.
6309     if nodelist:
6310       exportlist = self.rpc.call_export_list(nodelist)
6311       for node in exportlist:
6312         if exportlist[node].failed:
6313           continue
6314         if instance.name in exportlist[node].data:
6315           if not self.rpc.call_export_remove(node, instance.name):
6316             self.LogWarning("Could not remove older export for instance %s"
6317                             " on node %s", instance.name, node)
6318
6319
6320 class LURemoveExport(NoHooksLU):
6321   """Remove exports related to the named instance.
6322
6323   """
6324   _OP_REQP = ["instance_name"]
6325   REQ_BGL = False
6326
6327   def ExpandNames(self):
6328     self.needed_locks = {}
6329     # We need all nodes to be locked in order for RemoveExport to work, but we
6330     # don't need to lock the instance itself, as nothing will happen to it (and
6331     # we can remove exports also for a removed instance)
6332     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6333
6334   def CheckPrereq(self):
6335     """Check prerequisites.
6336     """
6337     pass
6338
6339   def Exec(self, feedback_fn):
6340     """Remove any export.
6341
6342     """
6343     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6344     # If the instance was not found we'll try with the name that was passed in.
6345     # This will only work if it was an FQDN, though.
6346     fqdn_warn = False
6347     if not instance_name:
6348       fqdn_warn = True
6349       instance_name = self.op.instance_name
6350
6351     exportlist = self.rpc.call_export_list(self.acquired_locks[
6352       locking.LEVEL_NODE])
6353     found = False
6354     for node in exportlist:
6355       if exportlist[node].failed:
6356         self.LogWarning("Failed to query node %s, continuing" % node)
6357         continue
6358       if instance_name in exportlist[node].data:
6359         found = True
6360         result = self.rpc.call_export_remove(node, instance_name)
6361         if result.failed or not result.data:
6362           logging.error("Could not remove export for instance %s"
6363                         " on node %s", instance_name, node)
6364
6365     if fqdn_warn and not found:
6366       feedback_fn("Export not found. If trying to remove an export belonging"
6367                   " to a deleted instance please use its Fully Qualified"
6368                   " Domain Name.")
6369
6370
6371 class TagsLU(NoHooksLU):
6372   """Generic tags LU.
6373
6374   This is an abstract class which is the parent of all the other tags LUs.
6375
6376   """
6377
6378   def ExpandNames(self):
6379     self.needed_locks = {}
6380     if self.op.kind == constants.TAG_NODE:
6381       name = self.cfg.ExpandNodeName(self.op.name)
6382       if name is None:
6383         raise errors.OpPrereqError("Invalid node name (%s)" %
6384                                    (self.op.name,))
6385       self.op.name = name
6386       self.needed_locks[locking.LEVEL_NODE] = name
6387     elif self.op.kind == constants.TAG_INSTANCE:
6388       name = self.cfg.ExpandInstanceName(self.op.name)
6389       if name is None:
6390         raise errors.OpPrereqError("Invalid instance name (%s)" %
6391                                    (self.op.name,))
6392       self.op.name = name
6393       self.needed_locks[locking.LEVEL_INSTANCE] = name
6394
6395   def CheckPrereq(self):
6396     """Check prerequisites.
6397
6398     """
6399     if self.op.kind == constants.TAG_CLUSTER:
6400       self.target = self.cfg.GetClusterInfo()
6401     elif self.op.kind == constants.TAG_NODE:
6402       self.target = self.cfg.GetNodeInfo(self.op.name)
6403     elif self.op.kind == constants.TAG_INSTANCE:
6404       self.target = self.cfg.GetInstanceInfo(self.op.name)
6405     else:
6406       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6407                                  str(self.op.kind))
6408
6409
6410 class LUGetTags(TagsLU):
6411   """Returns the tags of a given object.
6412
6413   """
6414   _OP_REQP = ["kind", "name"]
6415   REQ_BGL = False
6416
6417   def Exec(self, feedback_fn):
6418     """Returns the tag list.
6419
6420     """
6421     return list(self.target.GetTags())
6422
6423
6424 class LUSearchTags(NoHooksLU):
6425   """Searches the tags for a given pattern.
6426
6427   """
6428   _OP_REQP = ["pattern"]
6429   REQ_BGL = False
6430
6431   def ExpandNames(self):
6432     self.needed_locks = {}
6433
6434   def CheckPrereq(self):
6435     """Check prerequisites.
6436
6437     This checks the pattern passed for validity by compiling it.
6438
6439     """
6440     try:
6441       self.re = re.compile(self.op.pattern)
6442     except re.error, err:
6443       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6444                                  (self.op.pattern, err))
6445
6446   def Exec(self, feedback_fn):
6447     """Returns the tag list.
6448
6449     """
6450     cfg = self.cfg
6451     tgts = [("/cluster", cfg.GetClusterInfo())]
6452     ilist = cfg.GetAllInstancesInfo().values()
6453     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6454     nlist = cfg.GetAllNodesInfo().values()
6455     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6456     results = []
6457     for path, target in tgts:
6458       for tag in target.GetTags():
6459         if self.re.search(tag):
6460           results.append((path, tag))
6461     return results
6462
6463
6464 class LUAddTags(TagsLU):
6465   """Sets a tag on a given object.
6466
6467   """
6468   _OP_REQP = ["kind", "name", "tags"]
6469   REQ_BGL = False
6470
6471   def CheckPrereq(self):
6472     """Check prerequisites.
6473
6474     This checks the type and length of the tag name and value.
6475
6476     """
6477     TagsLU.CheckPrereq(self)
6478     for tag in self.op.tags:
6479       objects.TaggableObject.ValidateTag(tag)
6480
6481   def Exec(self, feedback_fn):
6482     """Sets the tag.
6483
6484     """
6485     try:
6486       for tag in self.op.tags:
6487         self.target.AddTag(tag)
6488     except errors.TagError, err:
6489       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6490     try:
6491       self.cfg.Update(self.target)
6492     except errors.ConfigurationError:
6493       raise errors.OpRetryError("There has been a modification to the"
6494                                 " config file and the operation has been"
6495                                 " aborted. Please retry.")
6496
6497
6498 class LUDelTags(TagsLU):
6499   """Delete a list of tags from a given object.
6500
6501   """
6502   _OP_REQP = ["kind", "name", "tags"]
6503   REQ_BGL = False
6504
6505   def CheckPrereq(self):
6506     """Check prerequisites.
6507
6508     This checks that we have the given tag.
6509
6510     """
6511     TagsLU.CheckPrereq(self)
6512     for tag in self.op.tags:
6513       objects.TaggableObject.ValidateTag(tag)
6514     del_tags = frozenset(self.op.tags)
6515     cur_tags = self.target.GetTags()
6516     if not del_tags <= cur_tags:
6517       diff_tags = del_tags - cur_tags
6518       diff_names = ["'%s'" % tag for tag in diff_tags]
6519       diff_names.sort()
6520       raise errors.OpPrereqError("Tag(s) %s not found" %
6521                                  (",".join(diff_names)))
6522
6523   def Exec(self, feedback_fn):
6524     """Remove the tag from the object.
6525
6526     """
6527     for tag in self.op.tags:
6528       self.target.RemoveTag(tag)
6529     try:
6530       self.cfg.Update(self.target)
6531     except errors.ConfigurationError:
6532       raise errors.OpRetryError("There has been a modification to the"
6533                                 " config file and the operation has been"
6534                                 " aborted. Please retry.")
6535
6536
6537 class LUTestDelay(NoHooksLU):
6538   """Sleep for a specified amount of time.
6539
6540   This LU sleeps on the master and/or nodes for a specified amount of
6541   time.
6542
6543   """
6544   _OP_REQP = ["duration", "on_master", "on_nodes"]
6545   REQ_BGL = False
6546
6547   def ExpandNames(self):
6548     """Expand names and set required locks.
6549
6550     This expands the node list, if any.
6551
6552     """
6553     self.needed_locks = {}
6554     if self.op.on_nodes:
6555       # _GetWantedNodes can be used here, but is not always appropriate to use
6556       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6557       # more information.
6558       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6559       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6560
6561   def CheckPrereq(self):
6562     """Check prerequisites.
6563
6564     """
6565
6566   def Exec(self, feedback_fn):
6567     """Do the actual sleep.
6568
6569     """
6570     if self.op.on_master:
6571       if not utils.TestDelay(self.op.duration):
6572         raise errors.OpExecError("Error during master delay test")
6573     if self.op.on_nodes:
6574       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6575       if not result:
6576         raise errors.OpExecError("Complete failure from rpc call")
6577       for node, node_result in result.items():
6578         node_result.Raise()
6579         if not node_result.data:
6580           raise errors.OpExecError("Failure during rpc call to node %s,"
6581                                    " result: %s" % (node, node_result.data))
6582
6583
6584 class IAllocator(object):
6585   """IAllocator framework.
6586
6587   An IAllocator instance has three sets of attributes:
6588     - cfg that is needed to query the cluster
6589     - input data (all members of the _KEYS class attribute are required)
6590     - four buffer attributes (in|out_data|text), that represent the
6591       input (to the external script) in text and data structure format,
6592       and the output from it, again in two formats
6593     - the result variables from the script (success, info, nodes) for
6594       easy usage
6595
6596   """
6597   _ALLO_KEYS = [
6598     "mem_size", "disks", "disk_template",
6599     "os", "tags", "nics", "vcpus", "hypervisor",
6600     ]
6601   _RELO_KEYS = [
6602     "relocate_from",
6603     ]
6604
6605   def __init__(self, lu, mode, name, **kwargs):
6606     self.lu = lu
6607     # init buffer variables
6608     self.in_text = self.out_text = self.in_data = self.out_data = None
6609     # init all input fields so that pylint is happy
6610     self.mode = mode
6611     self.name = name
6612     self.mem_size = self.disks = self.disk_template = None
6613     self.os = self.tags = self.nics = self.vcpus = None
6614     self.hypervisor = None
6615     self.relocate_from = None
6616     # computed fields
6617     self.required_nodes = None
6618     # init result fields
6619     self.success = self.info = self.nodes = None
6620     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6621       keyset = self._ALLO_KEYS
6622     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6623       keyset = self._RELO_KEYS
6624     else:
6625       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6626                                    " IAllocator" % self.mode)
6627     for key in kwargs:
6628       if key not in keyset:
6629         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6630                                      " IAllocator" % key)
6631       setattr(self, key, kwargs[key])
6632     for key in keyset:
6633       if key not in kwargs:
6634         raise errors.ProgrammerError("Missing input parameter '%s' to"
6635                                      " IAllocator" % key)
6636     self._BuildInputData()
6637
6638   def _ComputeClusterData(self):
6639     """Compute the generic allocator input data.
6640
6641     This is the data that is independent of the actual operation.
6642
6643     """
6644     cfg = self.lu.cfg
6645     cluster_info = cfg.GetClusterInfo()
6646     # cluster data
6647     data = {
6648       "version": constants.IALLOCATOR_VERSION,
6649       "cluster_name": cfg.GetClusterName(),
6650       "cluster_tags": list(cluster_info.GetTags()),
6651       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6652       # we don't have job IDs
6653       }
6654     iinfo = cfg.GetAllInstancesInfo().values()
6655     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6656
6657     # node data
6658     node_results = {}
6659     node_list = cfg.GetNodeList()
6660
6661     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6662       hypervisor_name = self.hypervisor
6663     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6664       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6665
6666     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6667                                            hypervisor_name)
6668     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6669                        cluster_info.enabled_hypervisors)
6670     for nname, nresult in node_data.items():
6671       # first fill in static (config-based) values
6672       ninfo = cfg.GetNodeInfo(nname)
6673       pnr = {
6674         "tags": list(ninfo.GetTags()),
6675         "primary_ip": ninfo.primary_ip,
6676         "secondary_ip": ninfo.secondary_ip,
6677         "offline": ninfo.offline,
6678         "drained": ninfo.drained,
6679         "master_candidate": ninfo.master_candidate,
6680         }
6681
6682       if not ninfo.offline:
6683         nresult.Raise()
6684         if not isinstance(nresult.data, dict):
6685           raise errors.OpExecError("Can't get data for node %s" % nname)
6686         remote_info = nresult.data
6687         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6688                      'vg_size', 'vg_free', 'cpu_total']:
6689           if attr not in remote_info:
6690             raise errors.OpExecError("Node '%s' didn't return attribute"
6691                                      " '%s'" % (nname, attr))
6692           try:
6693             remote_info[attr] = int(remote_info[attr])
6694           except ValueError, err:
6695             raise errors.OpExecError("Node '%s' returned invalid value"
6696                                      " for '%s': %s" % (nname, attr, err))
6697         # compute memory used by primary instances
6698         i_p_mem = i_p_up_mem = 0
6699         for iinfo, beinfo in i_list:
6700           if iinfo.primary_node == nname:
6701             i_p_mem += beinfo[constants.BE_MEMORY]
6702             if iinfo.name not in node_iinfo[nname].data:
6703               i_used_mem = 0
6704             else:
6705               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6706             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6707             remote_info['memory_free'] -= max(0, i_mem_diff)
6708
6709             if iinfo.admin_up:
6710               i_p_up_mem += beinfo[constants.BE_MEMORY]
6711
6712         # compute memory used by instances
6713         pnr_dyn = {
6714           "total_memory": remote_info['memory_total'],
6715           "reserved_memory": remote_info['memory_dom0'],
6716           "free_memory": remote_info['memory_free'],
6717           "total_disk": remote_info['vg_size'],
6718           "free_disk": remote_info['vg_free'],
6719           "total_cpus": remote_info['cpu_total'],
6720           "i_pri_memory": i_p_mem,
6721           "i_pri_up_memory": i_p_up_mem,
6722           }
6723         pnr.update(pnr_dyn)
6724
6725       node_results[nname] = pnr
6726     data["nodes"] = node_results
6727
6728     # instance data
6729     instance_data = {}
6730     for iinfo, beinfo in i_list:
6731       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6732                   for n in iinfo.nics]
6733       pir = {
6734         "tags": list(iinfo.GetTags()),
6735         "admin_up": iinfo.admin_up,
6736         "vcpus": beinfo[constants.BE_VCPUS],
6737         "memory": beinfo[constants.BE_MEMORY],
6738         "os": iinfo.os,
6739         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6740         "nics": nic_data,
6741         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6742         "disk_template": iinfo.disk_template,
6743         "hypervisor": iinfo.hypervisor,
6744         }
6745       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6746                                                  pir["disks"])
6747       instance_data[iinfo.name] = pir
6748
6749     data["instances"] = instance_data
6750
6751     self.in_data = data
6752
6753   def _AddNewInstance(self):
6754     """Add new instance data to allocator structure.
6755
6756     This in combination with _AllocatorGetClusterData will create the
6757     correct structure needed as input for the allocator.
6758
6759     The checks for the completeness of the opcode must have already been
6760     done.
6761
6762     """
6763     data = self.in_data
6764
6765     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6766
6767     if self.disk_template in constants.DTS_NET_MIRROR:
6768       self.required_nodes = 2
6769     else:
6770       self.required_nodes = 1
6771     request = {
6772       "type": "allocate",
6773       "name": self.name,
6774       "disk_template": self.disk_template,
6775       "tags": self.tags,
6776       "os": self.os,
6777       "vcpus": self.vcpus,
6778       "memory": self.mem_size,
6779       "disks": self.disks,
6780       "disk_space_total": disk_space,
6781       "nics": self.nics,
6782       "required_nodes": self.required_nodes,
6783       }
6784     data["request"] = request
6785
6786   def _AddRelocateInstance(self):
6787     """Add relocate instance data to allocator structure.
6788
6789     This in combination with _IAllocatorGetClusterData will create the
6790     correct structure needed as input for the allocator.
6791
6792     The checks for the completeness of the opcode must have already been
6793     done.
6794
6795     """
6796     instance = self.lu.cfg.GetInstanceInfo(self.name)
6797     if instance is None:
6798       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6799                                    " IAllocator" % self.name)
6800
6801     if instance.disk_template not in constants.DTS_NET_MIRROR:
6802       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6803
6804     if len(instance.secondary_nodes) != 1:
6805       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6806
6807     self.required_nodes = 1
6808     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6809     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6810
6811     request = {
6812       "type": "relocate",
6813       "name": self.name,
6814       "disk_space_total": disk_space,
6815       "required_nodes": self.required_nodes,
6816       "relocate_from": self.relocate_from,
6817       }
6818     self.in_data["request"] = request
6819
6820   def _BuildInputData(self):
6821     """Build input data structures.
6822
6823     """
6824     self._ComputeClusterData()
6825
6826     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6827       self._AddNewInstance()
6828     else:
6829       self._AddRelocateInstance()
6830
6831     self.in_text = serializer.Dump(self.in_data)
6832
6833   def Run(self, name, validate=True, call_fn=None):
6834     """Run an instance allocator and return the results.
6835
6836     """
6837     if call_fn is None:
6838       call_fn = self.lu.rpc.call_iallocator_runner
6839     data = self.in_text
6840
6841     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6842     result.Raise()
6843
6844     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6845       raise errors.OpExecError("Invalid result from master iallocator runner")
6846
6847     rcode, stdout, stderr, fail = result.data
6848
6849     if rcode == constants.IARUN_NOTFOUND:
6850       raise errors.OpExecError("Can't find allocator '%s'" % name)
6851     elif rcode == constants.IARUN_FAILURE:
6852       raise errors.OpExecError("Instance allocator call failed: %s,"
6853                                " output: %s" % (fail, stdout+stderr))
6854     self.out_text = stdout
6855     if validate:
6856       self._ValidateResult()
6857
6858   def _ValidateResult(self):
6859     """Process the allocator results.
6860
6861     This will process and if successful save the result in
6862     self.out_data and the other parameters.
6863
6864     """
6865     try:
6866       rdict = serializer.Load(self.out_text)
6867     except Exception, err:
6868       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6869
6870     if not isinstance(rdict, dict):
6871       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6872
6873     for key in "success", "info", "nodes":
6874       if key not in rdict:
6875         raise errors.OpExecError("Can't parse iallocator results:"
6876                                  " missing key '%s'" % key)
6877       setattr(self, key, rdict[key])
6878
6879     if not isinstance(rdict["nodes"], list):
6880       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6881                                " is not a list")
6882     self.out_data = rdict
6883
6884
6885 class LUTestAllocator(NoHooksLU):
6886   """Run allocator tests.
6887
6888   This LU runs the allocator tests
6889
6890   """
6891   _OP_REQP = ["direction", "mode", "name"]
6892
6893   def CheckPrereq(self):
6894     """Check prerequisites.
6895
6896     This checks the opcode parameters depending on the director and mode test.
6897
6898     """
6899     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6900       for attr in ["name", "mem_size", "disks", "disk_template",
6901                    "os", "tags", "nics", "vcpus"]:
6902         if not hasattr(self.op, attr):
6903           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6904                                      attr)
6905       iname = self.cfg.ExpandInstanceName(self.op.name)
6906       if iname is not None:
6907         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6908                                    iname)
6909       if not isinstance(self.op.nics, list):
6910         raise errors.OpPrereqError("Invalid parameter 'nics'")
6911       for row in self.op.nics:
6912         if (not isinstance(row, dict) or
6913             "mac" not in row or
6914             "ip" not in row or
6915             "bridge" not in row):
6916           raise errors.OpPrereqError("Invalid contents of the"
6917                                      " 'nics' parameter")
6918       if not isinstance(self.op.disks, list):
6919         raise errors.OpPrereqError("Invalid parameter 'disks'")
6920       for row in self.op.disks:
6921         if (not isinstance(row, dict) or
6922             "size" not in row or
6923             not isinstance(row["size"], int) or
6924             "mode" not in row or
6925             row["mode"] not in ['r', 'w']):
6926           raise errors.OpPrereqError("Invalid contents of the"
6927                                      " 'disks' parameter")
6928       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6929         self.op.hypervisor = self.cfg.GetHypervisorType()
6930     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6931       if not hasattr(self.op, "name"):
6932         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6933       fname = self.cfg.ExpandInstanceName(self.op.name)
6934       if fname is None:
6935         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6936                                    self.op.name)
6937       self.op.name = fname
6938       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6939     else:
6940       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6941                                  self.op.mode)
6942
6943     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6944       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6945         raise errors.OpPrereqError("Missing allocator name")
6946     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6947       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6948                                  self.op.direction)
6949
6950   def Exec(self, feedback_fn):
6951     """Run the allocator test.
6952
6953     """
6954     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6955       ial = IAllocator(self,
6956                        mode=self.op.mode,
6957                        name=self.op.name,
6958                        mem_size=self.op.mem_size,
6959                        disks=self.op.disks,
6960                        disk_template=self.op.disk_template,
6961                        os=self.op.os,
6962                        tags=self.op.tags,
6963                        nics=self.op.nics,
6964                        vcpus=self.op.vcpus,
6965                        hypervisor=self.op.hypervisor,
6966                        )
6967     else:
6968       ial = IAllocator(self,
6969                        mode=self.op.mode,
6970                        name=self.op.name,
6971                        relocate_from=list(self.relocate_from),
6972                        )
6973
6974     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6975       result = ial.in_text
6976     else:
6977       ial.Run(self.op.allocator, validate=False)
6978       result = ial.out_text
6979     return result