Convert UploadFile (and its callers) to new rpc
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, bridge, mac) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509       env["INSTANCE_NIC%d_MAC" % idx] = mac
510   else:
511     nic_count = 0
512
513   env["INSTANCE_NIC_COUNT"] = nic_count
514
515   if disks:
516     disk_count = len(disks)
517     for idx, (size, mode) in enumerate(disks):
518       env["INSTANCE_DISK%d_SIZE" % idx] = size
519       env["INSTANCE_DISK%d_MODE" % idx] = mode
520   else:
521     disk_count = 0
522
523   env["INSTANCE_DISK_COUNT"] = disk_count
524
525   return env
526
527
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529   """Builds instance related env variables for hooks from an object.
530
531   @type lu: L{LogicalUnit}
532   @param lu: the logical unit on whose behalf we execute
533   @type instance: L{objects.Instance}
534   @param instance: the instance for which we should build the
535       environment
536   @type override: dict
537   @param override: dictionary with key/values that will override
538       our values
539   @rtype: dict
540   @return: the hook environment dictionary
541
542   """
543   bep = lu.cfg.GetClusterInfo().FillBE(instance)
544   args = {
545     'name': instance.name,
546     'primary_node': instance.primary_node,
547     'secondary_nodes': instance.secondary_nodes,
548     'os_type': instance.os,
549     'status': instance.admin_up,
550     'memory': bep[constants.BE_MEMORY],
551     'vcpus': bep[constants.BE_VCPUS],
552     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553     'disk_template': instance.disk_template,
554     'disks': [(disk.size, disk.mode) for disk in instance.disks],
555   }
556   if override:
557     args.update(override)
558   return _BuildInstanceHookEnv(**args)
559
560
561 def _AdjustCandidatePool(lu):
562   """Adjust the candidate pool after node operations.
563
564   """
565   mod_list = lu.cfg.MaintainCandidatePool()
566   if mod_list:
567     lu.LogInfo("Promoted nodes to master candidate role: %s",
568                ", ".join(node.name for node in mod_list))
569     for name in mod_list:
570       lu.context.ReaddNode(name)
571   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
572   if mc_now > mc_max:
573     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
574                (mc_now, mc_max))
575
576
577 def _CheckInstanceBridgesExist(lu, instance):
578   """Check that the brigdes needed by an instance exist.
579
580   """
581   # check bridges existance
582   brlist = [nic.bridge for nic in instance.nics]
583   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
584   result.Raise()
585   if not result.data:
586     raise errors.OpPrereqError("One or more target bridges %s does not"
587                                " exist on destination node '%s'" %
588                                (brlist, instance.primary_node))
589
590
591 class LUDestroyCluster(NoHooksLU):
592   """Logical unit for destroying the cluster.
593
594   """
595   _OP_REQP = []
596
597   def CheckPrereq(self):
598     """Check prerequisites.
599
600     This checks whether the cluster is empty.
601
602     Any errors are signalled by raising errors.OpPrereqError.
603
604     """
605     master = self.cfg.GetMasterNode()
606
607     nodelist = self.cfg.GetNodeList()
608     if len(nodelist) != 1 or nodelist[0] != master:
609       raise errors.OpPrereqError("There are still %d node(s) in"
610                                  " this cluster." % (len(nodelist) - 1))
611     instancelist = self.cfg.GetInstanceList()
612     if instancelist:
613       raise errors.OpPrereqError("There are still %d instance(s) in"
614                                  " this cluster." % len(instancelist))
615
616   def Exec(self, feedback_fn):
617     """Destroys the cluster.
618
619     """
620     master = self.cfg.GetMasterNode()
621     result = self.rpc.call_node_stop_master(master, False)
622     result.Raise()
623     if not result.data:
624       raise errors.OpExecError("Could not disable the master role")
625     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626     utils.CreateBackup(priv_key)
627     utils.CreateBackup(pub_key)
628     return master
629
630
631 class LUVerifyCluster(LogicalUnit):
632   """Verifies the cluster status.
633
634   """
635   HPATH = "cluster-verify"
636   HTYPE = constants.HTYPE_CLUSTER
637   _OP_REQP = ["skip_checks"]
638   REQ_BGL = False
639
640   def ExpandNames(self):
641     self.needed_locks = {
642       locking.LEVEL_NODE: locking.ALL_SET,
643       locking.LEVEL_INSTANCE: locking.ALL_SET,
644     }
645     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
646
647   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648                   node_result, feedback_fn, master_files,
649                   drbd_map, vg_name):
650     """Run multiple tests against a node.
651
652     Test list:
653
654       - compares ganeti version
655       - checks vg existance and size > 20G
656       - checks config file checksum
657       - checks ssh to other nodes
658
659     @type nodeinfo: L{objects.Node}
660     @param nodeinfo: the node to check
661     @param file_list: required list of files
662     @param local_cksum: dictionary of local files and their checksums
663     @param node_result: the results from the node
664     @param feedback_fn: function used to accumulate results
665     @param master_files: list of files that only masters should have
666     @param drbd_map: the useddrbd minors for this node, in
667         form of minor: (instance, must_exist) which correspond to instances
668         and their running status
669     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
670
671     """
672     node = nodeinfo.name
673
674     # main result, node_result should be a non-empty dict
675     if not node_result or not isinstance(node_result, dict):
676       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677       return True
678
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     remote_version = node_result.get('version', None)
682     if not (remote_version and isinstance(remote_version, (list, tuple)) and
683             len(remote_version) == 2):
684       feedback_fn("  - ERROR: connection to %s failed" % (node))
685       return True
686
687     if local_version != remote_version[0]:
688       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689                   " node %s %s" % (local_version, node, remote_version[0]))
690       return True
691
692     # node seems compatible, we can actually try to look into its results
693
694     bad = False
695
696     # full package version
697     if constants.RELEASE_VERSION != remote_version[1]:
698       feedback_fn("  - WARNING: software version mismatch: master %s,"
699                   " node %s %s" %
700                   (constants.RELEASE_VERSION, node, remote_version[1]))
701
702     # checks vg existence and size > 20G
703     if vg_name is not None:
704       vglist = node_result.get(constants.NV_VGLIST, None)
705       if not vglist:
706         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707                         (node,))
708         bad = True
709       else:
710         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711                                               constants.MIN_VG_SIZE)
712         if vgstatus:
713           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714           bad = True
715
716     # checks config file checksum
717
718     remote_cksum = node_result.get(constants.NV_FILELIST, None)
719     if not isinstance(remote_cksum, dict):
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned file checksum data")
722     else:
723       for file_name in file_list:
724         node_is_mc = nodeinfo.master_candidate
725         must_have_file = file_name not in master_files
726         if file_name not in remote_cksum:
727           if node_is_mc or must_have_file:
728             bad = True
729             feedback_fn("  - ERROR: file '%s' missing" % file_name)
730         elif remote_cksum[file_name] != local_cksum[file_name]:
731           if node_is_mc or must_have_file:
732             bad = True
733             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734           else:
735             # not candidate and this is not a must-have file
736             bad = True
737             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738                         " '%s'" % file_name)
739         else:
740           # all good, except non-master/non-must have combination
741           if not node_is_mc and not must_have_file:
742             feedback_fn("  - ERROR: file '%s' should not exist on non master"
743                         " candidates" % file_name)
744
745     # checks ssh to any
746
747     if constants.NV_NODELIST not in node_result:
748       bad = True
749       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750     else:
751       if node_result[constants.NV_NODELIST]:
752         bad = True
753         for node in node_result[constants.NV_NODELIST]:
754           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755                           (node, node_result[constants.NV_NODELIST][node]))
756
757     if constants.NV_NODENETTEST not in node_result:
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760     else:
761       if node_result[constants.NV_NODENETTEST]:
762         bad = True
763         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764         for node in nlist:
765           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766                           (node, node_result[constants.NV_NODENETTEST][node]))
767
768     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769     if isinstance(hyp_result, dict):
770       for hv_name, hv_result in hyp_result.iteritems():
771         if hv_result is not None:
772           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773                       (hv_name, hv_result))
774
775     # check used drbd list
776     if vg_name is not None:
777       used_minors = node_result.get(constants.NV_DRBDLIST, [])
778       if not isinstance(used_minors, (tuple, list)):
779         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
780                     str(used_minors))
781       else:
782         for minor, (iname, must_exist) in drbd_map.items():
783           if minor not in used_minors and must_exist:
784             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
785                         " not active" % (minor, iname))
786             bad = True
787         for minor in used_minors:
788           if minor not in drbd_map:
789             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
790                         minor)
791             bad = True
792
793     return bad
794
795   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796                       node_instance, feedback_fn, n_offline):
797     """Verify an instance.
798
799     This function checks to see if the required block devices are
800     available on the instance's node.
801
802     """
803     bad = False
804
805     node_current = instanceconfig.primary_node
806
807     node_vol_should = {}
808     instanceconfig.MapLVsByNode(node_vol_should)
809
810     for node in node_vol_should:
811       if node in n_offline:
812         # ignore missing volumes on offline nodes
813         continue
814       for volume in node_vol_should[node]:
815         if node not in node_vol_is or volume not in node_vol_is[node]:
816           feedback_fn("  - ERROR: volume %s missing on node %s" %
817                           (volume, node))
818           bad = True
819
820     if instanceconfig.admin_up:
821       if ((node_current not in node_instance or
822           not instance in node_instance[node_current]) and
823           node_current not in n_offline):
824         feedback_fn("  - ERROR: instance %s not running on node %s" %
825                         (instance, node_current))
826         bad = True
827
828     for node in node_instance:
829       if (not node == node_current):
830         if instance in node_instance[node]:
831           feedback_fn("  - ERROR: instance %s should not run on node %s" %
832                           (instance, node))
833           bad = True
834
835     return bad
836
837   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838     """Verify if there are any unknown volumes in the cluster.
839
840     The .os, .swap and backup volumes are ignored. All other volumes are
841     reported as unknown.
842
843     """
844     bad = False
845
846     for node in node_vol_is:
847       for volume in node_vol_is[node]:
848         if node not in node_vol_should or volume not in node_vol_should[node]:
849           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850                       (volume, node))
851           bad = True
852     return bad
853
854   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855     """Verify the list of running instances.
856
857     This checks what instances are running but unknown to the cluster.
858
859     """
860     bad = False
861     for node in node_instance:
862       for runninginstance in node_instance[node]:
863         if runninginstance not in instancelist:
864           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865                           (runninginstance, node))
866           bad = True
867     return bad
868
869   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870     """Verify N+1 Memory Resilience.
871
872     Check that if one single node dies we can still start all the instances it
873     was primary for.
874
875     """
876     bad = False
877
878     for node, nodeinfo in node_info.iteritems():
879       # This code checks that every node which is now listed as secondary has
880       # enough memory to host all instances it is supposed to should a single
881       # other node in the cluster fail.
882       # FIXME: not ready for failover to an arbitrary node
883       # FIXME: does not support file-backed instances
884       # WARNING: we currently take into account down instances as well as up
885       # ones, considering that even if they're down someone might want to start
886       # them even in the event of a node failure.
887       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888         needed_mem = 0
889         for instance in instances:
890           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891           if bep[constants.BE_AUTO_BALANCE]:
892             needed_mem += bep[constants.BE_MEMORY]
893         if nodeinfo['mfree'] < needed_mem:
894           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895                       " failovers should node %s fail" % (node, prinode))
896           bad = True
897     return bad
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     Transform the list of checks we're going to skip into a set and check that
903     all its members are valid.
904
905     """
906     self.skip_set = frozenset(self.op.skip_checks)
907     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908       raise errors.OpPrereqError("Invalid checks to be skipped specified")
909
910   def BuildHooksEnv(self):
911     """Build hooks env.
912
913     Cluster-Verify hooks just rone in the post phase and their failure makes
914     the output be logged in the verify output and the verification to fail.
915
916     """
917     all_nodes = self.cfg.GetNodeList()
918     env = {
919       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
920       }
921     for node in self.cfg.GetAllNodesInfo().values():
922       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
923
924     return env, [], all_nodes
925
926   def Exec(self, feedback_fn):
927     """Verify integrity of cluster, performing various test on nodes.
928
929     """
930     bad = False
931     feedback_fn("* Verifying global settings")
932     for msg in self.cfg.VerifyConfig():
933       feedback_fn("  - ERROR: %s" % msg)
934
935     vg_name = self.cfg.GetVGName()
936     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937     nodelist = utils.NiceSort(self.cfg.GetNodeList())
938     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941                         for iname in instancelist)
942     i_non_redundant = [] # Non redundant instances
943     i_non_a_balanced = [] # Non auto-balanced instances
944     n_offline = [] # List of offline nodes
945     n_drained = [] # List of nodes being drained
946     node_volume = {}
947     node_instance = {}
948     node_info = {}
949     instance_cfg = {}
950
951     # FIXME: verify OS list
952     # do local checksums
953     master_files = [constants.CLUSTER_CONF_FILE]
954
955     file_names = ssconf.SimpleStore().GetFileList()
956     file_names.append(constants.SSL_CERT_FILE)
957     file_names.append(constants.RAPI_CERT_FILE)
958     file_names.extend(master_files)
959
960     local_checksums = utils.FingerprintFiles(file_names)
961
962     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963     node_verify_param = {
964       constants.NV_FILELIST: file_names,
965       constants.NV_NODELIST: [node.name for node in nodeinfo
966                               if not node.offline],
967       constants.NV_HYPERVISOR: hypervisors,
968       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969                                   node.secondary_ip) for node in nodeinfo
970                                  if not node.offline],
971       constants.NV_INSTANCELIST: hypervisors,
972       constants.NV_VERSION: None,
973       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
974       }
975     if vg_name is not None:
976       node_verify_param[constants.NV_VGLIST] = None
977       node_verify_param[constants.NV_LVLIST] = vg_name
978       node_verify_param[constants.NV_DRBDLIST] = None
979     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980                                            self.cfg.GetClusterName())
981
982     cluster = self.cfg.GetClusterInfo()
983     master_node = self.cfg.GetMasterNode()
984     all_drbd_map = self.cfg.ComputeDRBDMap()
985
986     for node_i in nodeinfo:
987       node = node_i.name
988       nresult = all_nvinfo[node].data
989
990       if node_i.offline:
991         feedback_fn("* Skipping offline node %s" % (node,))
992         n_offline.append(node)
993         continue
994
995       if node == master_node:
996         ntype = "master"
997       elif node_i.master_candidate:
998         ntype = "master candidate"
999       elif node_i.drained:
1000         ntype = "drained"
1001         n_drained.append(node)
1002       else:
1003         ntype = "regular"
1004       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1005
1006       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1008         bad = True
1009         continue
1010
1011       node_drbd = {}
1012       for minor, instance in all_drbd_map[node].items():
1013         if instance not in instanceinfo:
1014           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1015                       instance)
1016           # ghost instance should not be running, but otherwise we
1017           # don't give double warnings (both ghost instance and
1018           # unallocated minor in use)
1019           node_drbd[minor] = (instance, False)
1020         else:
1021           instance = instanceinfo[instance]
1022           node_drbd[minor] = (instance.name, instance.admin_up)
1023       result = self._VerifyNode(node_i, file_names, local_checksums,
1024                                 nresult, feedback_fn, master_files,
1025                                 node_drbd, vg_name)
1026       bad = bad or result
1027
1028       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1029       if vg_name is None:
1030         node_volume[node] = {}
1031       elif isinstance(lvdata, basestring):
1032         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1033                     (node, utils.SafeEncode(lvdata)))
1034         bad = True
1035         node_volume[node] = {}
1036       elif not isinstance(lvdata, dict):
1037         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1038         bad = True
1039         continue
1040       else:
1041         node_volume[node] = lvdata
1042
1043       # node_instance
1044       idata = nresult.get(constants.NV_INSTANCELIST, None)
1045       if not isinstance(idata, list):
1046         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1047                     (node,))
1048         bad = True
1049         continue
1050
1051       node_instance[node] = idata
1052
1053       # node_info
1054       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055       if not isinstance(nodeinfo, dict):
1056         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1057         bad = True
1058         continue
1059
1060       try:
1061         node_info[node] = {
1062           "mfree": int(nodeinfo['memory_free']),
1063           "pinst": [],
1064           "sinst": [],
1065           # dictionary holding all instances this node is secondary for,
1066           # grouped by their primary node. Each key is a cluster node, and each
1067           # value is a list of instances which have the key as primary and the
1068           # current node as secondary.  this is handy to calculate N+1 memory
1069           # availability if you can only failover from a primary to its
1070           # secondary.
1071           "sinst-by-pnode": {},
1072         }
1073         # FIXME: devise a free space model for file based instances as well
1074         if vg_name is not None:
1075           if (constants.NV_VGLIST not in nresult or
1076               vg_name not in nresult[constants.NV_VGLIST]):
1077             feedback_fn("  - ERROR: node %s didn't return data for the"
1078                         " volume group '%s' - it is either missing or broken" %
1079                         (node, vg_name))
1080             bad = True
1081             continue
1082           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083       except (ValueError, KeyError):
1084         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1085                     " from node %s" % (node,))
1086         bad = True
1087         continue
1088
1089     node_vol_should = {}
1090
1091     for instance in instancelist:
1092       feedback_fn("* Verifying instance %s" % instance)
1093       inst_config = instanceinfo[instance]
1094       result =  self._VerifyInstance(instance, inst_config, node_volume,
1095                                      node_instance, feedback_fn, n_offline)
1096       bad = bad or result
1097       inst_nodes_offline = []
1098
1099       inst_config.MapLVsByNode(node_vol_should)
1100
1101       instance_cfg[instance] = inst_config
1102
1103       pnode = inst_config.primary_node
1104       if pnode in node_info:
1105         node_info[pnode]['pinst'].append(instance)
1106       elif pnode not in n_offline:
1107         feedback_fn("  - ERROR: instance %s, connection to primary node"
1108                     " %s failed" % (instance, pnode))
1109         bad = True
1110
1111       if pnode in n_offline:
1112         inst_nodes_offline.append(pnode)
1113
1114       # If the instance is non-redundant we cannot survive losing its primary
1115       # node, so we are not N+1 compliant. On the other hand we have no disk
1116       # templates with more than one secondary so that situation is not well
1117       # supported either.
1118       # FIXME: does not support file-backed instances
1119       if len(inst_config.secondary_nodes) == 0:
1120         i_non_redundant.append(instance)
1121       elif len(inst_config.secondary_nodes) > 1:
1122         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1123                     % instance)
1124
1125       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126         i_non_a_balanced.append(instance)
1127
1128       for snode in inst_config.secondary_nodes:
1129         if snode in node_info:
1130           node_info[snode]['sinst'].append(instance)
1131           if pnode not in node_info[snode]['sinst-by-pnode']:
1132             node_info[snode]['sinst-by-pnode'][pnode] = []
1133           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134         elif snode not in n_offline:
1135           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1136                       " %s failed" % (instance, snode))
1137           bad = True
1138         if snode in n_offline:
1139           inst_nodes_offline.append(snode)
1140
1141       if inst_nodes_offline:
1142         # warn that the instance lives on offline nodes, and set bad=True
1143         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1144                     ", ".join(inst_nodes_offline))
1145         bad = True
1146
1147     feedback_fn("* Verifying orphan volumes")
1148     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1149                                        feedback_fn)
1150     bad = bad or result
1151
1152     feedback_fn("* Verifying remaining instances")
1153     result = self._VerifyOrphanInstances(instancelist, node_instance,
1154                                          feedback_fn)
1155     bad = bad or result
1156
1157     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158       feedback_fn("* Verifying N+1 Memory redundancy")
1159       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1160       bad = bad or result
1161
1162     feedback_fn("* Other Notes")
1163     if i_non_redundant:
1164       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1165                   % len(i_non_redundant))
1166
1167     if i_non_a_balanced:
1168       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1169                   % len(i_non_a_balanced))
1170
1171     if n_offline:
1172       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1173
1174     if n_drained:
1175       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1176
1177     return not bad
1178
1179   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1180     """Analize the post-hooks' result
1181
1182     This method analyses the hook result, handles it, and sends some
1183     nicely-formatted feedback back to the user.
1184
1185     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1186         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1187     @param hooks_results: the results of the multi-node hooks rpc call
1188     @param feedback_fn: function used send feedback back to the caller
1189     @param lu_result: previous Exec result
1190     @return: the new Exec result, based on the previous result
1191         and hook results
1192
1193     """
1194     # We only really run POST phase hooks, and are only interested in
1195     # their results
1196     if phase == constants.HOOKS_PHASE_POST:
1197       # Used to change hooks' output to proper indentation
1198       indent_re = re.compile('^', re.M)
1199       feedback_fn("* Hooks Results")
1200       if not hooks_results:
1201         feedback_fn("  - ERROR: general communication failure")
1202         lu_result = 1
1203       else:
1204         for node_name in hooks_results:
1205           show_node_header = True
1206           res = hooks_results[node_name]
1207           if res.failed or res.data is False or not isinstance(res.data, list):
1208             if res.offline:
1209               # no need to warn or set fail return value
1210               continue
1211             feedback_fn("    Communication failure in hooks execution")
1212             lu_result = 1
1213             continue
1214           for script, hkr, output in res.data:
1215             if hkr == constants.HKR_FAIL:
1216               # The node header is only shown once, if there are
1217               # failing hooks on that node
1218               if show_node_header:
1219                 feedback_fn("  Node %s:" % node_name)
1220                 show_node_header = False
1221               feedback_fn("    ERROR: Script %s failed, output:" % script)
1222               output = indent_re.sub('      ', output)
1223               feedback_fn("%s" % output)
1224               lu_result = 1
1225
1226       return lu_result
1227
1228
1229 class LUVerifyDisks(NoHooksLU):
1230   """Verifies the cluster disks status.
1231
1232   """
1233   _OP_REQP = []
1234   REQ_BGL = False
1235
1236   def ExpandNames(self):
1237     self.needed_locks = {
1238       locking.LEVEL_NODE: locking.ALL_SET,
1239       locking.LEVEL_INSTANCE: locking.ALL_SET,
1240     }
1241     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1242
1243   def CheckPrereq(self):
1244     """Check prerequisites.
1245
1246     This has no prerequisites.
1247
1248     """
1249     pass
1250
1251   def Exec(self, feedback_fn):
1252     """Verify integrity of cluster disks.
1253
1254     """
1255     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1256
1257     vg_name = self.cfg.GetVGName()
1258     nodes = utils.NiceSort(self.cfg.GetNodeList())
1259     instances = [self.cfg.GetInstanceInfo(name)
1260                  for name in self.cfg.GetInstanceList()]
1261
1262     nv_dict = {}
1263     for inst in instances:
1264       inst_lvs = {}
1265       if (not inst.admin_up or
1266           inst.disk_template not in constants.DTS_NET_MIRROR):
1267         continue
1268       inst.MapLVsByNode(inst_lvs)
1269       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1270       for node, vol_list in inst_lvs.iteritems():
1271         for vol in vol_list:
1272           nv_dict[(node, vol)] = inst
1273
1274     if not nv_dict:
1275       return result
1276
1277     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1278
1279     to_act = set()
1280     for node in nodes:
1281       # node_volume
1282       lvs = node_lvs[node]
1283       if lvs.failed:
1284         if not lvs.offline:
1285           self.LogWarning("Connection to node %s failed: %s" %
1286                           (node, lvs.data))
1287         continue
1288       lvs = lvs.data
1289       if isinstance(lvs, basestring):
1290         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1291         res_nlvm[node] = lvs
1292         continue
1293       elif not isinstance(lvs, dict):
1294         logging.warning("Connection to node %s failed or invalid data"
1295                         " returned", node)
1296         res_nodes.append(node)
1297         continue
1298
1299       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1300         inst = nv_dict.pop((node, lv_name), None)
1301         if (not lv_online and inst is not None
1302             and inst.name not in res_instances):
1303           res_instances.append(inst.name)
1304
1305     # any leftover items in nv_dict are missing LVs, let's arrange the
1306     # data better
1307     for key, inst in nv_dict.iteritems():
1308       if inst.name not in res_missing:
1309         res_missing[inst.name] = []
1310       res_missing[inst.name].append(key)
1311
1312     return result
1313
1314
1315 class LURenameCluster(LogicalUnit):
1316   """Rename the cluster.
1317
1318   """
1319   HPATH = "cluster-rename"
1320   HTYPE = constants.HTYPE_CLUSTER
1321   _OP_REQP = ["name"]
1322
1323   def BuildHooksEnv(self):
1324     """Build hooks env.
1325
1326     """
1327     env = {
1328       "OP_TARGET": self.cfg.GetClusterName(),
1329       "NEW_NAME": self.op.name,
1330       }
1331     mn = self.cfg.GetMasterNode()
1332     return env, [mn], [mn]
1333
1334   def CheckPrereq(self):
1335     """Verify that the passed name is a valid one.
1336
1337     """
1338     hostname = utils.HostInfo(self.op.name)
1339
1340     new_name = hostname.name
1341     self.ip = new_ip = hostname.ip
1342     old_name = self.cfg.GetClusterName()
1343     old_ip = self.cfg.GetMasterIP()
1344     if new_name == old_name and new_ip == old_ip:
1345       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1346                                  " cluster has changed")
1347     if new_ip != old_ip:
1348       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1349         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1350                                    " reachable on the network. Aborting." %
1351                                    new_ip)
1352
1353     self.op.name = new_name
1354
1355   def Exec(self, feedback_fn):
1356     """Rename the cluster.
1357
1358     """
1359     clustername = self.op.name
1360     ip = self.ip
1361
1362     # shutdown the master IP
1363     master = self.cfg.GetMasterNode()
1364     result = self.rpc.call_node_stop_master(master, False)
1365     if result.failed or not result.data:
1366       raise errors.OpExecError("Could not disable the master role")
1367
1368     try:
1369       cluster = self.cfg.GetClusterInfo()
1370       cluster.cluster_name = clustername
1371       cluster.master_ip = ip
1372       self.cfg.Update(cluster)
1373
1374       # update the known hosts file
1375       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1376       node_list = self.cfg.GetNodeList()
1377       try:
1378         node_list.remove(master)
1379       except ValueError:
1380         pass
1381       result = self.rpc.call_upload_file(node_list,
1382                                          constants.SSH_KNOWN_HOSTS_FILE)
1383       for to_node, to_result in result.iteritems():
1384          msg = to_result.RemoteFailMsg()
1385          if msg:
1386            msg = ("Copy of file %s to node %s failed: %s" %
1387                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1388            self.proc.LogWarning(msg)
1389
1390     finally:
1391       result = self.rpc.call_node_start_master(master, False)
1392       if result.failed or not result.data:
1393         self.LogWarning("Could not re-enable the master role on"
1394                         " the master, please restart manually.")
1395
1396
1397 def _RecursiveCheckIfLVMBased(disk):
1398   """Check if the given disk or its children are lvm-based.
1399
1400   @type disk: L{objects.Disk}
1401   @param disk: the disk to check
1402   @rtype: booleean
1403   @return: boolean indicating whether a LD_LV dev_type was found or not
1404
1405   """
1406   if disk.children:
1407     for chdisk in disk.children:
1408       if _RecursiveCheckIfLVMBased(chdisk):
1409         return True
1410   return disk.dev_type == constants.LD_LV
1411
1412
1413 class LUSetClusterParams(LogicalUnit):
1414   """Change the parameters of the cluster.
1415
1416   """
1417   HPATH = "cluster-modify"
1418   HTYPE = constants.HTYPE_CLUSTER
1419   _OP_REQP = []
1420   REQ_BGL = False
1421
1422   def CheckArguments(self):
1423     """Check parameters
1424
1425     """
1426     if not hasattr(self.op, "candidate_pool_size"):
1427       self.op.candidate_pool_size = None
1428     if self.op.candidate_pool_size is not None:
1429       try:
1430         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1431       except (ValueError, TypeError), err:
1432         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1433                                    str(err))
1434       if self.op.candidate_pool_size < 1:
1435         raise errors.OpPrereqError("At least one master candidate needed")
1436
1437   def ExpandNames(self):
1438     # FIXME: in the future maybe other cluster params won't require checking on
1439     # all nodes to be modified.
1440     self.needed_locks = {
1441       locking.LEVEL_NODE: locking.ALL_SET,
1442     }
1443     self.share_locks[locking.LEVEL_NODE] = 1
1444
1445   def BuildHooksEnv(self):
1446     """Build hooks env.
1447
1448     """
1449     env = {
1450       "OP_TARGET": self.cfg.GetClusterName(),
1451       "NEW_VG_NAME": self.op.vg_name,
1452       }
1453     mn = self.cfg.GetMasterNode()
1454     return env, [mn], [mn]
1455
1456   def CheckPrereq(self):
1457     """Check prerequisites.
1458
1459     This checks whether the given params don't conflict and
1460     if the given volume group is valid.
1461
1462     """
1463     if self.op.vg_name is not None and not self.op.vg_name:
1464       instances = self.cfg.GetAllInstancesInfo().values()
1465       for inst in instances:
1466         for disk in inst.disks:
1467           if _RecursiveCheckIfLVMBased(disk):
1468             raise errors.OpPrereqError("Cannot disable lvm storage while"
1469                                        " lvm-based instances exist")
1470
1471     node_list = self.acquired_locks[locking.LEVEL_NODE]
1472
1473     # if vg_name not None, checks given volume group on all nodes
1474     if self.op.vg_name:
1475       vglist = self.rpc.call_vg_list(node_list)
1476       for node in node_list:
1477         if vglist[node].failed:
1478           # ignoring down node
1479           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1480           continue
1481         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1482                                               self.op.vg_name,
1483                                               constants.MIN_VG_SIZE)
1484         if vgstatus:
1485           raise errors.OpPrereqError("Error on node '%s': %s" %
1486                                      (node, vgstatus))
1487
1488     self.cluster = cluster = self.cfg.GetClusterInfo()
1489     # validate beparams changes
1490     if self.op.beparams:
1491       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1492       self.new_beparams = cluster.FillDict(
1493         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1494
1495     # hypervisor list/parameters
1496     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1497     if self.op.hvparams:
1498       if not isinstance(self.op.hvparams, dict):
1499         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1500       for hv_name, hv_dict in self.op.hvparams.items():
1501         if hv_name not in self.new_hvparams:
1502           self.new_hvparams[hv_name] = hv_dict
1503         else:
1504           self.new_hvparams[hv_name].update(hv_dict)
1505
1506     if self.op.enabled_hypervisors is not None:
1507       self.hv_list = self.op.enabled_hypervisors
1508     else:
1509       self.hv_list = cluster.enabled_hypervisors
1510
1511     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1512       # either the enabled list has changed, or the parameters have, validate
1513       for hv_name, hv_params in self.new_hvparams.items():
1514         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1515             (self.op.enabled_hypervisors and
1516              hv_name in self.op.enabled_hypervisors)):
1517           # either this is a new hypervisor, or its parameters have changed
1518           hv_class = hypervisor.GetHypervisor(hv_name)
1519           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1520           hv_class.CheckParameterSyntax(hv_params)
1521           _CheckHVParams(self, node_list, hv_name, hv_params)
1522
1523   def Exec(self, feedback_fn):
1524     """Change the parameters of the cluster.
1525
1526     """
1527     if self.op.vg_name is not None:
1528       new_volume = self.op.vg_name
1529       if not new_volume:
1530         new_volume = None
1531       if new_volume != self.cfg.GetVGName():
1532         self.cfg.SetVGName(new_volume)
1533       else:
1534         feedback_fn("Cluster LVM configuration already in desired"
1535                     " state, not changing")
1536     if self.op.hvparams:
1537       self.cluster.hvparams = self.new_hvparams
1538     if self.op.enabled_hypervisors is not None:
1539       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1540     if self.op.beparams:
1541       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1542     if self.op.candidate_pool_size is not None:
1543       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1544
1545     self.cfg.Update(self.cluster)
1546
1547     # we want to update nodes after the cluster so that if any errors
1548     # happen, we have recorded and saved the cluster info
1549     if self.op.candidate_pool_size is not None:
1550       _AdjustCandidatePool(self)
1551
1552
1553 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1554   """Distribute additional files which are part of the cluster configuration.
1555
1556   ConfigWriter takes care of distributing the config and ssconf files, but
1557   there are more files which should be distributed to all nodes. This function
1558   makes sure those are copied.
1559
1560   @param lu: calling logical unit
1561   @param additional_nodes: list of nodes not in the config to distribute to
1562
1563   """
1564   # 1. Gather target nodes
1565   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1566   dist_nodes = lu.cfg.GetNodeList()
1567   if additional_nodes is not None:
1568     dist_nodes.extend(additional_nodes)
1569   if myself.name in dist_nodes:
1570     dist_nodes.remove(myself.name)
1571   # 2. Gather files to distribute
1572   dist_files = set([constants.ETC_HOSTS,
1573                     constants.SSH_KNOWN_HOSTS_FILE,
1574                     constants.RAPI_CERT_FILE,
1575                     constants.RAPI_USERS_FILE,
1576                    ])
1577
1578   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1579   for hv_name in enabled_hypervisors:
1580     hv_class = hypervisor.GetHypervisor(hv_name)
1581     dist_files.update(hv_class.GetAncillaryFiles())
1582
1583   # 3. Perform the files upload
1584   for fname in dist_files:
1585     if os.path.exists(fname):
1586       result = lu.rpc.call_upload_file(dist_nodes, fname)
1587       for to_node, to_result in result.items():
1588          msg = to_result.RemoteFailMsg()
1589          if msg:
1590            msg = ("Copy of file %s to node %s failed: %s" %
1591                    (fname, to_node, msg))
1592            lu.proc.LogWarning(msg)
1593
1594
1595 class LURedistributeConfig(NoHooksLU):
1596   """Force the redistribution of cluster configuration.
1597
1598   This is a very simple LU.
1599
1600   """
1601   _OP_REQP = []
1602   REQ_BGL = False
1603
1604   def ExpandNames(self):
1605     self.needed_locks = {
1606       locking.LEVEL_NODE: locking.ALL_SET,
1607     }
1608     self.share_locks[locking.LEVEL_NODE] = 1
1609
1610   def CheckPrereq(self):
1611     """Check prerequisites.
1612
1613     """
1614
1615   def Exec(self, feedback_fn):
1616     """Redistribute the configuration.
1617
1618     """
1619     self.cfg.Update(self.cfg.GetClusterInfo())
1620     _RedistributeAncillaryFiles(self)
1621
1622
1623 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1624   """Sleep and poll for an instance's disk to sync.
1625
1626   """
1627   if not instance.disks:
1628     return True
1629
1630   if not oneshot:
1631     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1632
1633   node = instance.primary_node
1634
1635   for dev in instance.disks:
1636     lu.cfg.SetDiskID(dev, node)
1637
1638   retries = 0
1639   while True:
1640     max_time = 0
1641     done = True
1642     cumul_degraded = False
1643     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1644     if rstats.failed or not rstats.data:
1645       lu.LogWarning("Can't get any data from node %s", node)
1646       retries += 1
1647       if retries >= 10:
1648         raise errors.RemoteError("Can't contact node %s for mirror data,"
1649                                  " aborting." % node)
1650       time.sleep(6)
1651       continue
1652     rstats = rstats.data
1653     retries = 0
1654     for i, mstat in enumerate(rstats):
1655       if mstat is None:
1656         lu.LogWarning("Can't compute data for node %s/%s",
1657                            node, instance.disks[i].iv_name)
1658         continue
1659       # we ignore the ldisk parameter
1660       perc_done, est_time, is_degraded, _ = mstat
1661       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1662       if perc_done is not None:
1663         done = False
1664         if est_time is not None:
1665           rem_time = "%d estimated seconds remaining" % est_time
1666           max_time = est_time
1667         else:
1668           rem_time = "no time estimate"
1669         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1670                         (instance.disks[i].iv_name, perc_done, rem_time))
1671     if done or oneshot:
1672       break
1673
1674     time.sleep(min(60, max_time))
1675
1676   if done:
1677     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1678   return not cumul_degraded
1679
1680
1681 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1682   """Check that mirrors are not degraded.
1683
1684   The ldisk parameter, if True, will change the test from the
1685   is_degraded attribute (which represents overall non-ok status for
1686   the device(s)) to the ldisk (representing the local storage status).
1687
1688   """
1689   lu.cfg.SetDiskID(dev, node)
1690   if ldisk:
1691     idx = 6
1692   else:
1693     idx = 5
1694
1695   result = True
1696   if on_primary or dev.AssembleOnSecondary():
1697     rstats = lu.rpc.call_blockdev_find(node, dev)
1698     msg = rstats.RemoteFailMsg()
1699     if msg:
1700       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1701       result = False
1702     elif not rstats.payload:
1703       lu.LogWarning("Can't find disk on node %s", node)
1704       result = False
1705     else:
1706       result = result and (not rstats.payload[idx])
1707   if dev.children:
1708     for child in dev.children:
1709       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1710
1711   return result
1712
1713
1714 class LUDiagnoseOS(NoHooksLU):
1715   """Logical unit for OS diagnose/query.
1716
1717   """
1718   _OP_REQP = ["output_fields", "names"]
1719   REQ_BGL = False
1720   _FIELDS_STATIC = utils.FieldSet()
1721   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1722
1723   def ExpandNames(self):
1724     if self.op.names:
1725       raise errors.OpPrereqError("Selective OS query not supported")
1726
1727     _CheckOutputFields(static=self._FIELDS_STATIC,
1728                        dynamic=self._FIELDS_DYNAMIC,
1729                        selected=self.op.output_fields)
1730
1731     # Lock all nodes, in shared mode
1732     # Temporary removal of locks, should be reverted later
1733     # TODO: reintroduce locks when they are lighter-weight
1734     self.needed_locks = {}
1735     #self.share_locks[locking.LEVEL_NODE] = 1
1736     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1737
1738   def CheckPrereq(self):
1739     """Check prerequisites.
1740
1741     """
1742
1743   @staticmethod
1744   def _DiagnoseByOS(node_list, rlist):
1745     """Remaps a per-node return list into an a per-os per-node dictionary
1746
1747     @param node_list: a list with the names of all nodes
1748     @param rlist: a map with node names as keys and OS objects as values
1749
1750     @rtype: dict
1751     @return: a dictionary with osnames as keys and as value another map, with
1752         nodes as keys and list of OS objects as values, eg::
1753
1754           {"debian-etch": {"node1": [<object>,...],
1755                            "node2": [<object>,]}
1756           }
1757
1758     """
1759     all_os = {}
1760     # we build here the list of nodes that didn't fail the RPC (at RPC
1761     # level), so that nodes with a non-responding node daemon don't
1762     # make all OSes invalid
1763     good_nodes = [node_name for node_name in rlist
1764                   if not rlist[node_name].failed]
1765     for node_name, nr in rlist.iteritems():
1766       if nr.failed or not nr.data:
1767         continue
1768       for os_obj in nr.data:
1769         if os_obj.name not in all_os:
1770           # build a list of nodes for this os containing empty lists
1771           # for each node in node_list
1772           all_os[os_obj.name] = {}
1773           for nname in good_nodes:
1774             all_os[os_obj.name][nname] = []
1775         all_os[os_obj.name][node_name].append(os_obj)
1776     return all_os
1777
1778   def Exec(self, feedback_fn):
1779     """Compute the list of OSes.
1780
1781     """
1782     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1783     node_data = self.rpc.call_os_diagnose(valid_nodes)
1784     if node_data == False:
1785       raise errors.OpExecError("Can't gather the list of OSes")
1786     pol = self._DiagnoseByOS(valid_nodes, node_data)
1787     output = []
1788     for os_name, os_data in pol.iteritems():
1789       row = []
1790       for field in self.op.output_fields:
1791         if field == "name":
1792           val = os_name
1793         elif field == "valid":
1794           val = utils.all([osl and osl[0] for osl in os_data.values()])
1795         elif field == "node_status":
1796           val = {}
1797           for node_name, nos_list in os_data.iteritems():
1798             val[node_name] = [(v.status, v.path) for v in nos_list]
1799         else:
1800           raise errors.ParameterError(field)
1801         row.append(val)
1802       output.append(row)
1803
1804     return output
1805
1806
1807 class LURemoveNode(LogicalUnit):
1808   """Logical unit for removing a node.
1809
1810   """
1811   HPATH = "node-remove"
1812   HTYPE = constants.HTYPE_NODE
1813   _OP_REQP = ["node_name"]
1814
1815   def BuildHooksEnv(self):
1816     """Build hooks env.
1817
1818     This doesn't run on the target node in the pre phase as a failed
1819     node would then be impossible to remove.
1820
1821     """
1822     env = {
1823       "OP_TARGET": self.op.node_name,
1824       "NODE_NAME": self.op.node_name,
1825       }
1826     all_nodes = self.cfg.GetNodeList()
1827     all_nodes.remove(self.op.node_name)
1828     return env, all_nodes, all_nodes
1829
1830   def CheckPrereq(self):
1831     """Check prerequisites.
1832
1833     This checks:
1834      - the node exists in the configuration
1835      - it does not have primary or secondary instances
1836      - it's not the master
1837
1838     Any errors are signalled by raising errors.OpPrereqError.
1839
1840     """
1841     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1842     if node is None:
1843       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1844
1845     instance_list = self.cfg.GetInstanceList()
1846
1847     masternode = self.cfg.GetMasterNode()
1848     if node.name == masternode:
1849       raise errors.OpPrereqError("Node is the master node,"
1850                                  " you need to failover first.")
1851
1852     for instance_name in instance_list:
1853       instance = self.cfg.GetInstanceInfo(instance_name)
1854       if node.name in instance.all_nodes:
1855         raise errors.OpPrereqError("Instance %s is still running on the node,"
1856                                    " please remove first." % instance_name)
1857     self.op.node_name = node.name
1858     self.node = node
1859
1860   def Exec(self, feedback_fn):
1861     """Removes the node from the cluster.
1862
1863     """
1864     node = self.node
1865     logging.info("Stopping the node daemon and removing configs from node %s",
1866                  node.name)
1867
1868     self.context.RemoveNode(node.name)
1869
1870     self.rpc.call_node_leave_cluster(node.name)
1871
1872     # Promote nodes to master candidate as needed
1873     _AdjustCandidatePool(self)
1874
1875
1876 class LUQueryNodes(NoHooksLU):
1877   """Logical unit for querying nodes.
1878
1879   """
1880   _OP_REQP = ["output_fields", "names", "use_locking"]
1881   REQ_BGL = False
1882   _FIELDS_DYNAMIC = utils.FieldSet(
1883     "dtotal", "dfree",
1884     "mtotal", "mnode", "mfree",
1885     "bootid",
1886     "ctotal", "cnodes", "csockets",
1887     )
1888
1889   _FIELDS_STATIC = utils.FieldSet(
1890     "name", "pinst_cnt", "sinst_cnt",
1891     "pinst_list", "sinst_list",
1892     "pip", "sip", "tags",
1893     "serial_no",
1894     "master_candidate",
1895     "master",
1896     "offline",
1897     "drained",
1898     )
1899
1900   def ExpandNames(self):
1901     _CheckOutputFields(static=self._FIELDS_STATIC,
1902                        dynamic=self._FIELDS_DYNAMIC,
1903                        selected=self.op.output_fields)
1904
1905     self.needed_locks = {}
1906     self.share_locks[locking.LEVEL_NODE] = 1
1907
1908     if self.op.names:
1909       self.wanted = _GetWantedNodes(self, self.op.names)
1910     else:
1911       self.wanted = locking.ALL_SET
1912
1913     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1914     self.do_locking = self.do_node_query and self.op.use_locking
1915     if self.do_locking:
1916       # if we don't request only static fields, we need to lock the nodes
1917       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1918
1919
1920   def CheckPrereq(self):
1921     """Check prerequisites.
1922
1923     """
1924     # The validation of the node list is done in the _GetWantedNodes,
1925     # if non empty, and if empty, there's no validation to do
1926     pass
1927
1928   def Exec(self, feedback_fn):
1929     """Computes the list of nodes and their attributes.
1930
1931     """
1932     all_info = self.cfg.GetAllNodesInfo()
1933     if self.do_locking:
1934       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1935     elif self.wanted != locking.ALL_SET:
1936       nodenames = self.wanted
1937       missing = set(nodenames).difference(all_info.keys())
1938       if missing:
1939         raise errors.OpExecError(
1940           "Some nodes were removed before retrieving their data: %s" % missing)
1941     else:
1942       nodenames = all_info.keys()
1943
1944     nodenames = utils.NiceSort(nodenames)
1945     nodelist = [all_info[name] for name in nodenames]
1946
1947     # begin data gathering
1948
1949     if self.do_node_query:
1950       live_data = {}
1951       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1952                                           self.cfg.GetHypervisorType())
1953       for name in nodenames:
1954         nodeinfo = node_data[name]
1955         if not nodeinfo.failed and nodeinfo.data:
1956           nodeinfo = nodeinfo.data
1957           fn = utils.TryConvert
1958           live_data[name] = {
1959             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1960             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1961             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1962             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1963             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1964             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1965             "bootid": nodeinfo.get('bootid', None),
1966             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1967             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1968             }
1969         else:
1970           live_data[name] = {}
1971     else:
1972       live_data = dict.fromkeys(nodenames, {})
1973
1974     node_to_primary = dict([(name, set()) for name in nodenames])
1975     node_to_secondary = dict([(name, set()) for name in nodenames])
1976
1977     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1978                              "sinst_cnt", "sinst_list"))
1979     if inst_fields & frozenset(self.op.output_fields):
1980       instancelist = self.cfg.GetInstanceList()
1981
1982       for instance_name in instancelist:
1983         inst = self.cfg.GetInstanceInfo(instance_name)
1984         if inst.primary_node in node_to_primary:
1985           node_to_primary[inst.primary_node].add(inst.name)
1986         for secnode in inst.secondary_nodes:
1987           if secnode in node_to_secondary:
1988             node_to_secondary[secnode].add(inst.name)
1989
1990     master_node = self.cfg.GetMasterNode()
1991
1992     # end data gathering
1993
1994     output = []
1995     for node in nodelist:
1996       node_output = []
1997       for field in self.op.output_fields:
1998         if field == "name":
1999           val = node.name
2000         elif field == "pinst_list":
2001           val = list(node_to_primary[node.name])
2002         elif field == "sinst_list":
2003           val = list(node_to_secondary[node.name])
2004         elif field == "pinst_cnt":
2005           val = len(node_to_primary[node.name])
2006         elif field == "sinst_cnt":
2007           val = len(node_to_secondary[node.name])
2008         elif field == "pip":
2009           val = node.primary_ip
2010         elif field == "sip":
2011           val = node.secondary_ip
2012         elif field == "tags":
2013           val = list(node.GetTags())
2014         elif field == "serial_no":
2015           val = node.serial_no
2016         elif field == "master_candidate":
2017           val = node.master_candidate
2018         elif field == "master":
2019           val = node.name == master_node
2020         elif field == "offline":
2021           val = node.offline
2022         elif field == "drained":
2023           val = node.drained
2024         elif self._FIELDS_DYNAMIC.Matches(field):
2025           val = live_data[node.name].get(field, None)
2026         else:
2027           raise errors.ParameterError(field)
2028         node_output.append(val)
2029       output.append(node_output)
2030
2031     return output
2032
2033
2034 class LUQueryNodeVolumes(NoHooksLU):
2035   """Logical unit for getting volumes on node(s).
2036
2037   """
2038   _OP_REQP = ["nodes", "output_fields"]
2039   REQ_BGL = False
2040   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2041   _FIELDS_STATIC = utils.FieldSet("node")
2042
2043   def ExpandNames(self):
2044     _CheckOutputFields(static=self._FIELDS_STATIC,
2045                        dynamic=self._FIELDS_DYNAMIC,
2046                        selected=self.op.output_fields)
2047
2048     self.needed_locks = {}
2049     self.share_locks[locking.LEVEL_NODE] = 1
2050     if not self.op.nodes:
2051       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2052     else:
2053       self.needed_locks[locking.LEVEL_NODE] = \
2054         _GetWantedNodes(self, self.op.nodes)
2055
2056   def CheckPrereq(self):
2057     """Check prerequisites.
2058
2059     This checks that the fields required are valid output fields.
2060
2061     """
2062     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2063
2064   def Exec(self, feedback_fn):
2065     """Computes the list of nodes and their attributes.
2066
2067     """
2068     nodenames = self.nodes
2069     volumes = self.rpc.call_node_volumes(nodenames)
2070
2071     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2072              in self.cfg.GetInstanceList()]
2073
2074     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2075
2076     output = []
2077     for node in nodenames:
2078       if node not in volumes or volumes[node].failed or not volumes[node].data:
2079         continue
2080
2081       node_vols = volumes[node].data[:]
2082       node_vols.sort(key=lambda vol: vol['dev'])
2083
2084       for vol in node_vols:
2085         node_output = []
2086         for field in self.op.output_fields:
2087           if field == "node":
2088             val = node
2089           elif field == "phys":
2090             val = vol['dev']
2091           elif field == "vg":
2092             val = vol['vg']
2093           elif field == "name":
2094             val = vol['name']
2095           elif field == "size":
2096             val = int(float(vol['size']))
2097           elif field == "instance":
2098             for inst in ilist:
2099               if node not in lv_by_node[inst]:
2100                 continue
2101               if vol['name'] in lv_by_node[inst][node]:
2102                 val = inst.name
2103                 break
2104             else:
2105               val = '-'
2106           else:
2107             raise errors.ParameterError(field)
2108           node_output.append(str(val))
2109
2110         output.append(node_output)
2111
2112     return output
2113
2114
2115 class LUAddNode(LogicalUnit):
2116   """Logical unit for adding node to the cluster.
2117
2118   """
2119   HPATH = "node-add"
2120   HTYPE = constants.HTYPE_NODE
2121   _OP_REQP = ["node_name"]
2122
2123   def BuildHooksEnv(self):
2124     """Build hooks env.
2125
2126     This will run on all nodes before, and on all nodes + the new node after.
2127
2128     """
2129     env = {
2130       "OP_TARGET": self.op.node_name,
2131       "NODE_NAME": self.op.node_name,
2132       "NODE_PIP": self.op.primary_ip,
2133       "NODE_SIP": self.op.secondary_ip,
2134       }
2135     nodes_0 = self.cfg.GetNodeList()
2136     nodes_1 = nodes_0 + [self.op.node_name, ]
2137     return env, nodes_0, nodes_1
2138
2139   def CheckPrereq(self):
2140     """Check prerequisites.
2141
2142     This checks:
2143      - the new node is not already in the config
2144      - it is resolvable
2145      - its parameters (single/dual homed) matches the cluster
2146
2147     Any errors are signalled by raising errors.OpPrereqError.
2148
2149     """
2150     node_name = self.op.node_name
2151     cfg = self.cfg
2152
2153     dns_data = utils.HostInfo(node_name)
2154
2155     node = dns_data.name
2156     primary_ip = self.op.primary_ip = dns_data.ip
2157     secondary_ip = getattr(self.op, "secondary_ip", None)
2158     if secondary_ip is None:
2159       secondary_ip = primary_ip
2160     if not utils.IsValidIP(secondary_ip):
2161       raise errors.OpPrereqError("Invalid secondary IP given")
2162     self.op.secondary_ip = secondary_ip
2163
2164     node_list = cfg.GetNodeList()
2165     if not self.op.readd and node in node_list:
2166       raise errors.OpPrereqError("Node %s is already in the configuration" %
2167                                  node)
2168     elif self.op.readd and node not in node_list:
2169       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2170
2171     for existing_node_name in node_list:
2172       existing_node = cfg.GetNodeInfo(existing_node_name)
2173
2174       if self.op.readd and node == existing_node_name:
2175         if (existing_node.primary_ip != primary_ip or
2176             existing_node.secondary_ip != secondary_ip):
2177           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2178                                      " address configuration as before")
2179         continue
2180
2181       if (existing_node.primary_ip == primary_ip or
2182           existing_node.secondary_ip == primary_ip or
2183           existing_node.primary_ip == secondary_ip or
2184           existing_node.secondary_ip == secondary_ip):
2185         raise errors.OpPrereqError("New node ip address(es) conflict with"
2186                                    " existing node %s" % existing_node.name)
2187
2188     # check that the type of the node (single versus dual homed) is the
2189     # same as for the master
2190     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2191     master_singlehomed = myself.secondary_ip == myself.primary_ip
2192     newbie_singlehomed = secondary_ip == primary_ip
2193     if master_singlehomed != newbie_singlehomed:
2194       if master_singlehomed:
2195         raise errors.OpPrereqError("The master has no private ip but the"
2196                                    " new node has one")
2197       else:
2198         raise errors.OpPrereqError("The master has a private ip but the"
2199                                    " new node doesn't have one")
2200
2201     # checks reachablity
2202     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2203       raise errors.OpPrereqError("Node not reachable by ping")
2204
2205     if not newbie_singlehomed:
2206       # check reachability from my secondary ip to newbie's secondary ip
2207       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2208                            source=myself.secondary_ip):
2209         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2210                                    " based ping to noded port")
2211
2212     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2213     mc_now, _ = self.cfg.GetMasterCandidateStats()
2214     master_candidate = mc_now < cp_size
2215
2216     self.new_node = objects.Node(name=node,
2217                                  primary_ip=primary_ip,
2218                                  secondary_ip=secondary_ip,
2219                                  master_candidate=master_candidate,
2220                                  offline=False, drained=False)
2221
2222   def Exec(self, feedback_fn):
2223     """Adds the new node to the cluster.
2224
2225     """
2226     new_node = self.new_node
2227     node = new_node.name
2228
2229     # check connectivity
2230     result = self.rpc.call_version([node])[node]
2231     result.Raise()
2232     if result.data:
2233       if constants.PROTOCOL_VERSION == result.data:
2234         logging.info("Communication to node %s fine, sw version %s match",
2235                      node, result.data)
2236       else:
2237         raise errors.OpExecError("Version mismatch master version %s,"
2238                                  " node version %s" %
2239                                  (constants.PROTOCOL_VERSION, result.data))
2240     else:
2241       raise errors.OpExecError("Cannot get version from the new node")
2242
2243     # setup ssh on node
2244     logging.info("Copy ssh key to node %s", node)
2245     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2246     keyarray = []
2247     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2248                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2249                 priv_key, pub_key]
2250
2251     for i in keyfiles:
2252       f = open(i, 'r')
2253       try:
2254         keyarray.append(f.read())
2255       finally:
2256         f.close()
2257
2258     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2259                                     keyarray[2],
2260                                     keyarray[3], keyarray[4], keyarray[5])
2261
2262     msg = result.RemoteFailMsg()
2263     if msg:
2264       raise errors.OpExecError("Cannot transfer ssh keys to the"
2265                                " new node: %s" % msg)
2266
2267     # Add node to our /etc/hosts, and add key to known_hosts
2268     utils.AddHostToEtcHosts(new_node.name)
2269
2270     if new_node.secondary_ip != new_node.primary_ip:
2271       result = self.rpc.call_node_has_ip_address(new_node.name,
2272                                                  new_node.secondary_ip)
2273       if result.failed or not result.data:
2274         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2275                                  " you gave (%s). Please fix and re-run this"
2276                                  " command." % new_node.secondary_ip)
2277
2278     node_verify_list = [self.cfg.GetMasterNode()]
2279     node_verify_param = {
2280       'nodelist': [node],
2281       # TODO: do a node-net-test as well?
2282     }
2283
2284     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2285                                        self.cfg.GetClusterName())
2286     for verifier in node_verify_list:
2287       if result[verifier].failed or not result[verifier].data:
2288         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2289                                  " for remote verification" % verifier)
2290       if result[verifier].data['nodelist']:
2291         for failed in result[verifier].data['nodelist']:
2292           feedback_fn("ssh/hostname verification failed %s -> %s" %
2293                       (verifier, result[verifier].data['nodelist'][failed]))
2294         raise errors.OpExecError("ssh/hostname verification failed.")
2295
2296     if self.op.readd:
2297       _RedistributeAncillaryFiles(self)
2298       self.context.ReaddNode(new_node)
2299     else:
2300       _RedistributeAncillaryFiles(self, additional_nodes=node)
2301       self.context.AddNode(new_node)
2302
2303
2304 class LUSetNodeParams(LogicalUnit):
2305   """Modifies the parameters of a node.
2306
2307   """
2308   HPATH = "node-modify"
2309   HTYPE = constants.HTYPE_NODE
2310   _OP_REQP = ["node_name"]
2311   REQ_BGL = False
2312
2313   def CheckArguments(self):
2314     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2315     if node_name is None:
2316       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2317     self.op.node_name = node_name
2318     _CheckBooleanOpField(self.op, 'master_candidate')
2319     _CheckBooleanOpField(self.op, 'offline')
2320     _CheckBooleanOpField(self.op, 'drained')
2321     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2322     if all_mods.count(None) == 3:
2323       raise errors.OpPrereqError("Please pass at least one modification")
2324     if all_mods.count(True) > 1:
2325       raise errors.OpPrereqError("Can't set the node into more than one"
2326                                  " state at the same time")
2327
2328   def ExpandNames(self):
2329     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2330
2331   def BuildHooksEnv(self):
2332     """Build hooks env.
2333
2334     This runs on the master node.
2335
2336     """
2337     env = {
2338       "OP_TARGET": self.op.node_name,
2339       "MASTER_CANDIDATE": str(self.op.master_candidate),
2340       "OFFLINE": str(self.op.offline),
2341       "DRAINED": str(self.op.drained),
2342       }
2343     nl = [self.cfg.GetMasterNode(),
2344           self.op.node_name]
2345     return env, nl, nl
2346
2347   def CheckPrereq(self):
2348     """Check prerequisites.
2349
2350     This only checks the instance list against the existing names.
2351
2352     """
2353     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2354
2355     if ((self.op.master_candidate == False or self.op.offline == True or
2356          self.op.drained == True) and node.master_candidate):
2357       # we will demote the node from master_candidate
2358       if self.op.node_name == self.cfg.GetMasterNode():
2359         raise errors.OpPrereqError("The master node has to be a"
2360                                    " master candidate, online and not drained")
2361       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2362       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2363       if num_candidates <= cp_size:
2364         msg = ("Not enough master candidates (desired"
2365                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2366         if self.op.force:
2367           self.LogWarning(msg)
2368         else:
2369           raise errors.OpPrereqError(msg)
2370
2371     if (self.op.master_candidate == True and
2372         ((node.offline and not self.op.offline == False) or
2373          (node.drained and not self.op.drained == False))):
2374       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2375                                  " to master_candidate" % node.name)
2376
2377     return
2378
2379   def Exec(self, feedback_fn):
2380     """Modifies a node.
2381
2382     """
2383     node = self.node
2384
2385     result = []
2386     changed_mc = False
2387
2388     if self.op.offline is not None:
2389       node.offline = self.op.offline
2390       result.append(("offline", str(self.op.offline)))
2391       if self.op.offline == True:
2392         if node.master_candidate:
2393           node.master_candidate = False
2394           changed_mc = True
2395           result.append(("master_candidate", "auto-demotion due to offline"))
2396         if node.drained:
2397           node.drained = False
2398           result.append(("drained", "clear drained status due to offline"))
2399
2400     if self.op.master_candidate is not None:
2401       node.master_candidate = self.op.master_candidate
2402       changed_mc = True
2403       result.append(("master_candidate", str(self.op.master_candidate)))
2404       if self.op.master_candidate == False:
2405         rrc = self.rpc.call_node_demote_from_mc(node.name)
2406         msg = rrc.RemoteFailMsg()
2407         if msg:
2408           self.LogWarning("Node failed to demote itself: %s" % msg)
2409
2410     if self.op.drained is not None:
2411       node.drained = self.op.drained
2412       result.append(("drained", str(self.op.drained)))
2413       if self.op.drained == True:
2414         if node.master_candidate:
2415           node.master_candidate = False
2416           changed_mc = True
2417           result.append(("master_candidate", "auto-demotion due to drain"))
2418         if node.offline:
2419           node.offline = False
2420           result.append(("offline", "clear offline status due to drain"))
2421
2422     # this will trigger configuration file update, if needed
2423     self.cfg.Update(node)
2424     # this will trigger job queue propagation or cleanup
2425     if changed_mc:
2426       self.context.ReaddNode(node)
2427
2428     return result
2429
2430
2431 class LUPowercycleNode(NoHooksLU):
2432   """Powercycles a node.
2433
2434   """
2435   _OP_REQP = ["node_name", "force"]
2436   REQ_BGL = False
2437
2438   def CheckArguments(self):
2439     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2440     if node_name is None:
2441       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2442     self.op.node_name = node_name
2443     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2444       raise errors.OpPrereqError("The node is the master and the force"
2445                                  " parameter was not set")
2446
2447   def ExpandNames(self):
2448     """Locking for PowercycleNode.
2449
2450     This is a last-resource option and shouldn't block on other
2451     jobs. Therefore, we grab no locks.
2452
2453     """
2454     self.needed_locks = {}
2455
2456   def CheckPrereq(self):
2457     """Check prerequisites.
2458
2459     This LU has no prereqs.
2460
2461     """
2462     pass
2463
2464   def Exec(self, feedback_fn):
2465     """Reboots a node.
2466
2467     """
2468     result = self.rpc.call_node_powercycle(self.op.node_name,
2469                                            self.cfg.GetHypervisorType())
2470     msg = result.RemoteFailMsg()
2471     if msg:
2472       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2473     return result.payload
2474
2475
2476 class LUQueryClusterInfo(NoHooksLU):
2477   """Query cluster configuration.
2478
2479   """
2480   _OP_REQP = []
2481   REQ_BGL = False
2482
2483   def ExpandNames(self):
2484     self.needed_locks = {}
2485
2486   def CheckPrereq(self):
2487     """No prerequsites needed for this LU.
2488
2489     """
2490     pass
2491
2492   def Exec(self, feedback_fn):
2493     """Return cluster config.
2494
2495     """
2496     cluster = self.cfg.GetClusterInfo()
2497     result = {
2498       "software_version": constants.RELEASE_VERSION,
2499       "protocol_version": constants.PROTOCOL_VERSION,
2500       "config_version": constants.CONFIG_VERSION,
2501       "os_api_version": constants.OS_API_VERSION,
2502       "export_version": constants.EXPORT_VERSION,
2503       "architecture": (platform.architecture()[0], platform.machine()),
2504       "name": cluster.cluster_name,
2505       "master": cluster.master_node,
2506       "default_hypervisor": cluster.default_hypervisor,
2507       "enabled_hypervisors": cluster.enabled_hypervisors,
2508       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2509                         for hypervisor in cluster.enabled_hypervisors]),
2510       "beparams": cluster.beparams,
2511       "candidate_pool_size": cluster.candidate_pool_size,
2512       "default_bridge": cluster.default_bridge,
2513       "master_netdev": cluster.master_netdev,
2514       "volume_group_name": cluster.volume_group_name,
2515       "file_storage_dir": cluster.file_storage_dir,
2516       }
2517
2518     return result
2519
2520
2521 class LUQueryConfigValues(NoHooksLU):
2522   """Return configuration values.
2523
2524   """
2525   _OP_REQP = []
2526   REQ_BGL = False
2527   _FIELDS_DYNAMIC = utils.FieldSet()
2528   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2529
2530   def ExpandNames(self):
2531     self.needed_locks = {}
2532
2533     _CheckOutputFields(static=self._FIELDS_STATIC,
2534                        dynamic=self._FIELDS_DYNAMIC,
2535                        selected=self.op.output_fields)
2536
2537   def CheckPrereq(self):
2538     """No prerequisites.
2539
2540     """
2541     pass
2542
2543   def Exec(self, feedback_fn):
2544     """Dump a representation of the cluster config to the standard output.
2545
2546     """
2547     values = []
2548     for field in self.op.output_fields:
2549       if field == "cluster_name":
2550         entry = self.cfg.GetClusterName()
2551       elif field == "master_node":
2552         entry = self.cfg.GetMasterNode()
2553       elif field == "drain_flag":
2554         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2555       else:
2556         raise errors.ParameterError(field)
2557       values.append(entry)
2558     return values
2559
2560
2561 class LUActivateInstanceDisks(NoHooksLU):
2562   """Bring up an instance's disks.
2563
2564   """
2565   _OP_REQP = ["instance_name"]
2566   REQ_BGL = False
2567
2568   def ExpandNames(self):
2569     self._ExpandAndLockInstance()
2570     self.needed_locks[locking.LEVEL_NODE] = []
2571     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2572
2573   def DeclareLocks(self, level):
2574     if level == locking.LEVEL_NODE:
2575       self._LockInstancesNodes()
2576
2577   def CheckPrereq(self):
2578     """Check prerequisites.
2579
2580     This checks that the instance is in the cluster.
2581
2582     """
2583     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2584     assert self.instance is not None, \
2585       "Cannot retrieve locked instance %s" % self.op.instance_name
2586     _CheckNodeOnline(self, self.instance.primary_node)
2587
2588   def Exec(self, feedback_fn):
2589     """Activate the disks.
2590
2591     """
2592     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2593     if not disks_ok:
2594       raise errors.OpExecError("Cannot activate block devices")
2595
2596     return disks_info
2597
2598
2599 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2600   """Prepare the block devices for an instance.
2601
2602   This sets up the block devices on all nodes.
2603
2604   @type lu: L{LogicalUnit}
2605   @param lu: the logical unit on whose behalf we execute
2606   @type instance: L{objects.Instance}
2607   @param instance: the instance for whose disks we assemble
2608   @type ignore_secondaries: boolean
2609   @param ignore_secondaries: if true, errors on secondary nodes
2610       won't result in an error return from the function
2611   @return: False if the operation failed, otherwise a list of
2612       (host, instance_visible_name, node_visible_name)
2613       with the mapping from node devices to instance devices
2614
2615   """
2616   device_info = []
2617   disks_ok = True
2618   iname = instance.name
2619   # With the two passes mechanism we try to reduce the window of
2620   # opportunity for the race condition of switching DRBD to primary
2621   # before handshaking occured, but we do not eliminate it
2622
2623   # The proper fix would be to wait (with some limits) until the
2624   # connection has been made and drbd transitions from WFConnection
2625   # into any other network-connected state (Connected, SyncTarget,
2626   # SyncSource, etc.)
2627
2628   # 1st pass, assemble on all nodes in secondary mode
2629   for inst_disk in instance.disks:
2630     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2631       lu.cfg.SetDiskID(node_disk, node)
2632       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2633       msg = result.RemoteFailMsg()
2634       if msg:
2635         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2636                            " (is_primary=False, pass=1): %s",
2637                            inst_disk.iv_name, node, msg)
2638         if not ignore_secondaries:
2639           disks_ok = False
2640
2641   # FIXME: race condition on drbd migration to primary
2642
2643   # 2nd pass, do only the primary node
2644   for inst_disk in instance.disks:
2645     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2646       if node != instance.primary_node:
2647         continue
2648       lu.cfg.SetDiskID(node_disk, node)
2649       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2650       msg = result.RemoteFailMsg()
2651       if msg:
2652         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2653                            " (is_primary=True, pass=2): %s",
2654                            inst_disk.iv_name, node, msg)
2655         disks_ok = False
2656     device_info.append((instance.primary_node, inst_disk.iv_name,
2657                         result.payload))
2658
2659   # leave the disks configured for the primary node
2660   # this is a workaround that would be fixed better by
2661   # improving the logical/physical id handling
2662   for disk in instance.disks:
2663     lu.cfg.SetDiskID(disk, instance.primary_node)
2664
2665   return disks_ok, device_info
2666
2667
2668 def _StartInstanceDisks(lu, instance, force):
2669   """Start the disks of an instance.
2670
2671   """
2672   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2673                                            ignore_secondaries=force)
2674   if not disks_ok:
2675     _ShutdownInstanceDisks(lu, instance)
2676     if force is not None and not force:
2677       lu.proc.LogWarning("", hint="If the message above refers to a"
2678                          " secondary node,"
2679                          " you can retry the operation using '--force'.")
2680     raise errors.OpExecError("Disk consistency error")
2681
2682
2683 class LUDeactivateInstanceDisks(NoHooksLU):
2684   """Shutdown an instance's disks.
2685
2686   """
2687   _OP_REQP = ["instance_name"]
2688   REQ_BGL = False
2689
2690   def ExpandNames(self):
2691     self._ExpandAndLockInstance()
2692     self.needed_locks[locking.LEVEL_NODE] = []
2693     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2694
2695   def DeclareLocks(self, level):
2696     if level == locking.LEVEL_NODE:
2697       self._LockInstancesNodes()
2698
2699   def CheckPrereq(self):
2700     """Check prerequisites.
2701
2702     This checks that the instance is in the cluster.
2703
2704     """
2705     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2706     assert self.instance is not None, \
2707       "Cannot retrieve locked instance %s" % self.op.instance_name
2708
2709   def Exec(self, feedback_fn):
2710     """Deactivate the disks
2711
2712     """
2713     instance = self.instance
2714     _SafeShutdownInstanceDisks(self, instance)
2715
2716
2717 def _SafeShutdownInstanceDisks(lu, instance):
2718   """Shutdown block devices of an instance.
2719
2720   This function checks if an instance is running, before calling
2721   _ShutdownInstanceDisks.
2722
2723   """
2724   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2725                                       [instance.hypervisor])
2726   ins_l = ins_l[instance.primary_node]
2727   if ins_l.failed or not isinstance(ins_l.data, list):
2728     raise errors.OpExecError("Can't contact node '%s'" %
2729                              instance.primary_node)
2730
2731   if instance.name in ins_l.data:
2732     raise errors.OpExecError("Instance is running, can't shutdown"
2733                              " block devices.")
2734
2735   _ShutdownInstanceDisks(lu, instance)
2736
2737
2738 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2739   """Shutdown block devices of an instance.
2740
2741   This does the shutdown on all nodes of the instance.
2742
2743   If the ignore_primary is false, errors on the primary node are
2744   ignored.
2745
2746   """
2747   all_result = True
2748   for disk in instance.disks:
2749     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2750       lu.cfg.SetDiskID(top_disk, node)
2751       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2752       msg = result.RemoteFailMsg()
2753       if msg:
2754         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2755                       disk.iv_name, node, msg)
2756         if not ignore_primary or node != instance.primary_node:
2757           all_result = False
2758   return all_result
2759
2760
2761 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2762   """Checks if a node has enough free memory.
2763
2764   This function check if a given node has the needed amount of free
2765   memory. In case the node has less memory or we cannot get the
2766   information from the node, this function raise an OpPrereqError
2767   exception.
2768
2769   @type lu: C{LogicalUnit}
2770   @param lu: a logical unit from which we get configuration data
2771   @type node: C{str}
2772   @param node: the node to check
2773   @type reason: C{str}
2774   @param reason: string to use in the error message
2775   @type requested: C{int}
2776   @param requested: the amount of memory in MiB to check for
2777   @type hypervisor_name: C{str}
2778   @param hypervisor_name: the hypervisor to ask for memory stats
2779   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2780       we cannot check the node
2781
2782   """
2783   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2784   nodeinfo[node].Raise()
2785   free_mem = nodeinfo[node].data.get('memory_free')
2786   if not isinstance(free_mem, int):
2787     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2788                              " was '%s'" % (node, free_mem))
2789   if requested > free_mem:
2790     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2791                              " needed %s MiB, available %s MiB" %
2792                              (node, reason, requested, free_mem))
2793
2794
2795 class LUStartupInstance(LogicalUnit):
2796   """Starts an instance.
2797
2798   """
2799   HPATH = "instance-start"
2800   HTYPE = constants.HTYPE_INSTANCE
2801   _OP_REQP = ["instance_name", "force"]
2802   REQ_BGL = False
2803
2804   def ExpandNames(self):
2805     self._ExpandAndLockInstance()
2806
2807   def BuildHooksEnv(self):
2808     """Build hooks env.
2809
2810     This runs on master, primary and secondary nodes of the instance.
2811
2812     """
2813     env = {
2814       "FORCE": self.op.force,
2815       }
2816     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2817     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2818     return env, nl, nl
2819
2820   def CheckPrereq(self):
2821     """Check prerequisites.
2822
2823     This checks that the instance is in the cluster.
2824
2825     """
2826     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2827     assert self.instance is not None, \
2828       "Cannot retrieve locked instance %s" % self.op.instance_name
2829
2830     # extra beparams
2831     self.beparams = getattr(self.op, "beparams", {})
2832     if self.beparams:
2833       if not isinstance(self.beparams, dict):
2834         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2835                                    " dict" % (type(self.beparams), ))
2836       # fill the beparams dict
2837       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2838       self.op.beparams = self.beparams
2839
2840     # extra hvparams
2841     self.hvparams = getattr(self.op, "hvparams", {})
2842     if self.hvparams:
2843       if not isinstance(self.hvparams, dict):
2844         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2845                                    " dict" % (type(self.hvparams), ))
2846
2847       # check hypervisor parameter syntax (locally)
2848       cluster = self.cfg.GetClusterInfo()
2849       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2850       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2851                                     instance.hvparams)
2852       filled_hvp.update(self.hvparams)
2853       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2854       hv_type.CheckParameterSyntax(filled_hvp)
2855       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2856       self.op.hvparams = self.hvparams
2857
2858     _CheckNodeOnline(self, instance.primary_node)
2859
2860     bep = self.cfg.GetClusterInfo().FillBE(instance)
2861     # check bridges existance
2862     _CheckInstanceBridgesExist(self, instance)
2863
2864     remote_info = self.rpc.call_instance_info(instance.primary_node,
2865                                               instance.name,
2866                                               instance.hypervisor)
2867     remote_info.Raise()
2868     if not remote_info.data:
2869       _CheckNodeFreeMemory(self, instance.primary_node,
2870                            "starting instance %s" % instance.name,
2871                            bep[constants.BE_MEMORY], instance.hypervisor)
2872
2873   def Exec(self, feedback_fn):
2874     """Start the instance.
2875
2876     """
2877     instance = self.instance
2878     force = self.op.force
2879
2880     self.cfg.MarkInstanceUp(instance.name)
2881
2882     node_current = instance.primary_node
2883
2884     _StartInstanceDisks(self, instance, force)
2885
2886     result = self.rpc.call_instance_start(node_current, instance,
2887                                           self.hvparams, self.beparams)
2888     msg = result.RemoteFailMsg()
2889     if msg:
2890       _ShutdownInstanceDisks(self, instance)
2891       raise errors.OpExecError("Could not start instance: %s" % msg)
2892
2893
2894 class LURebootInstance(LogicalUnit):
2895   """Reboot an instance.
2896
2897   """
2898   HPATH = "instance-reboot"
2899   HTYPE = constants.HTYPE_INSTANCE
2900   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2901   REQ_BGL = False
2902
2903   def ExpandNames(self):
2904     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2905                                    constants.INSTANCE_REBOOT_HARD,
2906                                    constants.INSTANCE_REBOOT_FULL]:
2907       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2908                                   (constants.INSTANCE_REBOOT_SOFT,
2909                                    constants.INSTANCE_REBOOT_HARD,
2910                                    constants.INSTANCE_REBOOT_FULL))
2911     self._ExpandAndLockInstance()
2912
2913   def BuildHooksEnv(self):
2914     """Build hooks env.
2915
2916     This runs on master, primary and secondary nodes of the instance.
2917
2918     """
2919     env = {
2920       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2921       "REBOOT_TYPE": self.op.reboot_type,
2922       }
2923     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2924     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2925     return env, nl, nl
2926
2927   def CheckPrereq(self):
2928     """Check prerequisites.
2929
2930     This checks that the instance is in the cluster.
2931
2932     """
2933     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2934     assert self.instance is not None, \
2935       "Cannot retrieve locked instance %s" % self.op.instance_name
2936
2937     _CheckNodeOnline(self, instance.primary_node)
2938
2939     # check bridges existance
2940     _CheckInstanceBridgesExist(self, instance)
2941
2942   def Exec(self, feedback_fn):
2943     """Reboot the instance.
2944
2945     """
2946     instance = self.instance
2947     ignore_secondaries = self.op.ignore_secondaries
2948     reboot_type = self.op.reboot_type
2949
2950     node_current = instance.primary_node
2951
2952     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2953                        constants.INSTANCE_REBOOT_HARD]:
2954       for disk in instance.disks:
2955         self.cfg.SetDiskID(disk, node_current)
2956       result = self.rpc.call_instance_reboot(node_current, instance,
2957                                              reboot_type)
2958       msg = result.RemoteFailMsg()
2959       if msg:
2960         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2961     else:
2962       result = self.rpc.call_instance_shutdown(node_current, instance)
2963       msg = result.RemoteFailMsg()
2964       if msg:
2965         raise errors.OpExecError("Could not shutdown instance for"
2966                                  " full reboot: %s" % msg)
2967       _ShutdownInstanceDisks(self, instance)
2968       _StartInstanceDisks(self, instance, ignore_secondaries)
2969       result = self.rpc.call_instance_start(node_current, instance, None, None)
2970       msg = result.RemoteFailMsg()
2971       if msg:
2972         _ShutdownInstanceDisks(self, instance)
2973         raise errors.OpExecError("Could not start instance for"
2974                                  " full reboot: %s" % msg)
2975
2976     self.cfg.MarkInstanceUp(instance.name)
2977
2978
2979 class LUShutdownInstance(LogicalUnit):
2980   """Shutdown an instance.
2981
2982   """
2983   HPATH = "instance-stop"
2984   HTYPE = constants.HTYPE_INSTANCE
2985   _OP_REQP = ["instance_name"]
2986   REQ_BGL = False
2987
2988   def ExpandNames(self):
2989     self._ExpandAndLockInstance()
2990
2991   def BuildHooksEnv(self):
2992     """Build hooks env.
2993
2994     This runs on master, primary and secondary nodes of the instance.
2995
2996     """
2997     env = _BuildInstanceHookEnvByObject(self, self.instance)
2998     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2999     return env, nl, nl
3000
3001   def CheckPrereq(self):
3002     """Check prerequisites.
3003
3004     This checks that the instance is in the cluster.
3005
3006     """
3007     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3008     assert self.instance is not None, \
3009       "Cannot retrieve locked instance %s" % self.op.instance_name
3010     _CheckNodeOnline(self, self.instance.primary_node)
3011
3012   def Exec(self, feedback_fn):
3013     """Shutdown the instance.
3014
3015     """
3016     instance = self.instance
3017     node_current = instance.primary_node
3018     self.cfg.MarkInstanceDown(instance.name)
3019     result = self.rpc.call_instance_shutdown(node_current, instance)
3020     msg = result.RemoteFailMsg()
3021     if msg:
3022       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3023
3024     _ShutdownInstanceDisks(self, instance)
3025
3026
3027 class LUReinstallInstance(LogicalUnit):
3028   """Reinstall an instance.
3029
3030   """
3031   HPATH = "instance-reinstall"
3032   HTYPE = constants.HTYPE_INSTANCE
3033   _OP_REQP = ["instance_name"]
3034   REQ_BGL = False
3035
3036   def ExpandNames(self):
3037     self._ExpandAndLockInstance()
3038
3039   def BuildHooksEnv(self):
3040     """Build hooks env.
3041
3042     This runs on master, primary and secondary nodes of the instance.
3043
3044     """
3045     env = _BuildInstanceHookEnvByObject(self, self.instance)
3046     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3047     return env, nl, nl
3048
3049   def CheckPrereq(self):
3050     """Check prerequisites.
3051
3052     This checks that the instance is in the cluster and is not running.
3053
3054     """
3055     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3056     assert instance is not None, \
3057       "Cannot retrieve locked instance %s" % self.op.instance_name
3058     _CheckNodeOnline(self, instance.primary_node)
3059
3060     if instance.disk_template == constants.DT_DISKLESS:
3061       raise errors.OpPrereqError("Instance '%s' has no disks" %
3062                                  self.op.instance_name)
3063     if instance.admin_up:
3064       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3065                                  self.op.instance_name)
3066     remote_info = self.rpc.call_instance_info(instance.primary_node,
3067                                               instance.name,
3068                                               instance.hypervisor)
3069     remote_info.Raise()
3070     if remote_info.data:
3071       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3072                                  (self.op.instance_name,
3073                                   instance.primary_node))
3074
3075     self.op.os_type = getattr(self.op, "os_type", None)
3076     if self.op.os_type is not None:
3077       # OS verification
3078       pnode = self.cfg.GetNodeInfo(
3079         self.cfg.ExpandNodeName(instance.primary_node))
3080       if pnode is None:
3081         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3082                                    self.op.pnode)
3083       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3084       result.Raise()
3085       if not isinstance(result.data, objects.OS):
3086         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3087                                    " primary node"  % self.op.os_type)
3088
3089     self.instance = instance
3090
3091   def Exec(self, feedback_fn):
3092     """Reinstall the instance.
3093
3094     """
3095     inst = self.instance
3096
3097     if self.op.os_type is not None:
3098       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3099       inst.os = self.op.os_type
3100       self.cfg.Update(inst)
3101
3102     _StartInstanceDisks(self, inst, None)
3103     try:
3104       feedback_fn("Running the instance OS create scripts...")
3105       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3106       msg = result.RemoteFailMsg()
3107       if msg:
3108         raise errors.OpExecError("Could not install OS for instance %s"
3109                                  " on node %s: %s" %
3110                                  (inst.name, inst.primary_node, msg))
3111     finally:
3112       _ShutdownInstanceDisks(self, inst)
3113
3114
3115 class LURenameInstance(LogicalUnit):
3116   """Rename an instance.
3117
3118   """
3119   HPATH = "instance-rename"
3120   HTYPE = constants.HTYPE_INSTANCE
3121   _OP_REQP = ["instance_name", "new_name"]
3122
3123   def BuildHooksEnv(self):
3124     """Build hooks env.
3125
3126     This runs on master, primary and secondary nodes of the instance.
3127
3128     """
3129     env = _BuildInstanceHookEnvByObject(self, self.instance)
3130     env["INSTANCE_NEW_NAME"] = self.op.new_name
3131     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3132     return env, nl, nl
3133
3134   def CheckPrereq(self):
3135     """Check prerequisites.
3136
3137     This checks that the instance is in the cluster and is not running.
3138
3139     """
3140     instance = self.cfg.GetInstanceInfo(
3141       self.cfg.ExpandInstanceName(self.op.instance_name))
3142     if instance is None:
3143       raise errors.OpPrereqError("Instance '%s' not known" %
3144                                  self.op.instance_name)
3145     _CheckNodeOnline(self, instance.primary_node)
3146
3147     if instance.admin_up:
3148       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3149                                  self.op.instance_name)
3150     remote_info = self.rpc.call_instance_info(instance.primary_node,
3151                                               instance.name,
3152                                               instance.hypervisor)
3153     remote_info.Raise()
3154     if remote_info.data:
3155       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3156                                  (self.op.instance_name,
3157                                   instance.primary_node))
3158     self.instance = instance
3159
3160     # new name verification
3161     name_info = utils.HostInfo(self.op.new_name)
3162
3163     self.op.new_name = new_name = name_info.name
3164     instance_list = self.cfg.GetInstanceList()
3165     if new_name in instance_list:
3166       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3167                                  new_name)
3168
3169     if not getattr(self.op, "ignore_ip", False):
3170       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3171         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3172                                    (name_info.ip, new_name))
3173
3174
3175   def Exec(self, feedback_fn):
3176     """Reinstall the instance.
3177
3178     """
3179     inst = self.instance
3180     old_name = inst.name
3181
3182     if inst.disk_template == constants.DT_FILE:
3183       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3184
3185     self.cfg.RenameInstance(inst.name, self.op.new_name)
3186     # Change the instance lock. This is definitely safe while we hold the BGL
3187     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3188     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3189
3190     # re-read the instance from the configuration after rename
3191     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3192
3193     if inst.disk_template == constants.DT_FILE:
3194       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3195       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3196                                                      old_file_storage_dir,
3197                                                      new_file_storage_dir)
3198       result.Raise()
3199       if not result.data:
3200         raise errors.OpExecError("Could not connect to node '%s' to rename"
3201                                  " directory '%s' to '%s' (but the instance"
3202                                  " has been renamed in Ganeti)" % (
3203                                  inst.primary_node, old_file_storage_dir,
3204                                  new_file_storage_dir))
3205
3206       if not result.data[0]:
3207         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3208                                  " (but the instance has been renamed in"
3209                                  " Ganeti)" % (old_file_storage_dir,
3210                                                new_file_storage_dir))
3211
3212     _StartInstanceDisks(self, inst, None)
3213     try:
3214       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3215                                                  old_name)
3216       msg = result.RemoteFailMsg()
3217       if msg:
3218         msg = ("Could not run OS rename script for instance %s on node %s"
3219                " (but the instance has been renamed in Ganeti): %s" %
3220                (inst.name, inst.primary_node, msg))
3221         self.proc.LogWarning(msg)
3222     finally:
3223       _ShutdownInstanceDisks(self, inst)
3224
3225
3226 class LURemoveInstance(LogicalUnit):
3227   """Remove an instance.
3228
3229   """
3230   HPATH = "instance-remove"
3231   HTYPE = constants.HTYPE_INSTANCE
3232   _OP_REQP = ["instance_name", "ignore_failures"]
3233   REQ_BGL = False
3234
3235   def ExpandNames(self):
3236     self._ExpandAndLockInstance()
3237     self.needed_locks[locking.LEVEL_NODE] = []
3238     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3239
3240   def DeclareLocks(self, level):
3241     if level == locking.LEVEL_NODE:
3242       self._LockInstancesNodes()
3243
3244   def BuildHooksEnv(self):
3245     """Build hooks env.
3246
3247     This runs on master, primary and secondary nodes of the instance.
3248
3249     """
3250     env = _BuildInstanceHookEnvByObject(self, self.instance)
3251     nl = [self.cfg.GetMasterNode()]
3252     return env, nl, nl
3253
3254   def CheckPrereq(self):
3255     """Check prerequisites.
3256
3257     This checks that the instance is in the cluster.
3258
3259     """
3260     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3261     assert self.instance is not None, \
3262       "Cannot retrieve locked instance %s" % self.op.instance_name
3263
3264   def Exec(self, feedback_fn):
3265     """Remove the instance.
3266
3267     """
3268     instance = self.instance
3269     logging.info("Shutting down instance %s on node %s",
3270                  instance.name, instance.primary_node)
3271
3272     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3273     msg = result.RemoteFailMsg()
3274     if msg:
3275       if self.op.ignore_failures:
3276         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3277       else:
3278         raise errors.OpExecError("Could not shutdown instance %s on"
3279                                  " node %s: %s" %
3280                                  (instance.name, instance.primary_node, msg))
3281
3282     logging.info("Removing block devices for instance %s", instance.name)
3283
3284     if not _RemoveDisks(self, instance):
3285       if self.op.ignore_failures:
3286         feedback_fn("Warning: can't remove instance's disks")
3287       else:
3288         raise errors.OpExecError("Can't remove instance's disks")
3289
3290     logging.info("Removing instance %s out of cluster config", instance.name)
3291
3292     self.cfg.RemoveInstance(instance.name)
3293     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3294
3295
3296 class LUQueryInstances(NoHooksLU):
3297   """Logical unit for querying instances.
3298
3299   """
3300   _OP_REQP = ["output_fields", "names", "use_locking"]
3301   REQ_BGL = False
3302   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3303                                     "admin_state",
3304                                     "disk_template", "ip", "mac", "bridge",
3305                                     "sda_size", "sdb_size", "vcpus", "tags",
3306                                     "network_port", "beparams",
3307                                     r"(disk)\.(size)/([0-9]+)",
3308                                     r"(disk)\.(sizes)", "disk_usage",
3309                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3310                                     r"(nic)\.(macs|ips|bridges)",
3311                                     r"(disk|nic)\.(count)",
3312                                     "serial_no", "hypervisor", "hvparams",] +
3313                                   ["hv/%s" % name
3314                                    for name in constants.HVS_PARAMETERS] +
3315                                   ["be/%s" % name
3316                                    for name in constants.BES_PARAMETERS])
3317   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3318
3319
3320   def ExpandNames(self):
3321     _CheckOutputFields(static=self._FIELDS_STATIC,
3322                        dynamic=self._FIELDS_DYNAMIC,
3323                        selected=self.op.output_fields)
3324
3325     self.needed_locks = {}
3326     self.share_locks[locking.LEVEL_INSTANCE] = 1
3327     self.share_locks[locking.LEVEL_NODE] = 1
3328
3329     if self.op.names:
3330       self.wanted = _GetWantedInstances(self, self.op.names)
3331     else:
3332       self.wanted = locking.ALL_SET
3333
3334     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3335     self.do_locking = self.do_node_query and self.op.use_locking
3336     if self.do_locking:
3337       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3338       self.needed_locks[locking.LEVEL_NODE] = []
3339       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3340
3341   def DeclareLocks(self, level):
3342     if level == locking.LEVEL_NODE and self.do_locking:
3343       self._LockInstancesNodes()
3344
3345   def CheckPrereq(self):
3346     """Check prerequisites.
3347
3348     """
3349     pass
3350
3351   def Exec(self, feedback_fn):
3352     """Computes the list of nodes and their attributes.
3353
3354     """
3355     all_info = self.cfg.GetAllInstancesInfo()
3356     if self.wanted == locking.ALL_SET:
3357       # caller didn't specify instance names, so ordering is not important
3358       if self.do_locking:
3359         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3360       else:
3361         instance_names = all_info.keys()
3362       instance_names = utils.NiceSort(instance_names)
3363     else:
3364       # caller did specify names, so we must keep the ordering
3365       if self.do_locking:
3366         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3367       else:
3368         tgt_set = all_info.keys()
3369       missing = set(self.wanted).difference(tgt_set)
3370       if missing:
3371         raise errors.OpExecError("Some instances were removed before"
3372                                  " retrieving their data: %s" % missing)
3373       instance_names = self.wanted
3374
3375     instance_list = [all_info[iname] for iname in instance_names]
3376
3377     # begin data gathering
3378
3379     nodes = frozenset([inst.primary_node for inst in instance_list])
3380     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3381
3382     bad_nodes = []
3383     off_nodes = []
3384     if self.do_node_query:
3385       live_data = {}
3386       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3387       for name in nodes:
3388         result = node_data[name]
3389         if result.offline:
3390           # offline nodes will be in both lists
3391           off_nodes.append(name)
3392         if result.failed:
3393           bad_nodes.append(name)
3394         else:
3395           if result.data:
3396             live_data.update(result.data)
3397             # else no instance is alive
3398     else:
3399       live_data = dict([(name, {}) for name in instance_names])
3400
3401     # end data gathering
3402
3403     HVPREFIX = "hv/"
3404     BEPREFIX = "be/"
3405     output = []
3406     for instance in instance_list:
3407       iout = []
3408       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3409       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3410       for field in self.op.output_fields:
3411         st_match = self._FIELDS_STATIC.Matches(field)
3412         if field == "name":
3413           val = instance.name
3414         elif field == "os":
3415           val = instance.os
3416         elif field == "pnode":
3417           val = instance.primary_node
3418         elif field == "snodes":
3419           val = list(instance.secondary_nodes)
3420         elif field == "admin_state":
3421           val = instance.admin_up
3422         elif field == "oper_state":
3423           if instance.primary_node in bad_nodes:
3424             val = None
3425           else:
3426             val = bool(live_data.get(instance.name))
3427         elif field == "status":
3428           if instance.primary_node in off_nodes:
3429             val = "ERROR_nodeoffline"
3430           elif instance.primary_node in bad_nodes:
3431             val = "ERROR_nodedown"
3432           else:
3433             running = bool(live_data.get(instance.name))
3434             if running:
3435               if instance.admin_up:
3436                 val = "running"
3437               else:
3438                 val = "ERROR_up"
3439             else:
3440               if instance.admin_up:
3441                 val = "ERROR_down"
3442               else:
3443                 val = "ADMIN_down"
3444         elif field == "oper_ram":
3445           if instance.primary_node in bad_nodes:
3446             val = None
3447           elif instance.name in live_data:
3448             val = live_data[instance.name].get("memory", "?")
3449           else:
3450             val = "-"
3451         elif field == "disk_template":
3452           val = instance.disk_template
3453         elif field == "ip":
3454           val = instance.nics[0].ip
3455         elif field == "bridge":
3456           val = instance.nics[0].bridge
3457         elif field == "mac":
3458           val = instance.nics[0].mac
3459         elif field == "sda_size" or field == "sdb_size":
3460           idx = ord(field[2]) - ord('a')
3461           try:
3462             val = instance.FindDisk(idx).size
3463           except errors.OpPrereqError:
3464             val = None
3465         elif field == "disk_usage": # total disk usage per node
3466           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3467           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3468         elif field == "tags":
3469           val = list(instance.GetTags())
3470         elif field == "serial_no":
3471           val = instance.serial_no
3472         elif field == "network_port":
3473           val = instance.network_port
3474         elif field == "hypervisor":
3475           val = instance.hypervisor
3476         elif field == "hvparams":
3477           val = i_hv
3478         elif (field.startswith(HVPREFIX) and
3479               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3480           val = i_hv.get(field[len(HVPREFIX):], None)
3481         elif field == "beparams":
3482           val = i_be
3483         elif (field.startswith(BEPREFIX) and
3484               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3485           val = i_be.get(field[len(BEPREFIX):], None)
3486         elif st_match and st_match.groups():
3487           # matches a variable list
3488           st_groups = st_match.groups()
3489           if st_groups and st_groups[0] == "disk":
3490             if st_groups[1] == "count":
3491               val = len(instance.disks)
3492             elif st_groups[1] == "sizes":
3493               val = [disk.size for disk in instance.disks]
3494             elif st_groups[1] == "size":
3495               try:
3496                 val = instance.FindDisk(st_groups[2]).size
3497               except errors.OpPrereqError:
3498                 val = None
3499             else:
3500               assert False, "Unhandled disk parameter"
3501           elif st_groups[0] == "nic":
3502             if st_groups[1] == "count":
3503               val = len(instance.nics)
3504             elif st_groups[1] == "macs":
3505               val = [nic.mac for nic in instance.nics]
3506             elif st_groups[1] == "ips":
3507               val = [nic.ip for nic in instance.nics]
3508             elif st_groups[1] == "bridges":
3509               val = [nic.bridge for nic in instance.nics]
3510             else:
3511               # index-based item
3512               nic_idx = int(st_groups[2])
3513               if nic_idx >= len(instance.nics):
3514                 val = None
3515               else:
3516                 if st_groups[1] == "mac":
3517                   val = instance.nics[nic_idx].mac
3518                 elif st_groups[1] == "ip":
3519                   val = instance.nics[nic_idx].ip
3520                 elif st_groups[1] == "bridge":
3521                   val = instance.nics[nic_idx].bridge
3522                 else:
3523                   assert False, "Unhandled NIC parameter"
3524           else:
3525             assert False, "Unhandled variable parameter"
3526         else:
3527           raise errors.ParameterError(field)
3528         iout.append(val)
3529       output.append(iout)
3530
3531     return output
3532
3533
3534 class LUFailoverInstance(LogicalUnit):
3535   """Failover an instance.
3536
3537   """
3538   HPATH = "instance-failover"
3539   HTYPE = constants.HTYPE_INSTANCE
3540   _OP_REQP = ["instance_name", "ignore_consistency"]
3541   REQ_BGL = False
3542
3543   def ExpandNames(self):
3544     self._ExpandAndLockInstance()
3545     self.needed_locks[locking.LEVEL_NODE] = []
3546     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3547
3548   def DeclareLocks(self, level):
3549     if level == locking.LEVEL_NODE:
3550       self._LockInstancesNodes()
3551
3552   def BuildHooksEnv(self):
3553     """Build hooks env.
3554
3555     This runs on master, primary and secondary nodes of the instance.
3556
3557     """
3558     env = {
3559       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3560       }
3561     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3562     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3563     return env, nl, nl
3564
3565   def CheckPrereq(self):
3566     """Check prerequisites.
3567
3568     This checks that the instance is in the cluster.
3569
3570     """
3571     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3572     assert self.instance is not None, \
3573       "Cannot retrieve locked instance %s" % self.op.instance_name
3574
3575     bep = self.cfg.GetClusterInfo().FillBE(instance)
3576     if instance.disk_template not in constants.DTS_NET_MIRROR:
3577       raise errors.OpPrereqError("Instance's disk layout is not"
3578                                  " network mirrored, cannot failover.")
3579
3580     secondary_nodes = instance.secondary_nodes
3581     if not secondary_nodes:
3582       raise errors.ProgrammerError("no secondary node but using "
3583                                    "a mirrored disk template")
3584
3585     target_node = secondary_nodes[0]
3586     _CheckNodeOnline(self, target_node)
3587     _CheckNodeNotDrained(self, target_node)
3588     # check memory requirements on the secondary node
3589     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3590                          instance.name, bep[constants.BE_MEMORY],
3591                          instance.hypervisor)
3592
3593     # check bridge existance
3594     brlist = [nic.bridge for nic in instance.nics]
3595     result = self.rpc.call_bridges_exist(target_node, brlist)
3596     result.Raise()
3597     if not result.data:
3598       raise errors.OpPrereqError("One or more target bridges %s does not"
3599                                  " exist on destination node '%s'" %
3600                                  (brlist, target_node))
3601
3602   def Exec(self, feedback_fn):
3603     """Failover an instance.
3604
3605     The failover is done by shutting it down on its present node and
3606     starting it on the secondary.
3607
3608     """
3609     instance = self.instance
3610
3611     source_node = instance.primary_node
3612     target_node = instance.secondary_nodes[0]
3613
3614     feedback_fn("* checking disk consistency between source and target")
3615     for dev in instance.disks:
3616       # for drbd, these are drbd over lvm
3617       if not _CheckDiskConsistency(self, dev, target_node, False):
3618         if instance.admin_up and not self.op.ignore_consistency:
3619           raise errors.OpExecError("Disk %s is degraded on target node,"
3620                                    " aborting failover." % dev.iv_name)
3621
3622     feedback_fn("* shutting down instance on source node")
3623     logging.info("Shutting down instance %s on node %s",
3624                  instance.name, source_node)
3625
3626     result = self.rpc.call_instance_shutdown(source_node, instance)
3627     msg = result.RemoteFailMsg()
3628     if msg:
3629       if self.op.ignore_consistency:
3630         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3631                              " Proceeding anyway. Please make sure node"
3632                              " %s is down. Error details: %s",
3633                              instance.name, source_node, source_node, msg)
3634       else:
3635         raise errors.OpExecError("Could not shutdown instance %s on"
3636                                  " node %s: %s" %
3637                                  (instance.name, source_node, msg))
3638
3639     feedback_fn("* deactivating the instance's disks on source node")
3640     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3641       raise errors.OpExecError("Can't shut down the instance's disks.")
3642
3643     instance.primary_node = target_node
3644     # distribute new instance config to the other nodes
3645     self.cfg.Update(instance)
3646
3647     # Only start the instance if it's marked as up
3648     if instance.admin_up:
3649       feedback_fn("* activating the instance's disks on target node")
3650       logging.info("Starting instance %s on node %s",
3651                    instance.name, target_node)
3652
3653       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3654                                                ignore_secondaries=True)
3655       if not disks_ok:
3656         _ShutdownInstanceDisks(self, instance)
3657         raise errors.OpExecError("Can't activate the instance's disks")
3658
3659       feedback_fn("* starting the instance on the target node")
3660       result = self.rpc.call_instance_start(target_node, instance, None, None)
3661       msg = result.RemoteFailMsg()
3662       if msg:
3663         _ShutdownInstanceDisks(self, instance)
3664         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3665                                  (instance.name, target_node, msg))
3666
3667
3668 class LUMigrateInstance(LogicalUnit):
3669   """Migrate an instance.
3670
3671   This is migration without shutting down, compared to the failover,
3672   which is done with shutdown.
3673
3674   """
3675   HPATH = "instance-migrate"
3676   HTYPE = constants.HTYPE_INSTANCE
3677   _OP_REQP = ["instance_name", "live", "cleanup"]
3678
3679   REQ_BGL = False
3680
3681   def ExpandNames(self):
3682     self._ExpandAndLockInstance()
3683     self.needed_locks[locking.LEVEL_NODE] = []
3684     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3685
3686   def DeclareLocks(self, level):
3687     if level == locking.LEVEL_NODE:
3688       self._LockInstancesNodes()
3689
3690   def BuildHooksEnv(self):
3691     """Build hooks env.
3692
3693     This runs on master, primary and secondary nodes of the instance.
3694
3695     """
3696     env = _BuildInstanceHookEnvByObject(self, self.instance)
3697     env["MIGRATE_LIVE"] = self.op.live
3698     env["MIGRATE_CLEANUP"] = self.op.cleanup
3699     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3700     return env, nl, nl
3701
3702   def CheckPrereq(self):
3703     """Check prerequisites.
3704
3705     This checks that the instance is in the cluster.
3706
3707     """
3708     instance = self.cfg.GetInstanceInfo(
3709       self.cfg.ExpandInstanceName(self.op.instance_name))
3710     if instance is None:
3711       raise errors.OpPrereqError("Instance '%s' not known" %
3712                                  self.op.instance_name)
3713
3714     if instance.disk_template != constants.DT_DRBD8:
3715       raise errors.OpPrereqError("Instance's disk layout is not"
3716                                  " drbd8, cannot migrate.")
3717
3718     secondary_nodes = instance.secondary_nodes
3719     if not secondary_nodes:
3720       raise errors.ConfigurationError("No secondary node but using"
3721                                       " drbd8 disk template")
3722
3723     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3724
3725     target_node = secondary_nodes[0]
3726     # check memory requirements on the secondary node
3727     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3728                          instance.name, i_be[constants.BE_MEMORY],
3729                          instance.hypervisor)
3730
3731     # check bridge existance
3732     brlist = [nic.bridge for nic in instance.nics]
3733     result = self.rpc.call_bridges_exist(target_node, brlist)
3734     if result.failed or not result.data:
3735       raise errors.OpPrereqError("One or more target bridges %s does not"
3736                                  " exist on destination node '%s'" %
3737                                  (brlist, target_node))
3738
3739     if not self.op.cleanup:
3740       _CheckNodeNotDrained(self, target_node)
3741       result = self.rpc.call_instance_migratable(instance.primary_node,
3742                                                  instance)
3743       msg = result.RemoteFailMsg()
3744       if msg:
3745         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3746                                    msg)
3747
3748     self.instance = instance
3749
3750   def _WaitUntilSync(self):
3751     """Poll with custom rpc for disk sync.
3752
3753     This uses our own step-based rpc call.
3754
3755     """
3756     self.feedback_fn("* wait until resync is done")
3757     all_done = False
3758     while not all_done:
3759       all_done = True
3760       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3761                                             self.nodes_ip,
3762                                             self.instance.disks)
3763       min_percent = 100
3764       for node, nres in result.items():
3765         msg = nres.RemoteFailMsg()
3766         if msg:
3767           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3768                                    (node, msg))
3769         node_done, node_percent = nres.payload
3770         all_done = all_done and node_done
3771         if node_percent is not None:
3772           min_percent = min(min_percent, node_percent)
3773       if not all_done:
3774         if min_percent < 100:
3775           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3776         time.sleep(2)
3777
3778   def _EnsureSecondary(self, node):
3779     """Demote a node to secondary.
3780
3781     """
3782     self.feedback_fn("* switching node %s to secondary mode" % node)
3783
3784     for dev in self.instance.disks:
3785       self.cfg.SetDiskID(dev, node)
3786
3787     result = self.rpc.call_blockdev_close(node, self.instance.name,
3788                                           self.instance.disks)
3789     msg = result.RemoteFailMsg()
3790     if msg:
3791       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3792                                " error %s" % (node, msg))
3793
3794   def _GoStandalone(self):
3795     """Disconnect from the network.
3796
3797     """
3798     self.feedback_fn("* changing into standalone mode")
3799     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3800                                                self.instance.disks)
3801     for node, nres in result.items():
3802       msg = nres.RemoteFailMsg()
3803       if msg:
3804         raise errors.OpExecError("Cannot disconnect disks node %s,"
3805                                  " error %s" % (node, msg))
3806
3807   def _GoReconnect(self, multimaster):
3808     """Reconnect to the network.
3809
3810     """
3811     if multimaster:
3812       msg = "dual-master"
3813     else:
3814       msg = "single-master"
3815     self.feedback_fn("* changing disks into %s mode" % msg)
3816     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3817                                            self.instance.disks,
3818                                            self.instance.name, multimaster)
3819     for node, nres in result.items():
3820       msg = nres.RemoteFailMsg()
3821       if msg:
3822         raise errors.OpExecError("Cannot change disks config on node %s,"
3823                                  " error: %s" % (node, msg))
3824
3825   def _ExecCleanup(self):
3826     """Try to cleanup after a failed migration.
3827
3828     The cleanup is done by:
3829       - check that the instance is running only on one node
3830         (and update the config if needed)
3831       - change disks on its secondary node to secondary
3832       - wait until disks are fully synchronized
3833       - disconnect from the network
3834       - change disks into single-master mode
3835       - wait again until disks are fully synchronized
3836
3837     """
3838     instance = self.instance
3839     target_node = self.target_node
3840     source_node = self.source_node
3841
3842     # check running on only one node
3843     self.feedback_fn("* checking where the instance actually runs"
3844                      " (if this hangs, the hypervisor might be in"
3845                      " a bad state)")
3846     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3847     for node, result in ins_l.items():
3848       result.Raise()
3849       if not isinstance(result.data, list):
3850         raise errors.OpExecError("Can't contact node '%s'" % node)
3851
3852     runningon_source = instance.name in ins_l[source_node].data
3853     runningon_target = instance.name in ins_l[target_node].data
3854
3855     if runningon_source and runningon_target:
3856       raise errors.OpExecError("Instance seems to be running on two nodes,"
3857                                " or the hypervisor is confused. You will have"
3858                                " to ensure manually that it runs only on one"
3859                                " and restart this operation.")
3860
3861     if not (runningon_source or runningon_target):
3862       raise errors.OpExecError("Instance does not seem to be running at all."
3863                                " In this case, it's safer to repair by"
3864                                " running 'gnt-instance stop' to ensure disk"
3865                                " shutdown, and then restarting it.")
3866
3867     if runningon_target:
3868       # the migration has actually succeeded, we need to update the config
3869       self.feedback_fn("* instance running on secondary node (%s),"
3870                        " updating config" % target_node)
3871       instance.primary_node = target_node
3872       self.cfg.Update(instance)
3873       demoted_node = source_node
3874     else:
3875       self.feedback_fn("* instance confirmed to be running on its"
3876                        " primary node (%s)" % source_node)
3877       demoted_node = target_node
3878
3879     self._EnsureSecondary(demoted_node)
3880     try:
3881       self._WaitUntilSync()
3882     except errors.OpExecError:
3883       # we ignore here errors, since if the device is standalone, it
3884       # won't be able to sync
3885       pass
3886     self._GoStandalone()
3887     self._GoReconnect(False)
3888     self._WaitUntilSync()
3889
3890     self.feedback_fn("* done")
3891
3892   def _RevertDiskStatus(self):
3893     """Try to revert the disk status after a failed migration.
3894
3895     """
3896     target_node = self.target_node
3897     try:
3898       self._EnsureSecondary(target_node)
3899       self._GoStandalone()
3900       self._GoReconnect(False)
3901       self._WaitUntilSync()
3902     except errors.OpExecError, err:
3903       self.LogWarning("Migration failed and I can't reconnect the"
3904                       " drives: error '%s'\n"
3905                       "Please look and recover the instance status" %
3906                       str(err))
3907
3908   def _AbortMigration(self):
3909     """Call the hypervisor code to abort a started migration.
3910
3911     """
3912     instance = self.instance
3913     target_node = self.target_node
3914     migration_info = self.migration_info
3915
3916     abort_result = self.rpc.call_finalize_migration(target_node,
3917                                                     instance,
3918                                                     migration_info,
3919                                                     False)
3920     abort_msg = abort_result.RemoteFailMsg()
3921     if abort_msg:
3922       logging.error("Aborting migration failed on target node %s: %s" %
3923                     (target_node, abort_msg))
3924       # Don't raise an exception here, as we stil have to try to revert the
3925       # disk status, even if this step failed.
3926
3927   def _ExecMigration(self):
3928     """Migrate an instance.
3929
3930     The migrate is done by:
3931       - change the disks into dual-master mode
3932       - wait until disks are fully synchronized again
3933       - migrate the instance
3934       - change disks on the new secondary node (the old primary) to secondary
3935       - wait until disks are fully synchronized
3936       - change disks into single-master mode
3937
3938     """
3939     instance = self.instance
3940     target_node = self.target_node
3941     source_node = self.source_node
3942
3943     self.feedback_fn("* checking disk consistency between source and target")
3944     for dev in instance.disks:
3945       if not _CheckDiskConsistency(self, dev, target_node, False):
3946         raise errors.OpExecError("Disk %s is degraded or not fully"
3947                                  " synchronized on target node,"
3948                                  " aborting migrate." % dev.iv_name)
3949
3950     # First get the migration information from the remote node
3951     result = self.rpc.call_migration_info(source_node, instance)
3952     msg = result.RemoteFailMsg()
3953     if msg:
3954       log_err = ("Failed fetching source migration information from %s: %s" %
3955                  (source_node, msg))
3956       logging.error(log_err)
3957       raise errors.OpExecError(log_err)
3958
3959     self.migration_info = migration_info = result.payload
3960
3961     # Then switch the disks to master/master mode
3962     self._EnsureSecondary(target_node)
3963     self._GoStandalone()
3964     self._GoReconnect(True)
3965     self._WaitUntilSync()
3966
3967     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3968     result = self.rpc.call_accept_instance(target_node,
3969                                            instance,
3970                                            migration_info,
3971                                            self.nodes_ip[target_node])
3972
3973     msg = result.RemoteFailMsg()
3974     if msg:
3975       logging.error("Instance pre-migration failed, trying to revert"
3976                     " disk status: %s", msg)
3977       self._AbortMigration()
3978       self._RevertDiskStatus()
3979       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3980                                (instance.name, msg))
3981
3982     self.feedback_fn("* migrating instance to %s" % target_node)
3983     time.sleep(10)
3984     result = self.rpc.call_instance_migrate(source_node, instance,
3985                                             self.nodes_ip[target_node],
3986                                             self.op.live)
3987     msg = result.RemoteFailMsg()
3988     if msg:
3989       logging.error("Instance migration failed, trying to revert"
3990                     " disk status: %s", msg)
3991       self._AbortMigration()
3992       self._RevertDiskStatus()
3993       raise errors.OpExecError("Could not migrate instance %s: %s" %
3994                                (instance.name, msg))
3995     time.sleep(10)
3996
3997     instance.primary_node = target_node
3998     # distribute new instance config to the other nodes
3999     self.cfg.Update(instance)
4000
4001     result = self.rpc.call_finalize_migration(target_node,
4002                                               instance,
4003                                               migration_info,
4004                                               True)
4005     msg = result.RemoteFailMsg()
4006     if msg:
4007       logging.error("Instance migration succeeded, but finalization failed:"
4008                     " %s" % msg)
4009       raise errors.OpExecError("Could not finalize instance migration: %s" %
4010                                msg)
4011
4012     self._EnsureSecondary(source_node)
4013     self._WaitUntilSync()
4014     self._GoStandalone()
4015     self._GoReconnect(False)
4016     self._WaitUntilSync()
4017
4018     self.feedback_fn("* done")
4019
4020   def Exec(self, feedback_fn):
4021     """Perform the migration.
4022
4023     """
4024     self.feedback_fn = feedback_fn
4025
4026     self.source_node = self.instance.primary_node
4027     self.target_node = self.instance.secondary_nodes[0]
4028     self.all_nodes = [self.source_node, self.target_node]
4029     self.nodes_ip = {
4030       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4031       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4032       }
4033     if self.op.cleanup:
4034       return self._ExecCleanup()
4035     else:
4036       return self._ExecMigration()
4037
4038
4039 def _CreateBlockDev(lu, node, instance, device, force_create,
4040                     info, force_open):
4041   """Create a tree of block devices on a given node.
4042
4043   If this device type has to be created on secondaries, create it and
4044   all its children.
4045
4046   If not, just recurse to children keeping the same 'force' value.
4047
4048   @param lu: the lu on whose behalf we execute
4049   @param node: the node on which to create the device
4050   @type instance: L{objects.Instance}
4051   @param instance: the instance which owns the device
4052   @type device: L{objects.Disk}
4053   @param device: the device to create
4054   @type force_create: boolean
4055   @param force_create: whether to force creation of this device; this
4056       will be change to True whenever we find a device which has
4057       CreateOnSecondary() attribute
4058   @param info: the extra 'metadata' we should attach to the device
4059       (this will be represented as a LVM tag)
4060   @type force_open: boolean
4061   @param force_open: this parameter will be passes to the
4062       L{backend.BlockdevCreate} function where it specifies
4063       whether we run on primary or not, and it affects both
4064       the child assembly and the device own Open() execution
4065
4066   """
4067   if device.CreateOnSecondary():
4068     force_create = True
4069
4070   if device.children:
4071     for child in device.children:
4072       _CreateBlockDev(lu, node, instance, child, force_create,
4073                       info, force_open)
4074
4075   if not force_create:
4076     return
4077
4078   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4079
4080
4081 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4082   """Create a single block device on a given node.
4083
4084   This will not recurse over children of the device, so they must be
4085   created in advance.
4086
4087   @param lu: the lu on whose behalf we execute
4088   @param node: the node on which to create the device
4089   @type instance: L{objects.Instance}
4090   @param instance: the instance which owns the device
4091   @type device: L{objects.Disk}
4092   @param device: the device to create
4093   @param info: the extra 'metadata' we should attach to the device
4094       (this will be represented as a LVM tag)
4095   @type force_open: boolean
4096   @param force_open: this parameter will be passes to the
4097       L{backend.BlockdevCreate} function where it specifies
4098       whether we run on primary or not, and it affects both
4099       the child assembly and the device own Open() execution
4100
4101   """
4102   lu.cfg.SetDiskID(device, node)
4103   result = lu.rpc.call_blockdev_create(node, device, device.size,
4104                                        instance.name, force_open, info)
4105   msg = result.RemoteFailMsg()
4106   if msg:
4107     raise errors.OpExecError("Can't create block device %s on"
4108                              " node %s for instance %s: %s" %
4109                              (device, node, instance.name, msg))
4110   if device.physical_id is None:
4111     device.physical_id = result.payload
4112
4113
4114 def _GenerateUniqueNames(lu, exts):
4115   """Generate a suitable LV name.
4116
4117   This will generate a logical volume name for the given instance.
4118
4119   """
4120   results = []
4121   for val in exts:
4122     new_id = lu.cfg.GenerateUniqueID()
4123     results.append("%s%s" % (new_id, val))
4124   return results
4125
4126
4127 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4128                          p_minor, s_minor):
4129   """Generate a drbd8 device complete with its children.
4130
4131   """
4132   port = lu.cfg.AllocatePort()
4133   vgname = lu.cfg.GetVGName()
4134   shared_secret = lu.cfg.GenerateDRBDSecret()
4135   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4136                           logical_id=(vgname, names[0]))
4137   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4138                           logical_id=(vgname, names[1]))
4139   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4140                           logical_id=(primary, secondary, port,
4141                                       p_minor, s_minor,
4142                                       shared_secret),
4143                           children=[dev_data, dev_meta],
4144                           iv_name=iv_name)
4145   return drbd_dev
4146
4147
4148 def _GenerateDiskTemplate(lu, template_name,
4149                           instance_name, primary_node,
4150                           secondary_nodes, disk_info,
4151                           file_storage_dir, file_driver,
4152                           base_index):
4153   """Generate the entire disk layout for a given template type.
4154
4155   """
4156   #TODO: compute space requirements
4157
4158   vgname = lu.cfg.GetVGName()
4159   disk_count = len(disk_info)
4160   disks = []
4161   if template_name == constants.DT_DISKLESS:
4162     pass
4163   elif template_name == constants.DT_PLAIN:
4164     if len(secondary_nodes) != 0:
4165       raise errors.ProgrammerError("Wrong template configuration")
4166
4167     names = _GenerateUniqueNames(lu, [".disk%d" % i
4168                                       for i in range(disk_count)])
4169     for idx, disk in enumerate(disk_info):
4170       disk_index = idx + base_index
4171       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4172                               logical_id=(vgname, names[idx]),
4173                               iv_name="disk/%d" % disk_index,
4174                               mode=disk["mode"])
4175       disks.append(disk_dev)
4176   elif template_name == constants.DT_DRBD8:
4177     if len(secondary_nodes) != 1:
4178       raise errors.ProgrammerError("Wrong template configuration")
4179     remote_node = secondary_nodes[0]
4180     minors = lu.cfg.AllocateDRBDMinor(
4181       [primary_node, remote_node] * len(disk_info), instance_name)
4182
4183     names = []
4184     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4185                                                for i in range(disk_count)]):
4186       names.append(lv_prefix + "_data")
4187       names.append(lv_prefix + "_meta")
4188     for idx, disk in enumerate(disk_info):
4189       disk_index = idx + base_index
4190       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4191                                       disk["size"], names[idx*2:idx*2+2],
4192                                       "disk/%d" % disk_index,
4193                                       minors[idx*2], minors[idx*2+1])
4194       disk_dev.mode = disk["mode"]
4195       disks.append(disk_dev)
4196   elif template_name == constants.DT_FILE:
4197     if len(secondary_nodes) != 0:
4198       raise errors.ProgrammerError("Wrong template configuration")
4199
4200     for idx, disk in enumerate(disk_info):
4201       disk_index = idx + base_index
4202       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4203                               iv_name="disk/%d" % disk_index,
4204                               logical_id=(file_driver,
4205                                           "%s/disk%d" % (file_storage_dir,
4206                                                          disk_index)),
4207                               mode=disk["mode"])
4208       disks.append(disk_dev)
4209   else:
4210     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4211   return disks
4212
4213
4214 def _GetInstanceInfoText(instance):
4215   """Compute that text that should be added to the disk's metadata.
4216
4217   """
4218   return "originstname+%s" % instance.name
4219
4220
4221 def _CreateDisks(lu, instance):
4222   """Create all disks for an instance.
4223
4224   This abstracts away some work from AddInstance.
4225
4226   @type lu: L{LogicalUnit}
4227   @param lu: the logical unit on whose behalf we execute
4228   @type instance: L{objects.Instance}
4229   @param instance: the instance whose disks we should create
4230   @rtype: boolean
4231   @return: the success of the creation
4232
4233   """
4234   info = _GetInstanceInfoText(instance)
4235   pnode = instance.primary_node
4236
4237   if instance.disk_template == constants.DT_FILE:
4238     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4239     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4240
4241     if result.failed or not result.data:
4242       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4243
4244     if not result.data[0]:
4245       raise errors.OpExecError("Failed to create directory '%s'" %
4246                                file_storage_dir)
4247
4248   # Note: this needs to be kept in sync with adding of disks in
4249   # LUSetInstanceParams
4250   for device in instance.disks:
4251     logging.info("Creating volume %s for instance %s",
4252                  device.iv_name, instance.name)
4253     #HARDCODE
4254     for node in instance.all_nodes:
4255       f_create = node == pnode
4256       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4257
4258
4259 def _RemoveDisks(lu, instance):
4260   """Remove all disks for an instance.
4261
4262   This abstracts away some work from `AddInstance()` and
4263   `RemoveInstance()`. Note that in case some of the devices couldn't
4264   be removed, the removal will continue with the other ones (compare
4265   with `_CreateDisks()`).
4266
4267   @type lu: L{LogicalUnit}
4268   @param lu: the logical unit on whose behalf we execute
4269   @type instance: L{objects.Instance}
4270   @param instance: the instance whose disks we should remove
4271   @rtype: boolean
4272   @return: the success of the removal
4273
4274   """
4275   logging.info("Removing block devices for instance %s", instance.name)
4276
4277   all_result = True
4278   for device in instance.disks:
4279     for node, disk in device.ComputeNodeTree(instance.primary_node):
4280       lu.cfg.SetDiskID(disk, node)
4281       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4282       if msg:
4283         lu.LogWarning("Could not remove block device %s on node %s,"
4284                       " continuing anyway: %s", device.iv_name, node, msg)
4285         all_result = False
4286
4287   if instance.disk_template == constants.DT_FILE:
4288     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4289     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4290                                                  file_storage_dir)
4291     if result.failed or not result.data:
4292       logging.error("Could not remove directory '%s'", file_storage_dir)
4293       all_result = False
4294
4295   return all_result
4296
4297
4298 def _ComputeDiskSize(disk_template, disks):
4299   """Compute disk size requirements in the volume group
4300
4301   """
4302   # Required free disk space as a function of disk and swap space
4303   req_size_dict = {
4304     constants.DT_DISKLESS: None,
4305     constants.DT_PLAIN: sum(d["size"] for d in disks),
4306     # 128 MB are added for drbd metadata for each disk
4307     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4308     constants.DT_FILE: None,
4309   }
4310
4311   if disk_template not in req_size_dict:
4312     raise errors.ProgrammerError("Disk template '%s' size requirement"
4313                                  " is unknown" %  disk_template)
4314
4315   return req_size_dict[disk_template]
4316
4317
4318 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4319   """Hypervisor parameter validation.
4320
4321   This function abstract the hypervisor parameter validation to be
4322   used in both instance create and instance modify.
4323
4324   @type lu: L{LogicalUnit}
4325   @param lu: the logical unit for which we check
4326   @type nodenames: list
4327   @param nodenames: the list of nodes on which we should check
4328   @type hvname: string
4329   @param hvname: the name of the hypervisor we should use
4330   @type hvparams: dict
4331   @param hvparams: the parameters which we need to check
4332   @raise errors.OpPrereqError: if the parameters are not valid
4333
4334   """
4335   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4336                                                   hvname,
4337                                                   hvparams)
4338   for node in nodenames:
4339     info = hvinfo[node]
4340     if info.offline:
4341       continue
4342     msg = info.RemoteFailMsg()
4343     if msg:
4344       raise errors.OpPrereqError("Hypervisor parameter validation"
4345                                  " failed on node %s: %s" % (node, msg))
4346
4347
4348 class LUCreateInstance(LogicalUnit):
4349   """Create an instance.
4350
4351   """
4352   HPATH = "instance-add"
4353   HTYPE = constants.HTYPE_INSTANCE
4354   _OP_REQP = ["instance_name", "disks", "disk_template",
4355               "mode", "start",
4356               "wait_for_sync", "ip_check", "nics",
4357               "hvparams", "beparams"]
4358   REQ_BGL = False
4359
4360   def _ExpandNode(self, node):
4361     """Expands and checks one node name.
4362
4363     """
4364     node_full = self.cfg.ExpandNodeName(node)
4365     if node_full is None:
4366       raise errors.OpPrereqError("Unknown node %s" % node)
4367     return node_full
4368
4369   def ExpandNames(self):
4370     """ExpandNames for CreateInstance.
4371
4372     Figure out the right locks for instance creation.
4373
4374     """
4375     self.needed_locks = {}
4376
4377     # set optional parameters to none if they don't exist
4378     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4379       if not hasattr(self.op, attr):
4380         setattr(self.op, attr, None)
4381
4382     # cheap checks, mostly valid constants given
4383
4384     # verify creation mode
4385     if self.op.mode not in (constants.INSTANCE_CREATE,
4386                             constants.INSTANCE_IMPORT):
4387       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4388                                  self.op.mode)
4389
4390     # disk template and mirror node verification
4391     if self.op.disk_template not in constants.DISK_TEMPLATES:
4392       raise errors.OpPrereqError("Invalid disk template name")
4393
4394     if self.op.hypervisor is None:
4395       self.op.hypervisor = self.cfg.GetHypervisorType()
4396
4397     cluster = self.cfg.GetClusterInfo()
4398     enabled_hvs = cluster.enabled_hypervisors
4399     if self.op.hypervisor not in enabled_hvs:
4400       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4401                                  " cluster (%s)" % (self.op.hypervisor,
4402                                   ",".join(enabled_hvs)))
4403
4404     # check hypervisor parameter syntax (locally)
4405     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4406     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4407                                   self.op.hvparams)
4408     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4409     hv_type.CheckParameterSyntax(filled_hvp)
4410
4411     # fill and remember the beparams dict
4412     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4413     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4414                                     self.op.beparams)
4415
4416     #### instance parameters check
4417
4418     # instance name verification
4419     hostname1 = utils.HostInfo(self.op.instance_name)
4420     self.op.instance_name = instance_name = hostname1.name
4421
4422     # this is just a preventive check, but someone might still add this
4423     # instance in the meantime, and creation will fail at lock-add time
4424     if instance_name in self.cfg.GetInstanceList():
4425       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4426                                  instance_name)
4427
4428     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4429
4430     # NIC buildup
4431     self.nics = []
4432     for nic in self.op.nics:
4433       # ip validity checks
4434       ip = nic.get("ip", None)
4435       if ip is None or ip.lower() == "none":
4436         nic_ip = None
4437       elif ip.lower() == constants.VALUE_AUTO:
4438         nic_ip = hostname1.ip
4439       else:
4440         if not utils.IsValidIP(ip):
4441           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4442                                      " like a valid IP" % ip)
4443         nic_ip = ip
4444
4445       # MAC address verification
4446       mac = nic.get("mac", constants.VALUE_AUTO)
4447       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4448         if not utils.IsValidMac(mac.lower()):
4449           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4450                                      mac)
4451       # bridge verification
4452       bridge = nic.get("bridge", None)
4453       if bridge is None:
4454         bridge = self.cfg.GetDefBridge()
4455       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4456
4457     # disk checks/pre-build
4458     self.disks = []
4459     for disk in self.op.disks:
4460       mode = disk.get("mode", constants.DISK_RDWR)
4461       if mode not in constants.DISK_ACCESS_SET:
4462         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4463                                    mode)
4464       size = disk.get("size", None)
4465       if size is None:
4466         raise errors.OpPrereqError("Missing disk size")
4467       try:
4468         size = int(size)
4469       except ValueError:
4470         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4471       self.disks.append({"size": size, "mode": mode})
4472
4473     # used in CheckPrereq for ip ping check
4474     self.check_ip = hostname1.ip
4475
4476     # file storage checks
4477     if (self.op.file_driver and
4478         not self.op.file_driver in constants.FILE_DRIVER):
4479       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4480                                  self.op.file_driver)
4481
4482     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4483       raise errors.OpPrereqError("File storage directory path not absolute")
4484
4485     ### Node/iallocator related checks
4486     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4487       raise errors.OpPrereqError("One and only one of iallocator and primary"
4488                                  " node must be given")
4489
4490     if self.op.iallocator:
4491       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4492     else:
4493       self.op.pnode = self._ExpandNode(self.op.pnode)
4494       nodelist = [self.op.pnode]
4495       if self.op.snode is not None:
4496         self.op.snode = self._ExpandNode(self.op.snode)
4497         nodelist.append(self.op.snode)
4498       self.needed_locks[locking.LEVEL_NODE] = nodelist
4499
4500     # in case of import lock the source node too
4501     if self.op.mode == constants.INSTANCE_IMPORT:
4502       src_node = getattr(self.op, "src_node", None)
4503       src_path = getattr(self.op, "src_path", None)
4504
4505       if src_path is None:
4506         self.op.src_path = src_path = self.op.instance_name
4507
4508       if src_node is None:
4509         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4510         self.op.src_node = None
4511         if os.path.isabs(src_path):
4512           raise errors.OpPrereqError("Importing an instance from an absolute"
4513                                      " path requires a source node option.")
4514       else:
4515         self.op.src_node = src_node = self._ExpandNode(src_node)
4516         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4517           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4518         if not os.path.isabs(src_path):
4519           self.op.src_path = src_path = \
4520             os.path.join(constants.EXPORT_DIR, src_path)
4521
4522     else: # INSTANCE_CREATE
4523       if getattr(self.op, "os_type", None) is None:
4524         raise errors.OpPrereqError("No guest OS specified")
4525
4526   def _RunAllocator(self):
4527     """Run the allocator based on input opcode.
4528
4529     """
4530     nics = [n.ToDict() for n in self.nics]
4531     ial = IAllocator(self,
4532                      mode=constants.IALLOCATOR_MODE_ALLOC,
4533                      name=self.op.instance_name,
4534                      disk_template=self.op.disk_template,
4535                      tags=[],
4536                      os=self.op.os_type,
4537                      vcpus=self.be_full[constants.BE_VCPUS],
4538                      mem_size=self.be_full[constants.BE_MEMORY],
4539                      disks=self.disks,
4540                      nics=nics,
4541                      hypervisor=self.op.hypervisor,
4542                      )
4543
4544     ial.Run(self.op.iallocator)
4545
4546     if not ial.success:
4547       raise errors.OpPrereqError("Can't compute nodes using"
4548                                  " iallocator '%s': %s" % (self.op.iallocator,
4549                                                            ial.info))
4550     if len(ial.nodes) != ial.required_nodes:
4551       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4552                                  " of nodes (%s), required %s" %
4553                                  (self.op.iallocator, len(ial.nodes),
4554                                   ial.required_nodes))
4555     self.op.pnode = ial.nodes[0]
4556     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4557                  self.op.instance_name, self.op.iallocator,
4558                  ", ".join(ial.nodes))
4559     if ial.required_nodes == 2:
4560       self.op.snode = ial.nodes[1]
4561
4562   def BuildHooksEnv(self):
4563     """Build hooks env.
4564
4565     This runs on master, primary and secondary nodes of the instance.
4566
4567     """
4568     env = {
4569       "ADD_MODE": self.op.mode,
4570       }
4571     if self.op.mode == constants.INSTANCE_IMPORT:
4572       env["SRC_NODE"] = self.op.src_node
4573       env["SRC_PATH"] = self.op.src_path
4574       env["SRC_IMAGES"] = self.src_images
4575
4576     env.update(_BuildInstanceHookEnv(
4577       name=self.op.instance_name,
4578       primary_node=self.op.pnode,
4579       secondary_nodes=self.secondaries,
4580       status=self.op.start,
4581       os_type=self.op.os_type,
4582       memory=self.be_full[constants.BE_MEMORY],
4583       vcpus=self.be_full[constants.BE_VCPUS],
4584       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4585       disk_template=self.op.disk_template,
4586       disks=[(d["size"], d["mode"]) for d in self.disks],
4587     ))
4588
4589     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4590           self.secondaries)
4591     return env, nl, nl
4592
4593
4594   def CheckPrereq(self):
4595     """Check prerequisites.
4596
4597     """
4598     if (not self.cfg.GetVGName() and
4599         self.op.disk_template not in constants.DTS_NOT_LVM):
4600       raise errors.OpPrereqError("Cluster does not support lvm-based"
4601                                  " instances")
4602
4603     if self.op.mode == constants.INSTANCE_IMPORT:
4604       src_node = self.op.src_node
4605       src_path = self.op.src_path
4606
4607       if src_node is None:
4608         exp_list = self.rpc.call_export_list(
4609           self.acquired_locks[locking.LEVEL_NODE])
4610         found = False
4611         for node in exp_list:
4612           if not exp_list[node].failed and src_path in exp_list[node].data:
4613             found = True
4614             self.op.src_node = src_node = node
4615             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4616                                                        src_path)
4617             break
4618         if not found:
4619           raise errors.OpPrereqError("No export found for relative path %s" %
4620                                       src_path)
4621
4622       _CheckNodeOnline(self, src_node)
4623       result = self.rpc.call_export_info(src_node, src_path)
4624       result.Raise()
4625       if not result.data:
4626         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4627
4628       export_info = result.data
4629       if not export_info.has_section(constants.INISECT_EXP):
4630         raise errors.ProgrammerError("Corrupted export config")
4631
4632       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4633       if (int(ei_version) != constants.EXPORT_VERSION):
4634         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4635                                    (ei_version, constants.EXPORT_VERSION))
4636
4637       # Check that the new instance doesn't have less disks than the export
4638       instance_disks = len(self.disks)
4639       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4640       if instance_disks < export_disks:
4641         raise errors.OpPrereqError("Not enough disks to import."
4642                                    " (instance: %d, export: %d)" %
4643                                    (instance_disks, export_disks))
4644
4645       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4646       disk_images = []
4647       for idx in range(export_disks):
4648         option = 'disk%d_dump' % idx
4649         if export_info.has_option(constants.INISECT_INS, option):
4650           # FIXME: are the old os-es, disk sizes, etc. useful?
4651           export_name = export_info.get(constants.INISECT_INS, option)
4652           image = os.path.join(src_path, export_name)
4653           disk_images.append(image)
4654         else:
4655           disk_images.append(False)
4656
4657       self.src_images = disk_images
4658
4659       old_name = export_info.get(constants.INISECT_INS, 'name')
4660       # FIXME: int() here could throw a ValueError on broken exports
4661       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4662       if self.op.instance_name == old_name:
4663         for idx, nic in enumerate(self.nics):
4664           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4665             nic_mac_ini = 'nic%d_mac' % idx
4666             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4667
4668     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4669     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4670     if self.op.start and not self.op.ip_check:
4671       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4672                                  " adding an instance in start mode")
4673
4674     if self.op.ip_check:
4675       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4676         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4677                                    (self.check_ip, self.op.instance_name))
4678
4679     #### mac address generation
4680     # By generating here the mac address both the allocator and the hooks get
4681     # the real final mac address rather than the 'auto' or 'generate' value.
4682     # There is a race condition between the generation and the instance object
4683     # creation, which means that we know the mac is valid now, but we're not
4684     # sure it will be when we actually add the instance. If things go bad
4685     # adding the instance will abort because of a duplicate mac, and the
4686     # creation job will fail.
4687     for nic in self.nics:
4688       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4689         nic.mac = self.cfg.GenerateMAC()
4690
4691     #### allocator run
4692
4693     if self.op.iallocator is not None:
4694       self._RunAllocator()
4695
4696     #### node related checks
4697
4698     # check primary node
4699     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4700     assert self.pnode is not None, \
4701       "Cannot retrieve locked node %s" % self.op.pnode
4702     if pnode.offline:
4703       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4704                                  pnode.name)
4705     if pnode.drained:
4706       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4707                                  pnode.name)
4708
4709     self.secondaries = []
4710
4711     # mirror node verification
4712     if self.op.disk_template in constants.DTS_NET_MIRROR:
4713       if self.op.snode is None:
4714         raise errors.OpPrereqError("The networked disk templates need"
4715                                    " a mirror node")
4716       if self.op.snode == pnode.name:
4717         raise errors.OpPrereqError("The secondary node cannot be"
4718                                    " the primary node.")
4719       _CheckNodeOnline(self, self.op.snode)
4720       _CheckNodeNotDrained(self, self.op.snode)
4721       self.secondaries.append(self.op.snode)
4722
4723     nodenames = [pnode.name] + self.secondaries
4724
4725     req_size = _ComputeDiskSize(self.op.disk_template,
4726                                 self.disks)
4727
4728     # Check lv size requirements
4729     if req_size is not None:
4730       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4731                                          self.op.hypervisor)
4732       for node in nodenames:
4733         info = nodeinfo[node]
4734         info.Raise()
4735         info = info.data
4736         if not info:
4737           raise errors.OpPrereqError("Cannot get current information"
4738                                      " from node '%s'" % node)
4739         vg_free = info.get('vg_free', None)
4740         if not isinstance(vg_free, int):
4741           raise errors.OpPrereqError("Can't compute free disk space on"
4742                                      " node %s" % node)
4743         if req_size > info['vg_free']:
4744           raise errors.OpPrereqError("Not enough disk space on target node %s."
4745                                      " %d MB available, %d MB required" %
4746                                      (node, info['vg_free'], req_size))
4747
4748     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4749
4750     # os verification
4751     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4752     result.Raise()
4753     if not isinstance(result.data, objects.OS):
4754       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4755                                  " primary node"  % self.op.os_type)
4756
4757     # bridge check on primary node
4758     bridges = [n.bridge for n in self.nics]
4759     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4760     result.Raise()
4761     if not result.data:
4762       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4763                                  " exist on destination node '%s'" %
4764                                  (",".join(bridges), pnode.name))
4765
4766     # memory check on primary node
4767     if self.op.start:
4768       _CheckNodeFreeMemory(self, self.pnode.name,
4769                            "creating instance %s" % self.op.instance_name,
4770                            self.be_full[constants.BE_MEMORY],
4771                            self.op.hypervisor)
4772
4773   def Exec(self, feedback_fn):
4774     """Create and add the instance to the cluster.
4775
4776     """
4777     instance = self.op.instance_name
4778     pnode_name = self.pnode.name
4779
4780     ht_kind = self.op.hypervisor
4781     if ht_kind in constants.HTS_REQ_PORT:
4782       network_port = self.cfg.AllocatePort()
4783     else:
4784       network_port = None
4785
4786     ##if self.op.vnc_bind_address is None:
4787     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4788
4789     # this is needed because os.path.join does not accept None arguments
4790     if self.op.file_storage_dir is None:
4791       string_file_storage_dir = ""
4792     else:
4793       string_file_storage_dir = self.op.file_storage_dir
4794
4795     # build the full file storage dir path
4796     file_storage_dir = os.path.normpath(os.path.join(
4797                                         self.cfg.GetFileStorageDir(),
4798                                         string_file_storage_dir, instance))
4799
4800
4801     disks = _GenerateDiskTemplate(self,
4802                                   self.op.disk_template,
4803                                   instance, pnode_name,
4804                                   self.secondaries,
4805                                   self.disks,
4806                                   file_storage_dir,
4807                                   self.op.file_driver,
4808                                   0)
4809
4810     iobj = objects.Instance(name=instance, os=self.op.os_type,
4811                             primary_node=pnode_name,
4812                             nics=self.nics, disks=disks,
4813                             disk_template=self.op.disk_template,
4814                             admin_up=False,
4815                             network_port=network_port,
4816                             beparams=self.op.beparams,
4817                             hvparams=self.op.hvparams,
4818                             hypervisor=self.op.hypervisor,
4819                             )
4820
4821     feedback_fn("* creating instance disks...")
4822     try:
4823       _CreateDisks(self, iobj)
4824     except errors.OpExecError:
4825       self.LogWarning("Device creation failed, reverting...")
4826       try:
4827         _RemoveDisks(self, iobj)
4828       finally:
4829         self.cfg.ReleaseDRBDMinors(instance)
4830         raise
4831
4832     feedback_fn("adding instance %s to cluster config" % instance)
4833
4834     self.cfg.AddInstance(iobj)
4835     # Declare that we don't want to remove the instance lock anymore, as we've
4836     # added the instance to the config
4837     del self.remove_locks[locking.LEVEL_INSTANCE]
4838     # Unlock all the nodes
4839     if self.op.mode == constants.INSTANCE_IMPORT:
4840       nodes_keep = [self.op.src_node]
4841       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4842                        if node != self.op.src_node]
4843       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4844       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4845     else:
4846       self.context.glm.release(locking.LEVEL_NODE)
4847       del self.acquired_locks[locking.LEVEL_NODE]
4848
4849     if self.op.wait_for_sync:
4850       disk_abort = not _WaitForSync(self, iobj)
4851     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4852       # make sure the disks are not degraded (still sync-ing is ok)
4853       time.sleep(15)
4854       feedback_fn("* checking mirrors status")
4855       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4856     else:
4857       disk_abort = False
4858
4859     if disk_abort:
4860       _RemoveDisks(self, iobj)
4861       self.cfg.RemoveInstance(iobj.name)
4862       # Make sure the instance lock gets removed
4863       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4864       raise errors.OpExecError("There are some degraded disks for"
4865                                " this instance")
4866
4867     feedback_fn("creating os for instance %s on node %s" %
4868                 (instance, pnode_name))
4869
4870     if iobj.disk_template != constants.DT_DISKLESS:
4871       if self.op.mode == constants.INSTANCE_CREATE:
4872         feedback_fn("* running the instance OS create scripts...")
4873         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4874         msg = result.RemoteFailMsg()
4875         if msg:
4876           raise errors.OpExecError("Could not add os for instance %s"
4877                                    " on node %s: %s" %
4878                                    (instance, pnode_name, msg))
4879
4880       elif self.op.mode == constants.INSTANCE_IMPORT:
4881         feedback_fn("* running the instance OS import scripts...")
4882         src_node = self.op.src_node
4883         src_images = self.src_images
4884         cluster_name = self.cfg.GetClusterName()
4885         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4886                                                          src_node, src_images,
4887                                                          cluster_name)
4888         import_result.Raise()
4889         for idx, result in enumerate(import_result.data):
4890           if not result:
4891             self.LogWarning("Could not import the image %s for instance"
4892                             " %s, disk %d, on node %s" %
4893                             (src_images[idx], instance, idx, pnode_name))
4894       else:
4895         # also checked in the prereq part
4896         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4897                                      % self.op.mode)
4898
4899     if self.op.start:
4900       iobj.admin_up = True
4901       self.cfg.Update(iobj)
4902       logging.info("Starting instance %s on node %s", instance, pnode_name)
4903       feedback_fn("* starting instance...")
4904       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4905       msg = result.RemoteFailMsg()
4906       if msg:
4907         raise errors.OpExecError("Could not start instance: %s" % msg)
4908
4909
4910 class LUConnectConsole(NoHooksLU):
4911   """Connect to an instance's console.
4912
4913   This is somewhat special in that it returns the command line that
4914   you need to run on the master node in order to connect to the
4915   console.
4916
4917   """
4918   _OP_REQP = ["instance_name"]
4919   REQ_BGL = False
4920
4921   def ExpandNames(self):
4922     self._ExpandAndLockInstance()
4923
4924   def CheckPrereq(self):
4925     """Check prerequisites.
4926
4927     This checks that the instance is in the cluster.
4928
4929     """
4930     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4931     assert self.instance is not None, \
4932       "Cannot retrieve locked instance %s" % self.op.instance_name
4933     _CheckNodeOnline(self, self.instance.primary_node)
4934
4935   def Exec(self, feedback_fn):
4936     """Connect to the console of an instance
4937
4938     """
4939     instance = self.instance
4940     node = instance.primary_node
4941
4942     node_insts = self.rpc.call_instance_list([node],
4943                                              [instance.hypervisor])[node]
4944     node_insts.Raise()
4945
4946     if instance.name not in node_insts.data:
4947       raise errors.OpExecError("Instance %s is not running." % instance.name)
4948
4949     logging.debug("Connecting to console of %s on %s", instance.name, node)
4950
4951     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4952     cluster = self.cfg.GetClusterInfo()
4953     # beparams and hvparams are passed separately, to avoid editing the
4954     # instance and then saving the defaults in the instance itself.
4955     hvparams = cluster.FillHV(instance)
4956     beparams = cluster.FillBE(instance)
4957     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4958
4959     # build ssh cmdline
4960     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4961
4962
4963 class LUReplaceDisks(LogicalUnit):
4964   """Replace the disks of an instance.
4965
4966   """
4967   HPATH = "mirrors-replace"
4968   HTYPE = constants.HTYPE_INSTANCE
4969   _OP_REQP = ["instance_name", "mode", "disks"]
4970   REQ_BGL = False
4971
4972   def CheckArguments(self):
4973     if not hasattr(self.op, "remote_node"):
4974       self.op.remote_node = None
4975     if not hasattr(self.op, "iallocator"):
4976       self.op.iallocator = None
4977
4978     # check for valid parameter combination
4979     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4980     if self.op.mode == constants.REPLACE_DISK_CHG:
4981       if cnt == 2:
4982         raise errors.OpPrereqError("When changing the secondary either an"
4983                                    " iallocator script must be used or the"
4984                                    " new node given")
4985       elif cnt == 0:
4986         raise errors.OpPrereqError("Give either the iallocator or the new"
4987                                    " secondary, not both")
4988     else: # not replacing the secondary
4989       if cnt != 2:
4990         raise errors.OpPrereqError("The iallocator and new node options can"
4991                                    " be used only when changing the"
4992                                    " secondary node")
4993
4994   def ExpandNames(self):
4995     self._ExpandAndLockInstance()
4996
4997     if self.op.iallocator is not None:
4998       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4999     elif self.op.remote_node is not None:
5000       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5001       if remote_node is None:
5002         raise errors.OpPrereqError("Node '%s' not known" %
5003                                    self.op.remote_node)
5004       self.op.remote_node = remote_node
5005       # Warning: do not remove the locking of the new secondary here
5006       # unless DRBD8.AddChildren is changed to work in parallel;
5007       # currently it doesn't since parallel invocations of
5008       # FindUnusedMinor will conflict
5009       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5010       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5011     else:
5012       self.needed_locks[locking.LEVEL_NODE] = []
5013       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5014
5015   def DeclareLocks(self, level):
5016     # If we're not already locking all nodes in the set we have to declare the
5017     # instance's primary/secondary nodes.
5018     if (level == locking.LEVEL_NODE and
5019         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5020       self._LockInstancesNodes()
5021
5022   def _RunAllocator(self):
5023     """Compute a new secondary node using an IAllocator.
5024
5025     """
5026     ial = IAllocator(self,
5027                      mode=constants.IALLOCATOR_MODE_RELOC,
5028                      name=self.op.instance_name,
5029                      relocate_from=[self.sec_node])
5030
5031     ial.Run(self.op.iallocator)
5032
5033     if not ial.success:
5034       raise errors.OpPrereqError("Can't compute nodes using"
5035                                  " iallocator '%s': %s" % (self.op.iallocator,
5036                                                            ial.info))
5037     if len(ial.nodes) != ial.required_nodes:
5038       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5039                                  " of nodes (%s), required %s" %
5040                                  (len(ial.nodes), ial.required_nodes))
5041     self.op.remote_node = ial.nodes[0]
5042     self.LogInfo("Selected new secondary for the instance: %s",
5043                  self.op.remote_node)
5044
5045   def BuildHooksEnv(self):
5046     """Build hooks env.
5047
5048     This runs on the master, the primary and all the secondaries.
5049
5050     """
5051     env = {
5052       "MODE": self.op.mode,
5053       "NEW_SECONDARY": self.op.remote_node,
5054       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5055       }
5056     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5057     nl = [
5058       self.cfg.GetMasterNode(),
5059       self.instance.primary_node,
5060       ]
5061     if self.op.remote_node is not None:
5062       nl.append(self.op.remote_node)
5063     return env, nl, nl
5064
5065   def CheckPrereq(self):
5066     """Check prerequisites.
5067
5068     This checks that the instance is in the cluster.
5069
5070     """
5071     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5072     assert instance is not None, \
5073       "Cannot retrieve locked instance %s" % self.op.instance_name
5074     self.instance = instance
5075
5076     if instance.disk_template != constants.DT_DRBD8:
5077       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5078                                  " instances")
5079
5080     if len(instance.secondary_nodes) != 1:
5081       raise errors.OpPrereqError("The instance has a strange layout,"
5082                                  " expected one secondary but found %d" %
5083                                  len(instance.secondary_nodes))
5084
5085     self.sec_node = instance.secondary_nodes[0]
5086
5087     if self.op.iallocator is not None:
5088       self._RunAllocator()
5089
5090     remote_node = self.op.remote_node
5091     if remote_node is not None:
5092       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5093       assert self.remote_node_info is not None, \
5094         "Cannot retrieve locked node %s" % remote_node
5095     else:
5096       self.remote_node_info = None
5097     if remote_node == instance.primary_node:
5098       raise errors.OpPrereqError("The specified node is the primary node of"
5099                                  " the instance.")
5100     elif remote_node == self.sec_node:
5101       raise errors.OpPrereqError("The specified node is already the"
5102                                  " secondary node of the instance.")
5103
5104     if self.op.mode == constants.REPLACE_DISK_PRI:
5105       n1 = self.tgt_node = instance.primary_node
5106       n2 = self.oth_node = self.sec_node
5107     elif self.op.mode == constants.REPLACE_DISK_SEC:
5108       n1 = self.tgt_node = self.sec_node
5109       n2 = self.oth_node = instance.primary_node
5110     elif self.op.mode == constants.REPLACE_DISK_CHG:
5111       n1 = self.new_node = remote_node
5112       n2 = self.oth_node = instance.primary_node
5113       self.tgt_node = self.sec_node
5114       _CheckNodeNotDrained(self, remote_node)
5115     else:
5116       raise errors.ProgrammerError("Unhandled disk replace mode")
5117
5118     _CheckNodeOnline(self, n1)
5119     _CheckNodeOnline(self, n2)
5120
5121     if not self.op.disks:
5122       self.op.disks = range(len(instance.disks))
5123
5124     for disk_idx in self.op.disks:
5125       instance.FindDisk(disk_idx)
5126
5127   def _ExecD8DiskOnly(self, feedback_fn):
5128     """Replace a disk on the primary or secondary for dbrd8.
5129
5130     The algorithm for replace is quite complicated:
5131
5132       1. for each disk to be replaced:
5133
5134         1. create new LVs on the target node with unique names
5135         1. detach old LVs from the drbd device
5136         1. rename old LVs to name_replaced.<time_t>
5137         1. rename new LVs to old LVs
5138         1. attach the new LVs (with the old names now) to the drbd device
5139
5140       1. wait for sync across all devices
5141
5142       1. for each modified disk:
5143
5144         1. remove old LVs (which have the name name_replaces.<time_t>)
5145
5146     Failures are not very well handled.
5147
5148     """
5149     steps_total = 6
5150     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5151     instance = self.instance
5152     iv_names = {}
5153     vgname = self.cfg.GetVGName()
5154     # start of work
5155     cfg = self.cfg
5156     tgt_node = self.tgt_node
5157     oth_node = self.oth_node
5158
5159     # Step: check device activation
5160     self.proc.LogStep(1, steps_total, "check device existence")
5161     info("checking volume groups")
5162     my_vg = cfg.GetVGName()
5163     results = self.rpc.call_vg_list([oth_node, tgt_node])
5164     if not results:
5165       raise errors.OpExecError("Can't list volume groups on the nodes")
5166     for node in oth_node, tgt_node:
5167       res = results[node]
5168       if res.failed or not res.data or my_vg not in res.data:
5169         raise errors.OpExecError("Volume group '%s' not found on %s" %
5170                                  (my_vg, node))
5171     for idx, dev in enumerate(instance.disks):
5172       if idx not in self.op.disks:
5173         continue
5174       for node in tgt_node, oth_node:
5175         info("checking disk/%d on %s" % (idx, node))
5176         cfg.SetDiskID(dev, node)
5177         result = self.rpc.call_blockdev_find(node, dev)
5178         msg = result.RemoteFailMsg()
5179         if not msg and not result.payload:
5180           msg = "disk not found"
5181         if msg:
5182           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5183                                    (idx, node, msg))
5184
5185     # Step: check other node consistency
5186     self.proc.LogStep(2, steps_total, "check peer consistency")
5187     for idx, dev in enumerate(instance.disks):
5188       if idx not in self.op.disks:
5189         continue
5190       info("checking disk/%d consistency on %s" % (idx, oth_node))
5191       if not _CheckDiskConsistency(self, dev, oth_node,
5192                                    oth_node==instance.primary_node):
5193         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5194                                  " to replace disks on this node (%s)" %
5195                                  (oth_node, tgt_node))
5196
5197     # Step: create new storage
5198     self.proc.LogStep(3, steps_total, "allocate new storage")
5199     for idx, dev in enumerate(instance.disks):
5200       if idx not in self.op.disks:
5201         continue
5202       size = dev.size
5203       cfg.SetDiskID(dev, tgt_node)
5204       lv_names = [".disk%d_%s" % (idx, suf)
5205                   for suf in ["data", "meta"]]
5206       names = _GenerateUniqueNames(self, lv_names)
5207       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5208                              logical_id=(vgname, names[0]))
5209       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5210                              logical_id=(vgname, names[1]))
5211       new_lvs = [lv_data, lv_meta]
5212       old_lvs = dev.children
5213       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5214       info("creating new local storage on %s for %s" %
5215            (tgt_node, dev.iv_name))
5216       # we pass force_create=True to force the LVM creation
5217       for new_lv in new_lvs:
5218         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5219                         _GetInstanceInfoText(instance), False)
5220
5221     # Step: for each lv, detach+rename*2+attach
5222     self.proc.LogStep(4, steps_total, "change drbd configuration")
5223     for dev, old_lvs, new_lvs in iv_names.itervalues():
5224       info("detaching %s drbd from local storage" % dev.iv_name)
5225       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5226       result.Raise()
5227       if not result.data:
5228         raise errors.OpExecError("Can't detach drbd from local storage on node"
5229                                  " %s for device %s" % (tgt_node, dev.iv_name))
5230       #dev.children = []
5231       #cfg.Update(instance)
5232
5233       # ok, we created the new LVs, so now we know we have the needed
5234       # storage; as such, we proceed on the target node to rename
5235       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5236       # using the assumption that logical_id == physical_id (which in
5237       # turn is the unique_id on that node)
5238
5239       # FIXME(iustin): use a better name for the replaced LVs
5240       temp_suffix = int(time.time())
5241       ren_fn = lambda d, suff: (d.physical_id[0],
5242                                 d.physical_id[1] + "_replaced-%s" % suff)
5243       # build the rename list based on what LVs exist on the node
5244       rlist = []
5245       for to_ren in old_lvs:
5246         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5247         if not result.RemoteFailMsg() and result.payload:
5248           # device exists
5249           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5250
5251       info("renaming the old LVs on the target node")
5252       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5253       result.Raise()
5254       if not result.data:
5255         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5256       # now we rename the new LVs to the old LVs
5257       info("renaming the new LVs on the target node")
5258       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5259       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5260       result.Raise()
5261       if not result.data:
5262         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5263
5264       for old, new in zip(old_lvs, new_lvs):
5265         new.logical_id = old.logical_id
5266         cfg.SetDiskID(new, tgt_node)
5267
5268       for disk in old_lvs:
5269         disk.logical_id = ren_fn(disk, temp_suffix)
5270         cfg.SetDiskID(disk, tgt_node)
5271
5272       # now that the new lvs have the old name, we can add them to the device
5273       info("adding new mirror component on %s" % tgt_node)
5274       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5275       if result.failed or not result.data:
5276         for new_lv in new_lvs:
5277           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5278           if msg:
5279             warning("Can't rollback device %s: %s", dev, msg,
5280                     hint="cleanup manually the unused logical volumes")
5281         raise errors.OpExecError("Can't add local storage to drbd")
5282
5283       dev.children = new_lvs
5284       cfg.Update(instance)
5285
5286     # Step: wait for sync
5287
5288     # this can fail as the old devices are degraded and _WaitForSync
5289     # does a combined result over all disks, so we don't check its
5290     # return value
5291     self.proc.LogStep(5, steps_total, "sync devices")
5292     _WaitForSync(self, instance, unlock=True)
5293
5294     # so check manually all the devices
5295     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5296       cfg.SetDiskID(dev, instance.primary_node)
5297       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5298       msg = result.RemoteFailMsg()
5299       if not msg and not result.payload:
5300         msg = "disk not found"
5301       if msg:
5302         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5303                                  (name, msg))
5304       if result.payload[5]:
5305         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5306
5307     # Step: remove old storage
5308     self.proc.LogStep(6, steps_total, "removing old storage")
5309     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5310       info("remove logical volumes for %s" % name)
5311       for lv in old_lvs:
5312         cfg.SetDiskID(lv, tgt_node)
5313         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5314         if msg:
5315           warning("Can't remove old LV: %s" % msg,
5316                   hint="manually remove unused LVs")
5317           continue
5318
5319   def _ExecD8Secondary(self, feedback_fn):
5320     """Replace the secondary node for drbd8.
5321
5322     The algorithm for replace is quite complicated:
5323       - for all disks of the instance:
5324         - create new LVs on the new node with same names
5325         - shutdown the drbd device on the old secondary
5326         - disconnect the drbd network on the primary
5327         - create the drbd device on the new secondary
5328         - network attach the drbd on the primary, using an artifice:
5329           the drbd code for Attach() will connect to the network if it
5330           finds a device which is connected to the good local disks but
5331           not network enabled
5332       - wait for sync across all devices
5333       - remove all disks from the old secondary
5334
5335     Failures are not very well handled.
5336
5337     """
5338     steps_total = 6
5339     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5340     instance = self.instance
5341     iv_names = {}
5342     # start of work
5343     cfg = self.cfg
5344     old_node = self.tgt_node
5345     new_node = self.new_node
5346     pri_node = instance.primary_node
5347     nodes_ip = {
5348       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5349       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5350       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5351       }
5352
5353     # Step: check device activation
5354     self.proc.LogStep(1, steps_total, "check device existence")
5355     info("checking volume groups")
5356     my_vg = cfg.GetVGName()
5357     results = self.rpc.call_vg_list([pri_node, new_node])
5358     for node in pri_node, new_node:
5359       res = results[node]
5360       if res.failed or not res.data or my_vg not in res.data:
5361         raise errors.OpExecError("Volume group '%s' not found on %s" %
5362                                  (my_vg, node))
5363     for idx, dev in enumerate(instance.disks):
5364       if idx not in self.op.disks:
5365         continue
5366       info("checking disk/%d on %s" % (idx, pri_node))
5367       cfg.SetDiskID(dev, pri_node)
5368       result = self.rpc.call_blockdev_find(pri_node, dev)
5369       msg = result.RemoteFailMsg()
5370       if not msg and not result.payload:
5371         msg = "disk not found"
5372       if msg:
5373         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5374                                  (idx, pri_node, msg))
5375
5376     # Step: check other node consistency
5377     self.proc.LogStep(2, steps_total, "check peer consistency")
5378     for idx, dev in enumerate(instance.disks):
5379       if idx not in self.op.disks:
5380         continue
5381       info("checking disk/%d consistency on %s" % (idx, pri_node))
5382       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5383         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5384                                  " unsafe to replace the secondary" %
5385                                  pri_node)
5386
5387     # Step: create new storage
5388     self.proc.LogStep(3, steps_total, "allocate new storage")
5389     for idx, dev in enumerate(instance.disks):
5390       info("adding new local storage on %s for disk/%d" %
5391            (new_node, idx))
5392       # we pass force_create=True to force LVM creation
5393       for new_lv in dev.children:
5394         _CreateBlockDev(self, new_node, instance, new_lv, True,
5395                         _GetInstanceInfoText(instance), False)
5396
5397     # Step 4: dbrd minors and drbd setups changes
5398     # after this, we must manually remove the drbd minors on both the
5399     # error and the success paths
5400     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5401                                    instance.name)
5402     logging.debug("Allocated minors %s" % (minors,))
5403     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5404     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5405       size = dev.size
5406       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5407       # create new devices on new_node; note that we create two IDs:
5408       # one without port, so the drbd will be activated without
5409       # networking information on the new node at this stage, and one
5410       # with network, for the latter activation in step 4
5411       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5412       if pri_node == o_node1:
5413         p_minor = o_minor1
5414       else:
5415         p_minor = o_minor2
5416
5417       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5418       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5419
5420       iv_names[idx] = (dev, dev.children, new_net_id)
5421       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5422                     new_net_id)
5423       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5424                               logical_id=new_alone_id,
5425                               children=dev.children)
5426       try:
5427         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5428                               _GetInstanceInfoText(instance), False)
5429       except errors.GenericError:
5430         self.cfg.ReleaseDRBDMinors(instance.name)
5431         raise
5432
5433     for idx, dev in enumerate(instance.disks):
5434       # we have new devices, shutdown the drbd on the old secondary
5435       info("shutting down drbd for disk/%d on old node" % idx)
5436       cfg.SetDiskID(dev, old_node)
5437       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5438       if msg:
5439         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5440                 (idx, msg),
5441                 hint="Please cleanup this device manually as soon as possible")
5442
5443     info("detaching primary drbds from the network (=> standalone)")
5444     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5445                                                instance.disks)[pri_node]
5446
5447     msg = result.RemoteFailMsg()
5448     if msg:
5449       # detaches didn't succeed (unlikely)
5450       self.cfg.ReleaseDRBDMinors(instance.name)
5451       raise errors.OpExecError("Can't detach the disks from the network on"
5452                                " old node: %s" % (msg,))
5453
5454     # if we managed to detach at least one, we update all the disks of
5455     # the instance to point to the new secondary
5456     info("updating instance configuration")
5457     for dev, _, new_logical_id in iv_names.itervalues():
5458       dev.logical_id = new_logical_id
5459       cfg.SetDiskID(dev, pri_node)
5460     cfg.Update(instance)
5461
5462     # and now perform the drbd attach
5463     info("attaching primary drbds to new secondary (standalone => connected)")
5464     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5465                                            instance.disks, instance.name,
5466                                            False)
5467     for to_node, to_result in result.items():
5468       msg = to_result.RemoteFailMsg()
5469       if msg:
5470         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5471                 hint="please do a gnt-instance info to see the"
5472                 " status of disks")
5473
5474     # this can fail as the old devices are degraded and _WaitForSync
5475     # does a combined result over all disks, so we don't check its
5476     # return value
5477     self.proc.LogStep(5, steps_total, "sync devices")
5478     _WaitForSync(self, instance, unlock=True)
5479
5480     # so check manually all the devices
5481     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5482       cfg.SetDiskID(dev, pri_node)
5483       result = self.rpc.call_blockdev_find(pri_node, dev)
5484       msg = result.RemoteFailMsg()
5485       if not msg and not result.payload:
5486         msg = "disk not found"
5487       if msg:
5488         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5489                                  (idx, msg))
5490       if result.payload[5]:
5491         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5492
5493     self.proc.LogStep(6, steps_total, "removing old storage")
5494     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5495       info("remove logical volumes for disk/%d" % idx)
5496       for lv in old_lvs:
5497         cfg.SetDiskID(lv, old_node)
5498         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5499         if msg:
5500           warning("Can't remove LV on old secondary: %s", msg,
5501                   hint="Cleanup stale volumes by hand")
5502
5503   def Exec(self, feedback_fn):
5504     """Execute disk replacement.
5505
5506     This dispatches the disk replacement to the appropriate handler.
5507
5508     """
5509     instance = self.instance
5510
5511     # Activate the instance disks if we're replacing them on a down instance
5512     if not instance.admin_up:
5513       _StartInstanceDisks(self, instance, True)
5514
5515     if self.op.mode == constants.REPLACE_DISK_CHG:
5516       fn = self._ExecD8Secondary
5517     else:
5518       fn = self._ExecD8DiskOnly
5519
5520     ret = fn(feedback_fn)
5521
5522     # Deactivate the instance disks if we're replacing them on a down instance
5523     if not instance.admin_up:
5524       _SafeShutdownInstanceDisks(self, instance)
5525
5526     return ret
5527
5528
5529 class LUGrowDisk(LogicalUnit):
5530   """Grow a disk of an instance.
5531
5532   """
5533   HPATH = "disk-grow"
5534   HTYPE = constants.HTYPE_INSTANCE
5535   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5536   REQ_BGL = False
5537
5538   def ExpandNames(self):
5539     self._ExpandAndLockInstance()
5540     self.needed_locks[locking.LEVEL_NODE] = []
5541     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5542
5543   def DeclareLocks(self, level):
5544     if level == locking.LEVEL_NODE:
5545       self._LockInstancesNodes()
5546
5547   def BuildHooksEnv(self):
5548     """Build hooks env.
5549
5550     This runs on the master, the primary and all the secondaries.
5551
5552     """
5553     env = {
5554       "DISK": self.op.disk,
5555       "AMOUNT": self.op.amount,
5556       }
5557     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5558     nl = [
5559       self.cfg.GetMasterNode(),
5560       self.instance.primary_node,
5561       ]
5562     return env, nl, nl
5563
5564   def CheckPrereq(self):
5565     """Check prerequisites.
5566
5567     This checks that the instance is in the cluster.
5568
5569     """
5570     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5571     assert instance is not None, \
5572       "Cannot retrieve locked instance %s" % self.op.instance_name
5573     nodenames = list(instance.all_nodes)
5574     for node in nodenames:
5575       _CheckNodeOnline(self, node)
5576
5577
5578     self.instance = instance
5579
5580     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5581       raise errors.OpPrereqError("Instance's disk layout does not support"
5582                                  " growing.")
5583
5584     self.disk = instance.FindDisk(self.op.disk)
5585
5586     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5587                                        instance.hypervisor)
5588     for node in nodenames:
5589       info = nodeinfo[node]
5590       if info.failed or not info.data:
5591         raise errors.OpPrereqError("Cannot get current information"
5592                                    " from node '%s'" % node)
5593       vg_free = info.data.get('vg_free', None)
5594       if not isinstance(vg_free, int):
5595         raise errors.OpPrereqError("Can't compute free disk space on"
5596                                    " node %s" % node)
5597       if self.op.amount > vg_free:
5598         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5599                                    " %d MiB available, %d MiB required" %
5600                                    (node, vg_free, self.op.amount))
5601
5602   def Exec(self, feedback_fn):
5603     """Execute disk grow.
5604
5605     """
5606     instance = self.instance
5607     disk = self.disk
5608     for node in instance.all_nodes:
5609       self.cfg.SetDiskID(disk, node)
5610       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5611       msg = result.RemoteFailMsg()
5612       if msg:
5613         raise errors.OpExecError("Grow request failed to node %s: %s" %
5614                                  (node, msg))
5615     disk.RecordGrow(self.op.amount)
5616     self.cfg.Update(instance)
5617     if self.op.wait_for_sync:
5618       disk_abort = not _WaitForSync(self, instance)
5619       if disk_abort:
5620         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5621                              " status.\nPlease check the instance.")
5622
5623
5624 class LUQueryInstanceData(NoHooksLU):
5625   """Query runtime instance data.
5626
5627   """
5628   _OP_REQP = ["instances", "static"]
5629   REQ_BGL = False
5630
5631   def ExpandNames(self):
5632     self.needed_locks = {}
5633     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5634
5635     if not isinstance(self.op.instances, list):
5636       raise errors.OpPrereqError("Invalid argument type 'instances'")
5637
5638     if self.op.instances:
5639       self.wanted_names = []
5640       for name in self.op.instances:
5641         full_name = self.cfg.ExpandInstanceName(name)
5642         if full_name is None:
5643           raise errors.OpPrereqError("Instance '%s' not known" % name)
5644         self.wanted_names.append(full_name)
5645       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5646     else:
5647       self.wanted_names = None
5648       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5649
5650     self.needed_locks[locking.LEVEL_NODE] = []
5651     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5652
5653   def DeclareLocks(self, level):
5654     if level == locking.LEVEL_NODE:
5655       self._LockInstancesNodes()
5656
5657   def CheckPrereq(self):
5658     """Check prerequisites.
5659
5660     This only checks the optional instance list against the existing names.
5661
5662     """
5663     if self.wanted_names is None:
5664       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5665
5666     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5667                              in self.wanted_names]
5668     return
5669
5670   def _ComputeDiskStatus(self, instance, snode, dev):
5671     """Compute block device status.
5672
5673     """
5674     static = self.op.static
5675     if not static:
5676       self.cfg.SetDiskID(dev, instance.primary_node)
5677       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5678       if dev_pstatus.offline:
5679         dev_pstatus = None
5680       else:
5681         msg = dev_pstatus.RemoteFailMsg()
5682         if msg:
5683           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5684                                    (instance.name, msg))
5685         dev_pstatus = dev_pstatus.payload
5686     else:
5687       dev_pstatus = None
5688
5689     if dev.dev_type in constants.LDS_DRBD:
5690       # we change the snode then (otherwise we use the one passed in)
5691       if dev.logical_id[0] == instance.primary_node:
5692         snode = dev.logical_id[1]
5693       else:
5694         snode = dev.logical_id[0]
5695
5696     if snode and not static:
5697       self.cfg.SetDiskID(dev, snode)
5698       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5699       if dev_sstatus.offline:
5700         dev_sstatus = None
5701       else:
5702         msg = dev_sstatus.RemoteFailMsg()
5703         if msg:
5704           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5705                                    (instance.name, msg))
5706         dev_sstatus = dev_sstatus.payload
5707     else:
5708       dev_sstatus = None
5709
5710     if dev.children:
5711       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5712                       for child in dev.children]
5713     else:
5714       dev_children = []
5715
5716     data = {
5717       "iv_name": dev.iv_name,
5718       "dev_type": dev.dev_type,
5719       "logical_id": dev.logical_id,
5720       "physical_id": dev.physical_id,
5721       "pstatus": dev_pstatus,
5722       "sstatus": dev_sstatus,
5723       "children": dev_children,
5724       "mode": dev.mode,
5725       }
5726
5727     return data
5728
5729   def Exec(self, feedback_fn):
5730     """Gather and return data"""
5731     result = {}
5732
5733     cluster = self.cfg.GetClusterInfo()
5734
5735     for instance in self.wanted_instances:
5736       if not self.op.static:
5737         remote_info = self.rpc.call_instance_info(instance.primary_node,
5738                                                   instance.name,
5739                                                   instance.hypervisor)
5740         remote_info.Raise()
5741         remote_info = remote_info.data
5742         if remote_info and "state" in remote_info:
5743           remote_state = "up"
5744         else:
5745           remote_state = "down"
5746       else:
5747         remote_state = None
5748       if instance.admin_up:
5749         config_state = "up"
5750       else:
5751         config_state = "down"
5752
5753       disks = [self._ComputeDiskStatus(instance, None, device)
5754                for device in instance.disks]
5755
5756       idict = {
5757         "name": instance.name,
5758         "config_state": config_state,
5759         "run_state": remote_state,
5760         "pnode": instance.primary_node,
5761         "snodes": instance.secondary_nodes,
5762         "os": instance.os,
5763         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5764         "disks": disks,
5765         "hypervisor": instance.hypervisor,
5766         "network_port": instance.network_port,
5767         "hv_instance": instance.hvparams,
5768         "hv_actual": cluster.FillHV(instance),
5769         "be_instance": instance.beparams,
5770         "be_actual": cluster.FillBE(instance),
5771         }
5772
5773       result[instance.name] = idict
5774
5775     return result
5776
5777
5778 class LUSetInstanceParams(LogicalUnit):
5779   """Modifies an instances's parameters.
5780
5781   """
5782   HPATH = "instance-modify"
5783   HTYPE = constants.HTYPE_INSTANCE
5784   _OP_REQP = ["instance_name"]
5785   REQ_BGL = False
5786
5787   def CheckArguments(self):
5788     if not hasattr(self.op, 'nics'):
5789       self.op.nics = []
5790     if not hasattr(self.op, 'disks'):
5791       self.op.disks = []
5792     if not hasattr(self.op, 'beparams'):
5793       self.op.beparams = {}
5794     if not hasattr(self.op, 'hvparams'):
5795       self.op.hvparams = {}
5796     self.op.force = getattr(self.op, "force", False)
5797     if not (self.op.nics or self.op.disks or
5798             self.op.hvparams or self.op.beparams):
5799       raise errors.OpPrereqError("No changes submitted")
5800
5801     # Disk validation
5802     disk_addremove = 0
5803     for disk_op, disk_dict in self.op.disks:
5804       if disk_op == constants.DDM_REMOVE:
5805         disk_addremove += 1
5806         continue
5807       elif disk_op == constants.DDM_ADD:
5808         disk_addremove += 1
5809       else:
5810         if not isinstance(disk_op, int):
5811           raise errors.OpPrereqError("Invalid disk index")
5812       if disk_op == constants.DDM_ADD:
5813         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5814         if mode not in constants.DISK_ACCESS_SET:
5815           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5816         size = disk_dict.get('size', None)
5817         if size is None:
5818           raise errors.OpPrereqError("Required disk parameter size missing")
5819         try:
5820           size = int(size)
5821         except ValueError, err:
5822           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5823                                      str(err))
5824         disk_dict['size'] = size
5825       else:
5826         # modification of disk
5827         if 'size' in disk_dict:
5828           raise errors.OpPrereqError("Disk size change not possible, use"
5829                                      " grow-disk")
5830
5831     if disk_addremove > 1:
5832       raise errors.OpPrereqError("Only one disk add or remove operation"
5833                                  " supported at a time")
5834
5835     # NIC validation
5836     nic_addremove = 0
5837     for nic_op, nic_dict in self.op.nics:
5838       if nic_op == constants.DDM_REMOVE:
5839         nic_addremove += 1
5840         continue
5841       elif nic_op == constants.DDM_ADD:
5842         nic_addremove += 1
5843       else:
5844         if not isinstance(nic_op, int):
5845           raise errors.OpPrereqError("Invalid nic index")
5846
5847       # nic_dict should be a dict
5848       nic_ip = nic_dict.get('ip', None)
5849       if nic_ip is not None:
5850         if nic_ip.lower() == constants.VALUE_NONE:
5851           nic_dict['ip'] = None
5852         else:
5853           if not utils.IsValidIP(nic_ip):
5854             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5855
5856       if nic_op == constants.DDM_ADD:
5857         nic_bridge = nic_dict.get('bridge', None)
5858         if nic_bridge is None:
5859           nic_dict['bridge'] = self.cfg.GetDefBridge()
5860         nic_mac = nic_dict.get('mac', None)
5861         if nic_mac is None:
5862           nic_dict['mac'] = constants.VALUE_AUTO
5863
5864       if 'mac' in nic_dict:
5865         nic_mac = nic_dict['mac']
5866         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5867           if not utils.IsValidMac(nic_mac):
5868             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5869         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5870           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5871                                      " modifying an existing nic")
5872
5873     if nic_addremove > 1:
5874       raise errors.OpPrereqError("Only one NIC add or remove operation"
5875                                  " supported at a time")
5876
5877   def ExpandNames(self):
5878     self._ExpandAndLockInstance()
5879     self.needed_locks[locking.LEVEL_NODE] = []
5880     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5881
5882   def DeclareLocks(self, level):
5883     if level == locking.LEVEL_NODE:
5884       self._LockInstancesNodes()
5885
5886   def BuildHooksEnv(self):
5887     """Build hooks env.
5888
5889     This runs on the master, primary and secondaries.
5890
5891     """
5892     args = dict()
5893     if constants.BE_MEMORY in self.be_new:
5894       args['memory'] = self.be_new[constants.BE_MEMORY]
5895     if constants.BE_VCPUS in self.be_new:
5896       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5897     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5898     # information at all.
5899     if self.op.nics:
5900       args['nics'] = []
5901       nic_override = dict(self.op.nics)
5902       for idx, nic in enumerate(self.instance.nics):
5903         if idx in nic_override:
5904           this_nic_override = nic_override[idx]
5905         else:
5906           this_nic_override = {}
5907         if 'ip' in this_nic_override:
5908           ip = this_nic_override['ip']
5909         else:
5910           ip = nic.ip
5911         if 'bridge' in this_nic_override:
5912           bridge = this_nic_override['bridge']
5913         else:
5914           bridge = nic.bridge
5915         if 'mac' in this_nic_override:
5916           mac = this_nic_override['mac']
5917         else:
5918           mac = nic.mac
5919         args['nics'].append((ip, bridge, mac))
5920       if constants.DDM_ADD in nic_override:
5921         ip = nic_override[constants.DDM_ADD].get('ip', None)
5922         bridge = nic_override[constants.DDM_ADD]['bridge']
5923         mac = nic_override[constants.DDM_ADD]['mac']
5924         args['nics'].append((ip, bridge, mac))
5925       elif constants.DDM_REMOVE in nic_override:
5926         del args['nics'][-1]
5927
5928     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5929     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5930     return env, nl, nl
5931
5932   def CheckPrereq(self):
5933     """Check prerequisites.
5934
5935     This only checks the instance list against the existing names.
5936
5937     """
5938     force = self.force = self.op.force
5939
5940     # checking the new params on the primary/secondary nodes
5941
5942     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5943     assert self.instance is not None, \
5944       "Cannot retrieve locked instance %s" % self.op.instance_name
5945     pnode = instance.primary_node
5946     nodelist = list(instance.all_nodes)
5947
5948     # hvparams processing
5949     if self.op.hvparams:
5950       i_hvdict = copy.deepcopy(instance.hvparams)
5951       for key, val in self.op.hvparams.iteritems():
5952         if val == constants.VALUE_DEFAULT:
5953           try:
5954             del i_hvdict[key]
5955           except KeyError:
5956             pass
5957         else:
5958           i_hvdict[key] = val
5959       cluster = self.cfg.GetClusterInfo()
5960       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5961       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5962                                 i_hvdict)
5963       # local check
5964       hypervisor.GetHypervisor(
5965         instance.hypervisor).CheckParameterSyntax(hv_new)
5966       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5967       self.hv_new = hv_new # the new actual values
5968       self.hv_inst = i_hvdict # the new dict (without defaults)
5969     else:
5970       self.hv_new = self.hv_inst = {}
5971
5972     # beparams processing
5973     if self.op.beparams:
5974       i_bedict = copy.deepcopy(instance.beparams)
5975       for key, val in self.op.beparams.iteritems():
5976         if val == constants.VALUE_DEFAULT:
5977           try:
5978             del i_bedict[key]
5979           except KeyError:
5980             pass
5981         else:
5982           i_bedict[key] = val
5983       cluster = self.cfg.GetClusterInfo()
5984       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5985       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5986                                 i_bedict)
5987       self.be_new = be_new # the new actual values
5988       self.be_inst = i_bedict # the new dict (without defaults)
5989     else:
5990       self.be_new = self.be_inst = {}
5991
5992     self.warn = []
5993
5994     if constants.BE_MEMORY in self.op.beparams and not self.force:
5995       mem_check_list = [pnode]
5996       if be_new[constants.BE_AUTO_BALANCE]:
5997         # either we changed auto_balance to yes or it was from before
5998         mem_check_list.extend(instance.secondary_nodes)
5999       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6000                                                   instance.hypervisor)
6001       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6002                                          instance.hypervisor)
6003       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6004         # Assume the primary node is unreachable and go ahead
6005         self.warn.append("Can't get info from primary node %s" % pnode)
6006       else:
6007         if not instance_info.failed and instance_info.data:
6008           current_mem = int(instance_info.data['memory'])
6009         else:
6010           # Assume instance not running
6011           # (there is a slight race condition here, but it's not very probable,
6012           # and we have no other way to check)
6013           current_mem = 0
6014         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6015                     nodeinfo[pnode].data['memory_free'])
6016         if miss_mem > 0:
6017           raise errors.OpPrereqError("This change will prevent the instance"
6018                                      " from starting, due to %d MB of memory"
6019                                      " missing on its primary node" % miss_mem)
6020
6021       if be_new[constants.BE_AUTO_BALANCE]:
6022         for node, nres in nodeinfo.iteritems():
6023           if node not in instance.secondary_nodes:
6024             continue
6025           if nres.failed or not isinstance(nres.data, dict):
6026             self.warn.append("Can't get info from secondary node %s" % node)
6027           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6028             self.warn.append("Not enough memory to failover instance to"
6029                              " secondary node %s" % node)
6030
6031     # NIC processing
6032     for nic_op, nic_dict in self.op.nics:
6033       if nic_op == constants.DDM_REMOVE:
6034         if not instance.nics:
6035           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6036         continue
6037       if nic_op != constants.DDM_ADD:
6038         # an existing nic
6039         if nic_op < 0 or nic_op >= len(instance.nics):
6040           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6041                                      " are 0 to %d" %
6042                                      (nic_op, len(instance.nics)))
6043       if 'bridge' in nic_dict:
6044         nic_bridge = nic_dict['bridge']
6045         if nic_bridge is None:
6046           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6047         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6048           msg = ("Bridge '%s' doesn't exist on one of"
6049                  " the instance nodes" % nic_bridge)
6050           if self.force:
6051             self.warn.append(msg)
6052           else:
6053             raise errors.OpPrereqError(msg)
6054       if 'mac' in nic_dict:
6055         nic_mac = nic_dict['mac']
6056         if nic_mac is None:
6057           raise errors.OpPrereqError('Cannot set the nic mac to None')
6058         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6059           # otherwise generate the mac
6060           nic_dict['mac'] = self.cfg.GenerateMAC()
6061         else:
6062           # or validate/reserve the current one
6063           if self.cfg.IsMacInUse(nic_mac):
6064             raise errors.OpPrereqError("MAC address %s already in use"
6065                                        " in cluster" % nic_mac)
6066
6067     # DISK processing
6068     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6069       raise errors.OpPrereqError("Disk operations not supported for"
6070                                  " diskless instances")
6071     for disk_op, disk_dict in self.op.disks:
6072       if disk_op == constants.DDM_REMOVE:
6073         if len(instance.disks) == 1:
6074           raise errors.OpPrereqError("Cannot remove the last disk of"
6075                                      " an instance")
6076         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6077         ins_l = ins_l[pnode]
6078         if ins_l.failed or not isinstance(ins_l.data, list):
6079           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6080         if instance.name in ins_l.data:
6081           raise errors.OpPrereqError("Instance is running, can't remove"
6082                                      " disks.")
6083
6084       if (disk_op == constants.DDM_ADD and
6085           len(instance.nics) >= constants.MAX_DISKS):
6086         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6087                                    " add more" % constants.MAX_DISKS)
6088       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6089         # an existing disk
6090         if disk_op < 0 or disk_op >= len(instance.disks):
6091           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6092                                      " are 0 to %d" %
6093                                      (disk_op, len(instance.disks)))
6094
6095     return
6096
6097   def Exec(self, feedback_fn):
6098     """Modifies an instance.
6099
6100     All parameters take effect only at the next restart of the instance.
6101
6102     """
6103     # Process here the warnings from CheckPrereq, as we don't have a
6104     # feedback_fn there.
6105     for warn in self.warn:
6106       feedback_fn("WARNING: %s" % warn)
6107
6108     result = []
6109     instance = self.instance
6110     # disk changes
6111     for disk_op, disk_dict in self.op.disks:
6112       if disk_op == constants.DDM_REMOVE:
6113         # remove the last disk
6114         device = instance.disks.pop()
6115         device_idx = len(instance.disks)
6116         for node, disk in device.ComputeNodeTree(instance.primary_node):
6117           self.cfg.SetDiskID(disk, node)
6118           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6119           if msg:
6120             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6121                             " continuing anyway", device_idx, node, msg)
6122         result.append(("disk/%d" % device_idx, "remove"))
6123       elif disk_op == constants.DDM_ADD:
6124         # add a new disk
6125         if instance.disk_template == constants.DT_FILE:
6126           file_driver, file_path = instance.disks[0].logical_id
6127           file_path = os.path.dirname(file_path)
6128         else:
6129           file_driver = file_path = None
6130         disk_idx_base = len(instance.disks)
6131         new_disk = _GenerateDiskTemplate(self,
6132                                          instance.disk_template,
6133                                          instance.name, instance.primary_node,
6134                                          instance.secondary_nodes,
6135                                          [disk_dict],
6136                                          file_path,
6137                                          file_driver,
6138                                          disk_idx_base)[0]
6139         instance.disks.append(new_disk)
6140         info = _GetInstanceInfoText(instance)
6141
6142         logging.info("Creating volume %s for instance %s",
6143                      new_disk.iv_name, instance.name)
6144         # Note: this needs to be kept in sync with _CreateDisks
6145         #HARDCODE
6146         for node in instance.all_nodes:
6147           f_create = node == instance.primary_node
6148           try:
6149             _CreateBlockDev(self, node, instance, new_disk,
6150                             f_create, info, f_create)
6151           except errors.OpExecError, err:
6152             self.LogWarning("Failed to create volume %s (%s) on"
6153                             " node %s: %s",
6154                             new_disk.iv_name, new_disk, node, err)
6155         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6156                        (new_disk.size, new_disk.mode)))
6157       else:
6158         # change a given disk
6159         instance.disks[disk_op].mode = disk_dict['mode']
6160         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6161     # NIC changes
6162     for nic_op, nic_dict in self.op.nics:
6163       if nic_op == constants.DDM_REMOVE:
6164         # remove the last nic
6165         del instance.nics[-1]
6166         result.append(("nic.%d" % len(instance.nics), "remove"))
6167       elif nic_op == constants.DDM_ADD:
6168         # mac and bridge should be set, by now
6169         mac = nic_dict['mac']
6170         bridge = nic_dict['bridge']
6171         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6172                               bridge=bridge)
6173         instance.nics.append(new_nic)
6174         result.append(("nic.%d" % (len(instance.nics) - 1),
6175                        "add:mac=%s,ip=%s,bridge=%s" %
6176                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6177       else:
6178         # change a given nic
6179         for key in 'mac', 'ip', 'bridge':
6180           if key in nic_dict:
6181             setattr(instance.nics[nic_op], key, nic_dict[key])
6182             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6183
6184     # hvparams changes
6185     if self.op.hvparams:
6186       instance.hvparams = self.hv_inst
6187       for key, val in self.op.hvparams.iteritems():
6188         result.append(("hv/%s" % key, val))
6189
6190     # beparams changes
6191     if self.op.beparams:
6192       instance.beparams = self.be_inst
6193       for key, val in self.op.beparams.iteritems():
6194         result.append(("be/%s" % key, val))
6195
6196     self.cfg.Update(instance)
6197
6198     return result
6199
6200
6201 class LUQueryExports(NoHooksLU):
6202   """Query the exports list
6203
6204   """
6205   _OP_REQP = ['nodes']
6206   REQ_BGL = False
6207
6208   def ExpandNames(self):
6209     self.needed_locks = {}
6210     self.share_locks[locking.LEVEL_NODE] = 1
6211     if not self.op.nodes:
6212       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6213     else:
6214       self.needed_locks[locking.LEVEL_NODE] = \
6215         _GetWantedNodes(self, self.op.nodes)
6216
6217   def CheckPrereq(self):
6218     """Check prerequisites.
6219
6220     """
6221     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6222
6223   def Exec(self, feedback_fn):
6224     """Compute the list of all the exported system images.
6225
6226     @rtype: dict
6227     @return: a dictionary with the structure node->(export-list)
6228         where export-list is a list of the instances exported on
6229         that node.
6230
6231     """
6232     rpcresult = self.rpc.call_export_list(self.nodes)
6233     result = {}
6234     for node in rpcresult:
6235       if rpcresult[node].failed:
6236         result[node] = False
6237       else:
6238         result[node] = rpcresult[node].data
6239
6240     return result
6241
6242
6243 class LUExportInstance(LogicalUnit):
6244   """Export an instance to an image in the cluster.
6245
6246   """
6247   HPATH = "instance-export"
6248   HTYPE = constants.HTYPE_INSTANCE
6249   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6250   REQ_BGL = False
6251
6252   def ExpandNames(self):
6253     self._ExpandAndLockInstance()
6254     # FIXME: lock only instance primary and destination node
6255     #
6256     # Sad but true, for now we have do lock all nodes, as we don't know where
6257     # the previous export might be, and and in this LU we search for it and
6258     # remove it from its current node. In the future we could fix this by:
6259     #  - making a tasklet to search (share-lock all), then create the new one,
6260     #    then one to remove, after
6261     #  - removing the removal operation altoghether
6262     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6263
6264   def DeclareLocks(self, level):
6265     """Last minute lock declaration."""
6266     # All nodes are locked anyway, so nothing to do here.
6267
6268   def BuildHooksEnv(self):
6269     """Build hooks env.
6270
6271     This will run on the master, primary node and target node.
6272
6273     """
6274     env = {
6275       "EXPORT_NODE": self.op.target_node,
6276       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6277       }
6278     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6279     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6280           self.op.target_node]
6281     return env, nl, nl
6282
6283   def CheckPrereq(self):
6284     """Check prerequisites.
6285
6286     This checks that the instance and node names are valid.
6287
6288     """
6289     instance_name = self.op.instance_name
6290     self.instance = self.cfg.GetInstanceInfo(instance_name)
6291     assert self.instance is not None, \
6292           "Cannot retrieve locked instance %s" % self.op.instance_name
6293     _CheckNodeOnline(self, self.instance.primary_node)
6294
6295     self.dst_node = self.cfg.GetNodeInfo(
6296       self.cfg.ExpandNodeName(self.op.target_node))
6297
6298     if self.dst_node is None:
6299       # This is wrong node name, not a non-locked node
6300       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6301     _CheckNodeOnline(self, self.dst_node.name)
6302     _CheckNodeNotDrained(self, self.dst_node.name)
6303
6304     # instance disk type verification
6305     for disk in self.instance.disks:
6306       if disk.dev_type == constants.LD_FILE:
6307         raise errors.OpPrereqError("Export not supported for instances with"
6308                                    " file-based disks")
6309
6310   def Exec(self, feedback_fn):
6311     """Export an instance to an image in the cluster.
6312
6313     """
6314     instance = self.instance
6315     dst_node = self.dst_node
6316     src_node = instance.primary_node
6317     if self.op.shutdown:
6318       # shutdown the instance, but not the disks
6319       result = self.rpc.call_instance_shutdown(src_node, instance)
6320       msg = result.RemoteFailMsg()
6321       if msg:
6322         raise errors.OpExecError("Could not shutdown instance %s on"
6323                                  " node %s: %s" %
6324                                  (instance.name, src_node, msg))
6325
6326     vgname = self.cfg.GetVGName()
6327
6328     snap_disks = []
6329
6330     # set the disks ID correctly since call_instance_start needs the
6331     # correct drbd minor to create the symlinks
6332     for disk in instance.disks:
6333       self.cfg.SetDiskID(disk, src_node)
6334
6335     try:
6336       for disk in instance.disks:
6337         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6338         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6339         if new_dev_name.failed or not new_dev_name.data:
6340           self.LogWarning("Could not snapshot block device %s on node %s",
6341                           disk.logical_id[1], src_node)
6342           snap_disks.append(False)
6343         else:
6344           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6345                                  logical_id=(vgname, new_dev_name.data),
6346                                  physical_id=(vgname, new_dev_name.data),
6347                                  iv_name=disk.iv_name)
6348           snap_disks.append(new_dev)
6349
6350     finally:
6351       if self.op.shutdown and instance.admin_up:
6352         result = self.rpc.call_instance_start(src_node, instance, None, None)
6353         msg = result.RemoteFailMsg()
6354         if msg:
6355           _ShutdownInstanceDisks(self, instance)
6356           raise errors.OpExecError("Could not start instance: %s" % msg)
6357
6358     # TODO: check for size
6359
6360     cluster_name = self.cfg.GetClusterName()
6361     for idx, dev in enumerate(snap_disks):
6362       if dev:
6363         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6364                                                instance, cluster_name, idx)
6365         if result.failed or not result.data:
6366           self.LogWarning("Could not export block device %s from node %s to"
6367                           " node %s", dev.logical_id[1], src_node,
6368                           dst_node.name)
6369         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6370         if msg:
6371           self.LogWarning("Could not remove snapshot block device %s from node"
6372                           " %s: %s", dev.logical_id[1], src_node, msg)
6373
6374     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6375     if result.failed or not result.data:
6376       self.LogWarning("Could not finalize export for instance %s on node %s",
6377                       instance.name, dst_node.name)
6378
6379     nodelist = self.cfg.GetNodeList()
6380     nodelist.remove(dst_node.name)
6381
6382     # on one-node clusters nodelist will be empty after the removal
6383     # if we proceed the backup would be removed because OpQueryExports
6384     # substitutes an empty list with the full cluster node list.
6385     if nodelist:
6386       exportlist = self.rpc.call_export_list(nodelist)
6387       for node in exportlist:
6388         if exportlist[node].failed:
6389           continue
6390         if instance.name in exportlist[node].data:
6391           if not self.rpc.call_export_remove(node, instance.name):
6392             self.LogWarning("Could not remove older export for instance %s"
6393                             " on node %s", instance.name, node)
6394
6395
6396 class LURemoveExport(NoHooksLU):
6397   """Remove exports related to the named instance.
6398
6399   """
6400   _OP_REQP = ["instance_name"]
6401   REQ_BGL = False
6402
6403   def ExpandNames(self):
6404     self.needed_locks = {}
6405     # We need all nodes to be locked in order for RemoveExport to work, but we
6406     # don't need to lock the instance itself, as nothing will happen to it (and
6407     # we can remove exports also for a removed instance)
6408     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6409
6410   def CheckPrereq(self):
6411     """Check prerequisites.
6412     """
6413     pass
6414
6415   def Exec(self, feedback_fn):
6416     """Remove any export.
6417
6418     """
6419     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6420     # If the instance was not found we'll try with the name that was passed in.
6421     # This will only work if it was an FQDN, though.
6422     fqdn_warn = False
6423     if not instance_name:
6424       fqdn_warn = True
6425       instance_name = self.op.instance_name
6426
6427     exportlist = self.rpc.call_export_list(self.acquired_locks[
6428       locking.LEVEL_NODE])
6429     found = False
6430     for node in exportlist:
6431       if exportlist[node].failed:
6432         self.LogWarning("Failed to query node %s, continuing" % node)
6433         continue
6434       if instance_name in exportlist[node].data:
6435         found = True
6436         result = self.rpc.call_export_remove(node, instance_name)
6437         if result.failed or not result.data:
6438           logging.error("Could not remove export for instance %s"
6439                         " on node %s", instance_name, node)
6440
6441     if fqdn_warn and not found:
6442       feedback_fn("Export not found. If trying to remove an export belonging"
6443                   " to a deleted instance please use its Fully Qualified"
6444                   " Domain Name.")
6445
6446
6447 class TagsLU(NoHooksLU):
6448   """Generic tags LU.
6449
6450   This is an abstract class which is the parent of all the other tags LUs.
6451
6452   """
6453
6454   def ExpandNames(self):
6455     self.needed_locks = {}
6456     if self.op.kind == constants.TAG_NODE:
6457       name = self.cfg.ExpandNodeName(self.op.name)
6458       if name is None:
6459         raise errors.OpPrereqError("Invalid node name (%s)" %
6460                                    (self.op.name,))
6461       self.op.name = name
6462       self.needed_locks[locking.LEVEL_NODE] = name
6463     elif self.op.kind == constants.TAG_INSTANCE:
6464       name = self.cfg.ExpandInstanceName(self.op.name)
6465       if name is None:
6466         raise errors.OpPrereqError("Invalid instance name (%s)" %
6467                                    (self.op.name,))
6468       self.op.name = name
6469       self.needed_locks[locking.LEVEL_INSTANCE] = name
6470
6471   def CheckPrereq(self):
6472     """Check prerequisites.
6473
6474     """
6475     if self.op.kind == constants.TAG_CLUSTER:
6476       self.target = self.cfg.GetClusterInfo()
6477     elif self.op.kind == constants.TAG_NODE:
6478       self.target = self.cfg.GetNodeInfo(self.op.name)
6479     elif self.op.kind == constants.TAG_INSTANCE:
6480       self.target = self.cfg.GetInstanceInfo(self.op.name)
6481     else:
6482       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6483                                  str(self.op.kind))
6484
6485
6486 class LUGetTags(TagsLU):
6487   """Returns the tags of a given object.
6488
6489   """
6490   _OP_REQP = ["kind", "name"]
6491   REQ_BGL = False
6492
6493   def Exec(self, feedback_fn):
6494     """Returns the tag list.
6495
6496     """
6497     return list(self.target.GetTags())
6498
6499
6500 class LUSearchTags(NoHooksLU):
6501   """Searches the tags for a given pattern.
6502
6503   """
6504   _OP_REQP = ["pattern"]
6505   REQ_BGL = False
6506
6507   def ExpandNames(self):
6508     self.needed_locks = {}
6509
6510   def CheckPrereq(self):
6511     """Check prerequisites.
6512
6513     This checks the pattern passed for validity by compiling it.
6514
6515     """
6516     try:
6517       self.re = re.compile(self.op.pattern)
6518     except re.error, err:
6519       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6520                                  (self.op.pattern, err))
6521
6522   def Exec(self, feedback_fn):
6523     """Returns the tag list.
6524
6525     """
6526     cfg = self.cfg
6527     tgts = [("/cluster", cfg.GetClusterInfo())]
6528     ilist = cfg.GetAllInstancesInfo().values()
6529     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6530     nlist = cfg.GetAllNodesInfo().values()
6531     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6532     results = []
6533     for path, target in tgts:
6534       for tag in target.GetTags():
6535         if self.re.search(tag):
6536           results.append((path, tag))
6537     return results
6538
6539
6540 class LUAddTags(TagsLU):
6541   """Sets a tag on a given object.
6542
6543   """
6544   _OP_REQP = ["kind", "name", "tags"]
6545   REQ_BGL = False
6546
6547   def CheckPrereq(self):
6548     """Check prerequisites.
6549
6550     This checks the type and length of the tag name and value.
6551
6552     """
6553     TagsLU.CheckPrereq(self)
6554     for tag in self.op.tags:
6555       objects.TaggableObject.ValidateTag(tag)
6556
6557   def Exec(self, feedback_fn):
6558     """Sets the tag.
6559
6560     """
6561     try:
6562       for tag in self.op.tags:
6563         self.target.AddTag(tag)
6564     except errors.TagError, err:
6565       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6566     try:
6567       self.cfg.Update(self.target)
6568     except errors.ConfigurationError:
6569       raise errors.OpRetryError("There has been a modification to the"
6570                                 " config file and the operation has been"
6571                                 " aborted. Please retry.")
6572
6573
6574 class LUDelTags(TagsLU):
6575   """Delete a list of tags from a given object.
6576
6577   """
6578   _OP_REQP = ["kind", "name", "tags"]
6579   REQ_BGL = False
6580
6581   def CheckPrereq(self):
6582     """Check prerequisites.
6583
6584     This checks that we have the given tag.
6585
6586     """
6587     TagsLU.CheckPrereq(self)
6588     for tag in self.op.tags:
6589       objects.TaggableObject.ValidateTag(tag)
6590     del_tags = frozenset(self.op.tags)
6591     cur_tags = self.target.GetTags()
6592     if not del_tags <= cur_tags:
6593       diff_tags = del_tags - cur_tags
6594       diff_names = ["'%s'" % tag for tag in diff_tags]
6595       diff_names.sort()
6596       raise errors.OpPrereqError("Tag(s) %s not found" %
6597                                  (",".join(diff_names)))
6598
6599   def Exec(self, feedback_fn):
6600     """Remove the tag from the object.
6601
6602     """
6603     for tag in self.op.tags:
6604       self.target.RemoveTag(tag)
6605     try:
6606       self.cfg.Update(self.target)
6607     except errors.ConfigurationError:
6608       raise errors.OpRetryError("There has been a modification to the"
6609                                 " config file and the operation has been"
6610                                 " aborted. Please retry.")
6611
6612
6613 class LUTestDelay(NoHooksLU):
6614   """Sleep for a specified amount of time.
6615
6616   This LU sleeps on the master and/or nodes for a specified amount of
6617   time.
6618
6619   """
6620   _OP_REQP = ["duration", "on_master", "on_nodes"]
6621   REQ_BGL = False
6622
6623   def ExpandNames(self):
6624     """Expand names and set required locks.
6625
6626     This expands the node list, if any.
6627
6628     """
6629     self.needed_locks = {}
6630     if self.op.on_nodes:
6631       # _GetWantedNodes can be used here, but is not always appropriate to use
6632       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6633       # more information.
6634       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6635       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6636
6637   def CheckPrereq(self):
6638     """Check prerequisites.
6639
6640     """
6641
6642   def Exec(self, feedback_fn):
6643     """Do the actual sleep.
6644
6645     """
6646     if self.op.on_master:
6647       if not utils.TestDelay(self.op.duration):
6648         raise errors.OpExecError("Error during master delay test")
6649     if self.op.on_nodes:
6650       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6651       if not result:
6652         raise errors.OpExecError("Complete failure from rpc call")
6653       for node, node_result in result.items():
6654         node_result.Raise()
6655         if not node_result.data:
6656           raise errors.OpExecError("Failure during rpc call to node %s,"
6657                                    " result: %s" % (node, node_result.data))
6658
6659
6660 class IAllocator(object):
6661   """IAllocator framework.
6662
6663   An IAllocator instance has three sets of attributes:
6664     - cfg that is needed to query the cluster
6665     - input data (all members of the _KEYS class attribute are required)
6666     - four buffer attributes (in|out_data|text), that represent the
6667       input (to the external script) in text and data structure format,
6668       and the output from it, again in two formats
6669     - the result variables from the script (success, info, nodes) for
6670       easy usage
6671
6672   """
6673   _ALLO_KEYS = [
6674     "mem_size", "disks", "disk_template",
6675     "os", "tags", "nics", "vcpus", "hypervisor",
6676     ]
6677   _RELO_KEYS = [
6678     "relocate_from",
6679     ]
6680
6681   def __init__(self, lu, mode, name, **kwargs):
6682     self.lu = lu
6683     # init buffer variables
6684     self.in_text = self.out_text = self.in_data = self.out_data = None
6685     # init all input fields so that pylint is happy
6686     self.mode = mode
6687     self.name = name
6688     self.mem_size = self.disks = self.disk_template = None
6689     self.os = self.tags = self.nics = self.vcpus = None
6690     self.hypervisor = None
6691     self.relocate_from = None
6692     # computed fields
6693     self.required_nodes = None
6694     # init result fields
6695     self.success = self.info = self.nodes = None
6696     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6697       keyset = self._ALLO_KEYS
6698     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6699       keyset = self._RELO_KEYS
6700     else:
6701       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6702                                    " IAllocator" % self.mode)
6703     for key in kwargs:
6704       if key not in keyset:
6705         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6706                                      " IAllocator" % key)
6707       setattr(self, key, kwargs[key])
6708     for key in keyset:
6709       if key not in kwargs:
6710         raise errors.ProgrammerError("Missing input parameter '%s' to"
6711                                      " IAllocator" % key)
6712     self._BuildInputData()
6713
6714   def _ComputeClusterData(self):
6715     """Compute the generic allocator input data.
6716
6717     This is the data that is independent of the actual operation.
6718
6719     """
6720     cfg = self.lu.cfg
6721     cluster_info = cfg.GetClusterInfo()
6722     # cluster data
6723     data = {
6724       "version": constants.IALLOCATOR_VERSION,
6725       "cluster_name": cfg.GetClusterName(),
6726       "cluster_tags": list(cluster_info.GetTags()),
6727       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6728       # we don't have job IDs
6729       }
6730     iinfo = cfg.GetAllInstancesInfo().values()
6731     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6732
6733     # node data
6734     node_results = {}
6735     node_list = cfg.GetNodeList()
6736
6737     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6738       hypervisor_name = self.hypervisor
6739     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6740       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6741
6742     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6743                                            hypervisor_name)
6744     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6745                        cluster_info.enabled_hypervisors)
6746     for nname, nresult in node_data.items():
6747       # first fill in static (config-based) values
6748       ninfo = cfg.GetNodeInfo(nname)
6749       pnr = {
6750         "tags": list(ninfo.GetTags()),
6751         "primary_ip": ninfo.primary_ip,
6752         "secondary_ip": ninfo.secondary_ip,
6753         "offline": ninfo.offline,
6754         "drained": ninfo.drained,
6755         "master_candidate": ninfo.master_candidate,
6756         }
6757
6758       if not ninfo.offline:
6759         nresult.Raise()
6760         if not isinstance(nresult.data, dict):
6761           raise errors.OpExecError("Can't get data for node %s" % nname)
6762         remote_info = nresult.data
6763         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6764                      'vg_size', 'vg_free', 'cpu_total']:
6765           if attr not in remote_info:
6766             raise errors.OpExecError("Node '%s' didn't return attribute"
6767                                      " '%s'" % (nname, attr))
6768           try:
6769             remote_info[attr] = int(remote_info[attr])
6770           except ValueError, err:
6771             raise errors.OpExecError("Node '%s' returned invalid value"
6772                                      " for '%s': %s" % (nname, attr, err))
6773         # compute memory used by primary instances
6774         i_p_mem = i_p_up_mem = 0
6775         for iinfo, beinfo in i_list:
6776           if iinfo.primary_node == nname:
6777             i_p_mem += beinfo[constants.BE_MEMORY]
6778             if iinfo.name not in node_iinfo[nname].data:
6779               i_used_mem = 0
6780             else:
6781               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6782             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6783             remote_info['memory_free'] -= max(0, i_mem_diff)
6784
6785             if iinfo.admin_up:
6786               i_p_up_mem += beinfo[constants.BE_MEMORY]
6787
6788         # compute memory used by instances
6789         pnr_dyn = {
6790           "total_memory": remote_info['memory_total'],
6791           "reserved_memory": remote_info['memory_dom0'],
6792           "free_memory": remote_info['memory_free'],
6793           "total_disk": remote_info['vg_size'],
6794           "free_disk": remote_info['vg_free'],
6795           "total_cpus": remote_info['cpu_total'],
6796           "i_pri_memory": i_p_mem,
6797           "i_pri_up_memory": i_p_up_mem,
6798           }
6799         pnr.update(pnr_dyn)
6800
6801       node_results[nname] = pnr
6802     data["nodes"] = node_results
6803
6804     # instance data
6805     instance_data = {}
6806     for iinfo, beinfo in i_list:
6807       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6808                   for n in iinfo.nics]
6809       pir = {
6810         "tags": list(iinfo.GetTags()),
6811         "admin_up": iinfo.admin_up,
6812         "vcpus": beinfo[constants.BE_VCPUS],
6813         "memory": beinfo[constants.BE_MEMORY],
6814         "os": iinfo.os,
6815         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6816         "nics": nic_data,
6817         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6818         "disk_template": iinfo.disk_template,
6819         "hypervisor": iinfo.hypervisor,
6820         }
6821       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6822                                                  pir["disks"])
6823       instance_data[iinfo.name] = pir
6824
6825     data["instances"] = instance_data
6826
6827     self.in_data = data
6828
6829   def _AddNewInstance(self):
6830     """Add new instance data to allocator structure.
6831
6832     This in combination with _AllocatorGetClusterData will create the
6833     correct structure needed as input for the allocator.
6834
6835     The checks for the completeness of the opcode must have already been
6836     done.
6837
6838     """
6839     data = self.in_data
6840
6841     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6842
6843     if self.disk_template in constants.DTS_NET_MIRROR:
6844       self.required_nodes = 2
6845     else:
6846       self.required_nodes = 1
6847     request = {
6848       "type": "allocate",
6849       "name": self.name,
6850       "disk_template": self.disk_template,
6851       "tags": self.tags,
6852       "os": self.os,
6853       "vcpus": self.vcpus,
6854       "memory": self.mem_size,
6855       "disks": self.disks,
6856       "disk_space_total": disk_space,
6857       "nics": self.nics,
6858       "required_nodes": self.required_nodes,
6859       }
6860     data["request"] = request
6861
6862   def _AddRelocateInstance(self):
6863     """Add relocate instance data to allocator structure.
6864
6865     This in combination with _IAllocatorGetClusterData will create the
6866     correct structure needed as input for the allocator.
6867
6868     The checks for the completeness of the opcode must have already been
6869     done.
6870
6871     """
6872     instance = self.lu.cfg.GetInstanceInfo(self.name)
6873     if instance is None:
6874       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6875                                    " IAllocator" % self.name)
6876
6877     if instance.disk_template not in constants.DTS_NET_MIRROR:
6878       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6879
6880     if len(instance.secondary_nodes) != 1:
6881       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6882
6883     self.required_nodes = 1
6884     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6885     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6886
6887     request = {
6888       "type": "relocate",
6889       "name": self.name,
6890       "disk_space_total": disk_space,
6891       "required_nodes": self.required_nodes,
6892       "relocate_from": self.relocate_from,
6893       }
6894     self.in_data["request"] = request
6895
6896   def _BuildInputData(self):
6897     """Build input data structures.
6898
6899     """
6900     self._ComputeClusterData()
6901
6902     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6903       self._AddNewInstance()
6904     else:
6905       self._AddRelocateInstance()
6906
6907     self.in_text = serializer.Dump(self.in_data)
6908
6909   def Run(self, name, validate=True, call_fn=None):
6910     """Run an instance allocator and return the results.
6911
6912     """
6913     if call_fn is None:
6914       call_fn = self.lu.rpc.call_iallocator_runner
6915     data = self.in_text
6916
6917     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6918     result.Raise()
6919
6920     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6921       raise errors.OpExecError("Invalid result from master iallocator runner")
6922
6923     rcode, stdout, stderr, fail = result.data
6924
6925     if rcode == constants.IARUN_NOTFOUND:
6926       raise errors.OpExecError("Can't find allocator '%s'" % name)
6927     elif rcode == constants.IARUN_FAILURE:
6928       raise errors.OpExecError("Instance allocator call failed: %s,"
6929                                " output: %s" % (fail, stdout+stderr))
6930     self.out_text = stdout
6931     if validate:
6932       self._ValidateResult()
6933
6934   def _ValidateResult(self):
6935     """Process the allocator results.
6936
6937     This will process and if successful save the result in
6938     self.out_data and the other parameters.
6939
6940     """
6941     try:
6942       rdict = serializer.Load(self.out_text)
6943     except Exception, err:
6944       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6945
6946     if not isinstance(rdict, dict):
6947       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6948
6949     for key in "success", "info", "nodes":
6950       if key not in rdict:
6951         raise errors.OpExecError("Can't parse iallocator results:"
6952                                  " missing key '%s'" % key)
6953       setattr(self, key, rdict[key])
6954
6955     if not isinstance(rdict["nodes"], list):
6956       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6957                                " is not a list")
6958     self.out_data = rdict
6959
6960
6961 class LUTestAllocator(NoHooksLU):
6962   """Run allocator tests.
6963
6964   This LU runs the allocator tests
6965
6966   """
6967   _OP_REQP = ["direction", "mode", "name"]
6968
6969   def CheckPrereq(self):
6970     """Check prerequisites.
6971
6972     This checks the opcode parameters depending on the director and mode test.
6973
6974     """
6975     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6976       for attr in ["name", "mem_size", "disks", "disk_template",
6977                    "os", "tags", "nics", "vcpus"]:
6978         if not hasattr(self.op, attr):
6979           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6980                                      attr)
6981       iname = self.cfg.ExpandInstanceName(self.op.name)
6982       if iname is not None:
6983         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6984                                    iname)
6985       if not isinstance(self.op.nics, list):
6986         raise errors.OpPrereqError("Invalid parameter 'nics'")
6987       for row in self.op.nics:
6988         if (not isinstance(row, dict) or
6989             "mac" not in row or
6990             "ip" not in row or
6991             "bridge" not in row):
6992           raise errors.OpPrereqError("Invalid contents of the"
6993                                      " 'nics' parameter")
6994       if not isinstance(self.op.disks, list):
6995         raise errors.OpPrereqError("Invalid parameter 'disks'")
6996       for row in self.op.disks:
6997         if (not isinstance(row, dict) or
6998             "size" not in row or
6999             not isinstance(row["size"], int) or
7000             "mode" not in row or
7001             row["mode"] not in ['r', 'w']):
7002           raise errors.OpPrereqError("Invalid contents of the"
7003                                      " 'disks' parameter")
7004       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7005         self.op.hypervisor = self.cfg.GetHypervisorType()
7006     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7007       if not hasattr(self.op, "name"):
7008         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7009       fname = self.cfg.ExpandInstanceName(self.op.name)
7010       if fname is None:
7011         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7012                                    self.op.name)
7013       self.op.name = fname
7014       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7015     else:
7016       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7017                                  self.op.mode)
7018
7019     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7020       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7021         raise errors.OpPrereqError("Missing allocator name")
7022     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7023       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7024                                  self.op.direction)
7025
7026   def Exec(self, feedback_fn):
7027     """Run the allocator test.
7028
7029     """
7030     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7031       ial = IAllocator(self,
7032                        mode=self.op.mode,
7033                        name=self.op.name,
7034                        mem_size=self.op.mem_size,
7035                        disks=self.op.disks,
7036                        disk_template=self.op.disk_template,
7037                        os=self.op.os,
7038                        tags=self.op.tags,
7039                        nics=self.op.nics,
7040                        vcpus=self.op.vcpus,
7041                        hypervisor=self.op.hypervisor,
7042                        )
7043     else:
7044       ial = IAllocator(self,
7045                        mode=self.op.mode,
7046                        name=self.op.name,
7047                        relocate_from=list(self.relocate_from),
7048                        )
7049
7050     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7051       result = ial.in_text
7052     else:
7053       ial.Run(self.op.allocator, validate=False)
7054       result = ial.out_text
7055     return result