ganeti.cmdlib: Check remote API certificate on "gnt-cluster verify"
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the nodes is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445                           memory, vcpus, nics):
446   """Builds instance related env variables for hooks
447
448   This builds the hook environment from individual variables.
449
450   @type name: string
451   @param name: the name of the instance
452   @type primary_node: string
453   @param primary_node: the name of the instance's primary node
454   @type secondary_nodes: list
455   @param secondary_nodes: list of secondary nodes as strings
456   @type os_type: string
457   @param os_type: the name of the instance's OS
458   @type status: string
459   @param status: the desired status of the instances
460   @type memory: string
461   @param memory: the memory size of the instance
462   @type vcpus: string
463   @param vcpus: the count of VCPUs the instance has
464   @type nics: list
465   @param nics: list of tuples (ip, bridge, mac) representing
466       the NICs the instance  has
467   @rtype: dict
468   @return: the hook environment for this instance
469
470   """
471   env = {
472     "OP_TARGET": name,
473     "INSTANCE_NAME": name,
474     "INSTANCE_PRIMARY": primary_node,
475     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476     "INSTANCE_OS_TYPE": os_type,
477     "INSTANCE_STATUS": status,
478     "INSTANCE_MEMORY": memory,
479     "INSTANCE_VCPUS": vcpus,
480   }
481
482   if nics:
483     nic_count = len(nics)
484     for idx, (ip, bridge, mac) in enumerate(nics):
485       if ip is None:
486         ip = ""
487       env["INSTANCE_NIC%d_IP" % idx] = ip
488       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
490   else:
491     nic_count = 0
492
493   env["INSTANCE_NIC_COUNT"] = nic_count
494
495   return env
496
497
498 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499   """Builds instance related env variables for hooks from an object.
500
501   @type lu: L{LogicalUnit}
502   @param lu: the logical unit on whose behalf we execute
503   @type instance: L{objects.Instance}
504   @param instance: the instance for which we should build the
505       environment
506   @type override: dict
507   @param override: dictionary with key/values that will override
508       our values
509   @rtype: dict
510   @return: the hook environment dictionary
511
512   """
513   bep = lu.cfg.GetClusterInfo().FillBE(instance)
514   args = {
515     'name': instance.name,
516     'primary_node': instance.primary_node,
517     'secondary_nodes': instance.secondary_nodes,
518     'os_type': instance.os,
519     'status': instance.os,
520     'memory': bep[constants.BE_MEMORY],
521     'vcpus': bep[constants.BE_VCPUS],
522     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
523   }
524   if override:
525     args.update(override)
526   return _BuildInstanceHookEnv(**args)
527
528
529 def _AdjustCandidatePool(lu):
530   """Adjust the candidate pool after node operations.
531
532   """
533   mod_list = lu.cfg.MaintainCandidatePool()
534   if mod_list:
535     lu.LogInfo("Promoted nodes to master candidate role: %s",
536                ", ".join(node.name for node in mod_list))
537     for name in mod_list:
538       lu.context.ReaddNode(name)
539   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
540   if mc_now > mc_max:
541     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
542                (mc_now, mc_max))
543
544
545 def _CheckInstanceBridgesExist(lu, instance):
546   """Check that the brigdes needed by an instance exist.
547
548   """
549   # check bridges existance
550   brlist = [nic.bridge for nic in instance.nics]
551   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
552   result.Raise()
553   if not result.data:
554     raise errors.OpPrereqError("One or more target bridges %s does not"
555                                " exist on destination node '%s'" %
556                                (brlist, instance.primary_node))
557
558
559 class LUDestroyCluster(NoHooksLU):
560   """Logical unit for destroying the cluster.
561
562   """
563   _OP_REQP = []
564
565   def CheckPrereq(self):
566     """Check prerequisites.
567
568     This checks whether the cluster is empty.
569
570     Any errors are signalled by raising errors.OpPrereqError.
571
572     """
573     master = self.cfg.GetMasterNode()
574
575     nodelist = self.cfg.GetNodeList()
576     if len(nodelist) != 1 or nodelist[0] != master:
577       raise errors.OpPrereqError("There are still %d node(s) in"
578                                  " this cluster." % (len(nodelist) - 1))
579     instancelist = self.cfg.GetInstanceList()
580     if instancelist:
581       raise errors.OpPrereqError("There are still %d instance(s) in"
582                                  " this cluster." % len(instancelist))
583
584   def Exec(self, feedback_fn):
585     """Destroys the cluster.
586
587     """
588     master = self.cfg.GetMasterNode()
589     result = self.rpc.call_node_stop_master(master, False)
590     result.Raise()
591     if not result.data:
592       raise errors.OpExecError("Could not disable the master role")
593     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594     utils.CreateBackup(priv_key)
595     utils.CreateBackup(pub_key)
596     return master
597
598
599 class LUVerifyCluster(LogicalUnit):
600   """Verifies the cluster status.
601
602   """
603   HPATH = "cluster-verify"
604   HTYPE = constants.HTYPE_CLUSTER
605   _OP_REQP = ["skip_checks"]
606   REQ_BGL = False
607
608   def ExpandNames(self):
609     self.needed_locks = {
610       locking.LEVEL_NODE: locking.ALL_SET,
611       locking.LEVEL_INSTANCE: locking.ALL_SET,
612     }
613     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
614
615   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616                   node_result, feedback_fn, master_files):
617     """Run multiple tests against a node.
618
619     Test list:
620
621       - compares ganeti version
622       - checks vg existance and size > 20G
623       - checks config file checksum
624       - checks ssh to other nodes
625
626     @type nodeinfo: L{objects.Node}
627     @param nodeinfo: the node to check
628     @param file_list: required list of files
629     @param local_cksum: dictionary of local files and their checksums
630     @param node_result: the results from the node
631     @param feedback_fn: function used to accumulate results
632     @param master_files: list of files that only masters should have
633
634     """
635     node = nodeinfo.name
636
637     # main result, node_result should be a non-empty dict
638     if not node_result or not isinstance(node_result, dict):
639       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
640       return True
641
642     # compares ganeti version
643     local_version = constants.PROTOCOL_VERSION
644     remote_version = node_result.get('version', None)
645     if not remote_version:
646       feedback_fn("  - ERROR: connection to %s failed" % (node))
647       return True
648
649     if local_version != remote_version:
650       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
651                       (local_version, node, remote_version))
652       return True
653
654     # checks vg existance and size > 20G
655
656     bad = False
657     vglist = node_result.get(constants.NV_VGLIST, None)
658     if not vglist:
659       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
660                       (node,))
661       bad = True
662     else:
663       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664                                             constants.MIN_VG_SIZE)
665       if vgstatus:
666         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
667         bad = True
668
669     # checks config file checksum
670
671     remote_cksum = node_result.get(constants.NV_FILELIST, None)
672     if not isinstance(remote_cksum, dict):
673       bad = True
674       feedback_fn("  - ERROR: node hasn't returned file checksum data")
675     else:
676       for file_name in file_list:
677         node_is_mc = nodeinfo.master_candidate
678         must_have_file = file_name not in master_files
679         if file_name not in remote_cksum:
680           if node_is_mc or must_have_file:
681             bad = True
682             feedback_fn("  - ERROR: file '%s' missing" % file_name)
683         elif remote_cksum[file_name] != local_cksum[file_name]:
684           if node_is_mc or must_have_file:
685             bad = True
686             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
687           else:
688             # not candidate and this is not a must-have file
689             bad = True
690             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
691                         " '%s'" % file_name)
692         else:
693           # all good, except non-master/non-must have combination
694           if not node_is_mc and not must_have_file:
695             feedback_fn("  - ERROR: file '%s' should not exist on non master"
696                         " candidates" % file_name)
697
698     # checks ssh to any
699
700     if constants.NV_NODELIST not in node_result:
701       bad = True
702       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
703     else:
704       if node_result[constants.NV_NODELIST]:
705         bad = True
706         for node in node_result[constants.NV_NODELIST]:
707           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
708                           (node, node_result[constants.NV_NODELIST][node]))
709
710     if constants.NV_NODENETTEST not in node_result:
711       bad = True
712       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
713     else:
714       if node_result[constants.NV_NODENETTEST]:
715         bad = True
716         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
717         for node in nlist:
718           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
719                           (node, node_result[constants.NV_NODENETTEST][node]))
720
721     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722     if isinstance(hyp_result, dict):
723       for hv_name, hv_result in hyp_result.iteritems():
724         if hv_result is not None:
725           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
726                       (hv_name, hv_result))
727     return bad
728
729   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730                       node_instance, feedback_fn, n_offline):
731     """Verify an instance.
732
733     This function checks to see if the required block devices are
734     available on the instance's node.
735
736     """
737     bad = False
738
739     node_current = instanceconfig.primary_node
740
741     node_vol_should = {}
742     instanceconfig.MapLVsByNode(node_vol_should)
743
744     for node in node_vol_should:
745       if node in n_offline:
746         # ignore missing volumes on offline nodes
747         continue
748       for volume in node_vol_should[node]:
749         if node not in node_vol_is or volume not in node_vol_is[node]:
750           feedback_fn("  - ERROR: volume %s missing on node %s" %
751                           (volume, node))
752           bad = True
753
754     if not instanceconfig.status == 'down':
755       if ((node_current not in node_instance or
756           not instance in node_instance[node_current]) and
757           node_current not in n_offline):
758         feedback_fn("  - ERROR: instance %s not running on node %s" %
759                         (instance, node_current))
760         bad = True
761
762     for node in node_instance:
763       if (not node == node_current):
764         if instance in node_instance[node]:
765           feedback_fn("  - ERROR: instance %s should not run on node %s" %
766                           (instance, node))
767           bad = True
768
769     return bad
770
771   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772     """Verify if there are any unknown volumes in the cluster.
773
774     The .os, .swap and backup volumes are ignored. All other volumes are
775     reported as unknown.
776
777     """
778     bad = False
779
780     for node in node_vol_is:
781       for volume in node_vol_is[node]:
782         if node not in node_vol_should or volume not in node_vol_should[node]:
783           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
784                       (volume, node))
785           bad = True
786     return bad
787
788   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789     """Verify the list of running instances.
790
791     This checks what instances are running but unknown to the cluster.
792
793     """
794     bad = False
795     for node in node_instance:
796       for runninginstance in node_instance[node]:
797         if runninginstance not in instancelist:
798           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
799                           (runninginstance, node))
800           bad = True
801     return bad
802
803   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
804     """Verify N+1 Memory Resilience.
805
806     Check that if one single node dies we can still start all the instances it
807     was primary for.
808
809     """
810     bad = False
811
812     for node, nodeinfo in node_info.iteritems():
813       # This code checks that every node which is now listed as secondary has
814       # enough memory to host all instances it is supposed to should a single
815       # other node in the cluster fail.
816       # FIXME: not ready for failover to an arbitrary node
817       # FIXME: does not support file-backed instances
818       # WARNING: we currently take into account down instances as well as up
819       # ones, considering that even if they're down someone might want to start
820       # them even in the event of a node failure.
821       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
822         needed_mem = 0
823         for instance in instances:
824           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
825           if bep[constants.BE_AUTO_BALANCE]:
826             needed_mem += bep[constants.BE_MEMORY]
827         if nodeinfo['mfree'] < needed_mem:
828           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
829                       " failovers should node %s fail" % (node, prinode))
830           bad = True
831     return bad
832
833   def CheckPrereq(self):
834     """Check prerequisites.
835
836     Transform the list of checks we're going to skip into a set and check that
837     all its members are valid.
838
839     """
840     self.skip_set = frozenset(self.op.skip_checks)
841     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
842       raise errors.OpPrereqError("Invalid checks to be skipped specified")
843
844   def BuildHooksEnv(self):
845     """Build hooks env.
846
847     Cluster-Verify hooks just rone in the post phase and their failure makes
848     the output be logged in the verify output and the verification to fail.
849
850     """
851     all_nodes = self.cfg.GetNodeList()
852     # TODO: populate the environment with useful information for verify hooks
853     env = {}
854     return env, [], all_nodes
855
856   def Exec(self, feedback_fn):
857     """Verify integrity of cluster, performing various test on nodes.
858
859     """
860     bad = False
861     feedback_fn("* Verifying global settings")
862     for msg in self.cfg.VerifyConfig():
863       feedback_fn("  - ERROR: %s" % msg)
864
865     vg_name = self.cfg.GetVGName()
866     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
867     nodelist = utils.NiceSort(self.cfg.GetNodeList())
868     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
869     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
870     i_non_redundant = [] # Non redundant instances
871     i_non_a_balanced = [] # Non auto-balanced instances
872     n_offline = [] # List of offline nodes
873     node_volume = {}
874     node_instance = {}
875     node_info = {}
876     instance_cfg = {}
877
878     # FIXME: verify OS list
879     # do local checksums
880     master_files = [constants.CLUSTER_CONF_FILE]
881
882     file_names = ssconf.SimpleStore().GetFileList()
883     file_names.append(constants.SSL_CERT_FILE)
884     file_names.append(constants.RAPI_CERT_FILE)
885     file_names.extend(master_files)
886
887     local_checksums = utils.FingerprintFiles(file_names)
888
889     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
890     node_verify_param = {
891       constants.NV_FILELIST: file_names,
892       constants.NV_NODELIST: nodelist,
893       constants.NV_HYPERVISOR: hypervisors,
894       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
895                                   node.secondary_ip) for node in nodeinfo],
896       constants.NV_LVLIST: vg_name,
897       constants.NV_INSTANCELIST: hypervisors,
898       constants.NV_VGLIST: None,
899       constants.NV_VERSION: None,
900       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
901       }
902     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
903                                            self.cfg.GetClusterName())
904
905     cluster = self.cfg.GetClusterInfo()
906     master_node = self.cfg.GetMasterNode()
907     for node_i in nodeinfo:
908       node = node_i.name
909       nresult = all_nvinfo[node].data
910
911       if node_i.offline:
912         feedback_fn("* Skipping offline node %s" % (node,))
913         n_offline.append(node)
914         continue
915
916       if node == master_node:
917         ntype = "master"
918       elif node_i.master_candidate:
919         ntype = "master candidate"
920       else:
921         ntype = "regular"
922       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
923
924       if all_nvinfo[node].failed or not isinstance(nresult, dict):
925         feedback_fn("  - ERROR: connection to %s failed" % (node,))
926         bad = True
927         continue
928
929       result = self._VerifyNode(node_i, file_names, local_checksums,
930                                 nresult, feedback_fn, master_files)
931       bad = bad or result
932
933       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
934       if isinstance(lvdata, basestring):
935         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
936                     (node, lvdata.encode('string_escape')))
937         bad = True
938         node_volume[node] = {}
939       elif not isinstance(lvdata, dict):
940         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
941         bad = True
942         continue
943       else:
944         node_volume[node] = lvdata
945
946       # node_instance
947       idata = nresult.get(constants.NV_INSTANCELIST, None)
948       if not isinstance(idata, list):
949         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
950                     (node,))
951         bad = True
952         continue
953
954       node_instance[node] = idata
955
956       # node_info
957       nodeinfo = nresult.get(constants.NV_HVINFO, None)
958       if not isinstance(nodeinfo, dict):
959         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
960         bad = True
961         continue
962
963       try:
964         node_info[node] = {
965           "mfree": int(nodeinfo['memory_free']),
966           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
967           "pinst": [],
968           "sinst": [],
969           # dictionary holding all instances this node is secondary for,
970           # grouped by their primary node. Each key is a cluster node, and each
971           # value is a list of instances which have the key as primary and the
972           # current node as secondary.  this is handy to calculate N+1 memory
973           # availability if you can only failover from a primary to its
974           # secondary.
975           "sinst-by-pnode": {},
976         }
977       except ValueError:
978         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
979         bad = True
980         continue
981
982     node_vol_should = {}
983
984     for instance in instancelist:
985       feedback_fn("* Verifying instance %s" % instance)
986       inst_config = self.cfg.GetInstanceInfo(instance)
987       result =  self._VerifyInstance(instance, inst_config, node_volume,
988                                      node_instance, feedback_fn, n_offline)
989       bad = bad or result
990       inst_nodes_offline = []
991
992       inst_config.MapLVsByNode(node_vol_should)
993
994       instance_cfg[instance] = inst_config
995
996       pnode = inst_config.primary_node
997       if pnode in node_info:
998         node_info[pnode]['pinst'].append(instance)
999       elif pnode not in n_offline:
1000         feedback_fn("  - ERROR: instance %s, connection to primary node"
1001                     " %s failed" % (instance, pnode))
1002         bad = True
1003
1004       if pnode in n_offline:
1005         inst_nodes_offline.append(pnode)
1006
1007       # If the instance is non-redundant we cannot survive losing its primary
1008       # node, so we are not N+1 compliant. On the other hand we have no disk
1009       # templates with more than one secondary so that situation is not well
1010       # supported either.
1011       # FIXME: does not support file-backed instances
1012       if len(inst_config.secondary_nodes) == 0:
1013         i_non_redundant.append(instance)
1014       elif len(inst_config.secondary_nodes) > 1:
1015         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1016                     % instance)
1017
1018       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1019         i_non_a_balanced.append(instance)
1020
1021       for snode in inst_config.secondary_nodes:
1022         if snode in node_info:
1023           node_info[snode]['sinst'].append(instance)
1024           if pnode not in node_info[snode]['sinst-by-pnode']:
1025             node_info[snode]['sinst-by-pnode'][pnode] = []
1026           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1027         elif snode not in n_offline:
1028           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1029                       " %s failed" % (instance, snode))
1030           bad = True
1031         if snode in n_offline:
1032           inst_nodes_offline.append(snode)
1033
1034       if inst_nodes_offline:
1035         # warn that the instance lives on offline nodes, and set bad=True
1036         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1037                     ", ".join(inst_nodes_offline))
1038         bad = True
1039
1040     feedback_fn("* Verifying orphan volumes")
1041     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1042                                        feedback_fn)
1043     bad = bad or result
1044
1045     feedback_fn("* Verifying remaining instances")
1046     result = self._VerifyOrphanInstances(instancelist, node_instance,
1047                                          feedback_fn)
1048     bad = bad or result
1049
1050     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1051       feedback_fn("* Verifying N+1 Memory redundancy")
1052       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1053       bad = bad or result
1054
1055     feedback_fn("* Other Notes")
1056     if i_non_redundant:
1057       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1058                   % len(i_non_redundant))
1059
1060     if i_non_a_balanced:
1061       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1062                   % len(i_non_a_balanced))
1063
1064     if n_offline:
1065       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1066
1067     return not bad
1068
1069   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1070     """Analize the post-hooks' result
1071
1072     This method analyses the hook result, handles it, and sends some
1073     nicely-formatted feedback back to the user.
1074
1075     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1076         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1077     @param hooks_results: the results of the multi-node hooks rpc call
1078     @param feedback_fn: function used send feedback back to the caller
1079     @param lu_result: previous Exec result
1080     @return: the new Exec result, based on the previous result
1081         and hook results
1082
1083     """
1084     # We only really run POST phase hooks, and are only interested in
1085     # their results
1086     if phase == constants.HOOKS_PHASE_POST:
1087       # Used to change hooks' output to proper indentation
1088       indent_re = re.compile('^', re.M)
1089       feedback_fn("* Hooks Results")
1090       if not hooks_results:
1091         feedback_fn("  - ERROR: general communication failure")
1092         lu_result = 1
1093       else:
1094         for node_name in hooks_results:
1095           show_node_header = True
1096           res = hooks_results[node_name]
1097           if res.failed or res.data is False or not isinstance(res.data, list):
1098             if res.offline:
1099               # no need to warn or set fail return value
1100               continue
1101             feedback_fn("    Communication failure in hooks execution")
1102             lu_result = 1
1103             continue
1104           for script, hkr, output in res.data:
1105             if hkr == constants.HKR_FAIL:
1106               # The node header is only shown once, if there are
1107               # failing hooks on that node
1108               if show_node_header:
1109                 feedback_fn("  Node %s:" % node_name)
1110                 show_node_header = False
1111               feedback_fn("    ERROR: Script %s failed, output:" % script)
1112               output = indent_re.sub('      ', output)
1113               feedback_fn("%s" % output)
1114               lu_result = 1
1115
1116       return lu_result
1117
1118
1119 class LUVerifyDisks(NoHooksLU):
1120   """Verifies the cluster disks status.
1121
1122   """
1123   _OP_REQP = []
1124   REQ_BGL = False
1125
1126   def ExpandNames(self):
1127     self.needed_locks = {
1128       locking.LEVEL_NODE: locking.ALL_SET,
1129       locking.LEVEL_INSTANCE: locking.ALL_SET,
1130     }
1131     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1132
1133   def CheckPrereq(self):
1134     """Check prerequisites.
1135
1136     This has no prerequisites.
1137
1138     """
1139     pass
1140
1141   def Exec(self, feedback_fn):
1142     """Verify integrity of cluster disks.
1143
1144     """
1145     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1146
1147     vg_name = self.cfg.GetVGName()
1148     nodes = utils.NiceSort(self.cfg.GetNodeList())
1149     instances = [self.cfg.GetInstanceInfo(name)
1150                  for name in self.cfg.GetInstanceList()]
1151
1152     nv_dict = {}
1153     for inst in instances:
1154       inst_lvs = {}
1155       if (inst.status != "up" or
1156           inst.disk_template not in constants.DTS_NET_MIRROR):
1157         continue
1158       inst.MapLVsByNode(inst_lvs)
1159       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1160       for node, vol_list in inst_lvs.iteritems():
1161         for vol in vol_list:
1162           nv_dict[(node, vol)] = inst
1163
1164     if not nv_dict:
1165       return result
1166
1167     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1168
1169     to_act = set()
1170     for node in nodes:
1171       # node_volume
1172       lvs = node_lvs[node]
1173       if lvs.failed:
1174         if not lvs.offline:
1175           self.LogWarning("Connection to node %s failed: %s" %
1176                           (node, lvs.data))
1177         continue
1178       lvs = lvs.data
1179       if isinstance(lvs, basestring):
1180         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1181         res_nlvm[node] = lvs
1182       elif not isinstance(lvs, dict):
1183         logging.warning("Connection to node %s failed or invalid data"
1184                         " returned", node)
1185         res_nodes.append(node)
1186         continue
1187
1188       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1189         inst = nv_dict.pop((node, lv_name), None)
1190         if (not lv_online and inst is not None
1191             and inst.name not in res_instances):
1192           res_instances.append(inst.name)
1193
1194     # any leftover items in nv_dict are missing LVs, let's arrange the
1195     # data better
1196     for key, inst in nv_dict.iteritems():
1197       if inst.name not in res_missing:
1198         res_missing[inst.name] = []
1199       res_missing[inst.name].append(key)
1200
1201     return result
1202
1203
1204 class LURenameCluster(LogicalUnit):
1205   """Rename the cluster.
1206
1207   """
1208   HPATH = "cluster-rename"
1209   HTYPE = constants.HTYPE_CLUSTER
1210   _OP_REQP = ["name"]
1211
1212   def BuildHooksEnv(self):
1213     """Build hooks env.
1214
1215     """
1216     env = {
1217       "OP_TARGET": self.cfg.GetClusterName(),
1218       "NEW_NAME": self.op.name,
1219       }
1220     mn = self.cfg.GetMasterNode()
1221     return env, [mn], [mn]
1222
1223   def CheckPrereq(self):
1224     """Verify that the passed name is a valid one.
1225
1226     """
1227     hostname = utils.HostInfo(self.op.name)
1228
1229     new_name = hostname.name
1230     self.ip = new_ip = hostname.ip
1231     old_name = self.cfg.GetClusterName()
1232     old_ip = self.cfg.GetMasterIP()
1233     if new_name == old_name and new_ip == old_ip:
1234       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1235                                  " cluster has changed")
1236     if new_ip != old_ip:
1237       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1238         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1239                                    " reachable on the network. Aborting." %
1240                                    new_ip)
1241
1242     self.op.name = new_name
1243
1244   def Exec(self, feedback_fn):
1245     """Rename the cluster.
1246
1247     """
1248     clustername = self.op.name
1249     ip = self.ip
1250
1251     # shutdown the master IP
1252     master = self.cfg.GetMasterNode()
1253     result = self.rpc.call_node_stop_master(master, False)
1254     if result.failed or not result.data:
1255       raise errors.OpExecError("Could not disable the master role")
1256
1257     try:
1258       cluster = self.cfg.GetClusterInfo()
1259       cluster.cluster_name = clustername
1260       cluster.master_ip = ip
1261       self.cfg.Update(cluster)
1262
1263       # update the known hosts file
1264       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1265       node_list = self.cfg.GetNodeList()
1266       try:
1267         node_list.remove(master)
1268       except ValueError:
1269         pass
1270       result = self.rpc.call_upload_file(node_list,
1271                                          constants.SSH_KNOWN_HOSTS_FILE)
1272       for to_node, to_result in result.iteritems():
1273         if to_result.failed or not to_result.data:
1274           logging.error("Copy of file %s to node %s failed",
1275                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1276
1277     finally:
1278       result = self.rpc.call_node_start_master(master, False)
1279       if result.failed or not result.data:
1280         self.LogWarning("Could not re-enable the master role on"
1281                         " the master, please restart manually.")
1282
1283
1284 def _RecursiveCheckIfLVMBased(disk):
1285   """Check if the given disk or its children are lvm-based.
1286
1287   @type disk: L{objects.Disk}
1288   @param disk: the disk to check
1289   @rtype: booleean
1290   @return: boolean indicating whether a LD_LV dev_type was found or not
1291
1292   """
1293   if disk.children:
1294     for chdisk in disk.children:
1295       if _RecursiveCheckIfLVMBased(chdisk):
1296         return True
1297   return disk.dev_type == constants.LD_LV
1298
1299
1300 class LUSetClusterParams(LogicalUnit):
1301   """Change the parameters of the cluster.
1302
1303   """
1304   HPATH = "cluster-modify"
1305   HTYPE = constants.HTYPE_CLUSTER
1306   _OP_REQP = []
1307   REQ_BGL = False
1308
1309   def CheckParameters(self):
1310     """Check parameters
1311
1312     """
1313     if not hasattr(self.op, "candidate_pool_size"):
1314       self.op.candidate_pool_size = None
1315     if self.op.candidate_pool_size is not None:
1316       try:
1317         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1318       except ValueError, err:
1319         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1320                                    str(err))
1321       if self.op.candidate_pool_size < 1:
1322         raise errors.OpPrereqError("At least one master candidate needed")
1323
1324   def ExpandNames(self):
1325     # FIXME: in the future maybe other cluster params won't require checking on
1326     # all nodes to be modified.
1327     self.needed_locks = {
1328       locking.LEVEL_NODE: locking.ALL_SET,
1329     }
1330     self.share_locks[locking.LEVEL_NODE] = 1
1331
1332   def BuildHooksEnv(self):
1333     """Build hooks env.
1334
1335     """
1336     env = {
1337       "OP_TARGET": self.cfg.GetClusterName(),
1338       "NEW_VG_NAME": self.op.vg_name,
1339       }
1340     mn = self.cfg.GetMasterNode()
1341     return env, [mn], [mn]
1342
1343   def CheckPrereq(self):
1344     """Check prerequisites.
1345
1346     This checks whether the given params don't conflict and
1347     if the given volume group is valid.
1348
1349     """
1350     # FIXME: This only works because there is only one parameter that can be
1351     # changed or removed.
1352     if self.op.vg_name is not None and not self.op.vg_name:
1353       instances = self.cfg.GetAllInstancesInfo().values()
1354       for inst in instances:
1355         for disk in inst.disks:
1356           if _RecursiveCheckIfLVMBased(disk):
1357             raise errors.OpPrereqError("Cannot disable lvm storage while"
1358                                        " lvm-based instances exist")
1359
1360     node_list = self.acquired_locks[locking.LEVEL_NODE]
1361
1362     # if vg_name not None, checks given volume group on all nodes
1363     if self.op.vg_name:
1364       vglist = self.rpc.call_vg_list(node_list)
1365       for node in node_list:
1366         if vglist[node].failed:
1367           # ignoring down node
1368           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1369           continue
1370         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1371                                               self.op.vg_name,
1372                                               constants.MIN_VG_SIZE)
1373         if vgstatus:
1374           raise errors.OpPrereqError("Error on node '%s': %s" %
1375                                      (node, vgstatus))
1376
1377     self.cluster = cluster = self.cfg.GetClusterInfo()
1378     # validate beparams changes
1379     if self.op.beparams:
1380       utils.CheckBEParams(self.op.beparams)
1381       self.new_beparams = cluster.FillDict(
1382         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1383
1384     # hypervisor list/parameters
1385     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1386     if self.op.hvparams:
1387       if not isinstance(self.op.hvparams, dict):
1388         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1389       for hv_name, hv_dict in self.op.hvparams.items():
1390         if hv_name not in self.new_hvparams:
1391           self.new_hvparams[hv_name] = hv_dict
1392         else:
1393           self.new_hvparams[hv_name].update(hv_dict)
1394
1395     if self.op.enabled_hypervisors is not None:
1396       self.hv_list = self.op.enabled_hypervisors
1397     else:
1398       self.hv_list = cluster.enabled_hypervisors
1399
1400     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1401       # either the enabled list has changed, or the parameters have, validate
1402       for hv_name, hv_params in self.new_hvparams.items():
1403         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1404             (self.op.enabled_hypervisors and
1405              hv_name in self.op.enabled_hypervisors)):
1406           # either this is a new hypervisor, or its parameters have changed
1407           hv_class = hypervisor.GetHypervisor(hv_name)
1408           hv_class.CheckParameterSyntax(hv_params)
1409           _CheckHVParams(self, node_list, hv_name, hv_params)
1410
1411   def Exec(self, feedback_fn):
1412     """Change the parameters of the cluster.
1413
1414     """
1415     if self.op.vg_name is not None:
1416       if self.op.vg_name != self.cfg.GetVGName():
1417         self.cfg.SetVGName(self.op.vg_name)
1418       else:
1419         feedback_fn("Cluster LVM configuration already in desired"
1420                     " state, not changing")
1421     if self.op.hvparams:
1422       self.cluster.hvparams = self.new_hvparams
1423     if self.op.enabled_hypervisors is not None:
1424       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1425     if self.op.beparams:
1426       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1427     if self.op.candidate_pool_size is not None:
1428       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1429
1430     self.cfg.Update(self.cluster)
1431
1432     # we want to update nodes after the cluster so that if any errors
1433     # happen, we have recorded and saved the cluster info
1434     if self.op.candidate_pool_size is not None:
1435       _AdjustCandidatePool(self)
1436
1437
1438 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1439   """Sleep and poll for an instance's disk to sync.
1440
1441   """
1442   if not instance.disks:
1443     return True
1444
1445   if not oneshot:
1446     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1447
1448   node = instance.primary_node
1449
1450   for dev in instance.disks:
1451     lu.cfg.SetDiskID(dev, node)
1452
1453   retries = 0
1454   while True:
1455     max_time = 0
1456     done = True
1457     cumul_degraded = False
1458     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1459     if rstats.failed or not rstats.data:
1460       lu.LogWarning("Can't get any data from node %s", node)
1461       retries += 1
1462       if retries >= 10:
1463         raise errors.RemoteError("Can't contact node %s for mirror data,"
1464                                  " aborting." % node)
1465       time.sleep(6)
1466       continue
1467     rstats = rstats.data
1468     retries = 0
1469     for i in range(len(rstats)):
1470       mstat = rstats[i]
1471       if mstat is None:
1472         lu.LogWarning("Can't compute data for node %s/%s",
1473                            node, instance.disks[i].iv_name)
1474         continue
1475       # we ignore the ldisk parameter
1476       perc_done, est_time, is_degraded, _ = mstat
1477       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1478       if perc_done is not None:
1479         done = False
1480         if est_time is not None:
1481           rem_time = "%d estimated seconds remaining" % est_time
1482           max_time = est_time
1483         else:
1484           rem_time = "no time estimate"
1485         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1486                         (instance.disks[i].iv_name, perc_done, rem_time))
1487     if done or oneshot:
1488       break
1489
1490     time.sleep(min(60, max_time))
1491
1492   if done:
1493     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1494   return not cumul_degraded
1495
1496
1497 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1498   """Check that mirrors are not degraded.
1499
1500   The ldisk parameter, if True, will change the test from the
1501   is_degraded attribute (which represents overall non-ok status for
1502   the device(s)) to the ldisk (representing the local storage status).
1503
1504   """
1505   lu.cfg.SetDiskID(dev, node)
1506   if ldisk:
1507     idx = 6
1508   else:
1509     idx = 5
1510
1511   result = True
1512   if on_primary or dev.AssembleOnSecondary():
1513     rstats = lu.rpc.call_blockdev_find(node, dev)
1514     if rstats.failed or not rstats.data:
1515       logging.warning("Node %s: disk degraded, not found or node down", node)
1516       result = False
1517     else:
1518       result = result and (not rstats.data[idx])
1519   if dev.children:
1520     for child in dev.children:
1521       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1522
1523   return result
1524
1525
1526 class LUDiagnoseOS(NoHooksLU):
1527   """Logical unit for OS diagnose/query.
1528
1529   """
1530   _OP_REQP = ["output_fields", "names"]
1531   REQ_BGL = False
1532   _FIELDS_STATIC = utils.FieldSet()
1533   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1534
1535   def ExpandNames(self):
1536     if self.op.names:
1537       raise errors.OpPrereqError("Selective OS query not supported")
1538
1539     _CheckOutputFields(static=self._FIELDS_STATIC,
1540                        dynamic=self._FIELDS_DYNAMIC,
1541                        selected=self.op.output_fields)
1542
1543     # Lock all nodes, in shared mode
1544     self.needed_locks = {}
1545     self.share_locks[locking.LEVEL_NODE] = 1
1546     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1547
1548   def CheckPrereq(self):
1549     """Check prerequisites.
1550
1551     """
1552
1553   @staticmethod
1554   def _DiagnoseByOS(node_list, rlist):
1555     """Remaps a per-node return list into an a per-os per-node dictionary
1556
1557     @param node_list: a list with the names of all nodes
1558     @param rlist: a map with node names as keys and OS objects as values
1559
1560     @rtype: dict
1561     @returns: a dictionary with osnames as keys and as value another map, with
1562         nodes as keys and list of OS objects as values, eg::
1563
1564           {"debian-etch": {"node1": [<object>,...],
1565                            "node2": [<object>,]}
1566           }
1567
1568     """
1569     all_os = {}
1570     for node_name, nr in rlist.iteritems():
1571       if nr.failed or not nr.data:
1572         continue
1573       for os_obj in nr.data:
1574         if os_obj.name not in all_os:
1575           # build a list of nodes for this os containing empty lists
1576           # for each node in node_list
1577           all_os[os_obj.name] = {}
1578           for nname in node_list:
1579             all_os[os_obj.name][nname] = []
1580         all_os[os_obj.name][node_name].append(os_obj)
1581     return all_os
1582
1583   def Exec(self, feedback_fn):
1584     """Compute the list of OSes.
1585
1586     """
1587     node_list = self.acquired_locks[locking.LEVEL_NODE]
1588     node_data = self.rpc.call_os_diagnose(node_list)
1589     if node_data == False:
1590       raise errors.OpExecError("Can't gather the list of OSes")
1591     pol = self._DiagnoseByOS(node_list, node_data)
1592     output = []
1593     for os_name, os_data in pol.iteritems():
1594       row = []
1595       for field in self.op.output_fields:
1596         if field == "name":
1597           val = os_name
1598         elif field == "valid":
1599           val = utils.all([osl and osl[0] for osl in os_data.values()])
1600         elif field == "node_status":
1601           val = {}
1602           for node_name, nos_list in os_data.iteritems():
1603             val[node_name] = [(v.status, v.path) for v in nos_list]
1604         else:
1605           raise errors.ParameterError(field)
1606         row.append(val)
1607       output.append(row)
1608
1609     return output
1610
1611
1612 class LURemoveNode(LogicalUnit):
1613   """Logical unit for removing a node.
1614
1615   """
1616   HPATH = "node-remove"
1617   HTYPE = constants.HTYPE_NODE
1618   _OP_REQP = ["node_name"]
1619
1620   def BuildHooksEnv(self):
1621     """Build hooks env.
1622
1623     This doesn't run on the target node in the pre phase as a failed
1624     node would then be impossible to remove.
1625
1626     """
1627     env = {
1628       "OP_TARGET": self.op.node_name,
1629       "NODE_NAME": self.op.node_name,
1630       }
1631     all_nodes = self.cfg.GetNodeList()
1632     all_nodes.remove(self.op.node_name)
1633     return env, all_nodes, all_nodes
1634
1635   def CheckPrereq(self):
1636     """Check prerequisites.
1637
1638     This checks:
1639      - the node exists in the configuration
1640      - it does not have primary or secondary instances
1641      - it's not the master
1642
1643     Any errors are signalled by raising errors.OpPrereqError.
1644
1645     """
1646     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1647     if node is None:
1648       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1649
1650     instance_list = self.cfg.GetInstanceList()
1651
1652     masternode = self.cfg.GetMasterNode()
1653     if node.name == masternode:
1654       raise errors.OpPrereqError("Node is the master node,"
1655                                  " you need to failover first.")
1656
1657     for instance_name in instance_list:
1658       instance = self.cfg.GetInstanceInfo(instance_name)
1659       if node.name == instance.primary_node:
1660         raise errors.OpPrereqError("Instance %s still running on the node,"
1661                                    " please remove first." % instance_name)
1662       if node.name in instance.secondary_nodes:
1663         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1664                                    " please remove first." % instance_name)
1665     self.op.node_name = node.name
1666     self.node = node
1667
1668   def Exec(self, feedback_fn):
1669     """Removes the node from the cluster.
1670
1671     """
1672     node = self.node
1673     logging.info("Stopping the node daemon and removing configs from node %s",
1674                  node.name)
1675
1676     self.context.RemoveNode(node.name)
1677
1678     self.rpc.call_node_leave_cluster(node.name)
1679
1680     # Promote nodes to master candidate as needed
1681     _AdjustCandidatePool(self)
1682
1683
1684 class LUQueryNodes(NoHooksLU):
1685   """Logical unit for querying nodes.
1686
1687   """
1688   _OP_REQP = ["output_fields", "names"]
1689   REQ_BGL = False
1690   _FIELDS_DYNAMIC = utils.FieldSet(
1691     "dtotal", "dfree",
1692     "mtotal", "mnode", "mfree",
1693     "bootid",
1694     "ctotal",
1695     )
1696
1697   _FIELDS_STATIC = utils.FieldSet(
1698     "name", "pinst_cnt", "sinst_cnt",
1699     "pinst_list", "sinst_list",
1700     "pip", "sip", "tags",
1701     "serial_no",
1702     "master_candidate",
1703     "master",
1704     "offline",
1705     )
1706
1707   def ExpandNames(self):
1708     _CheckOutputFields(static=self._FIELDS_STATIC,
1709                        dynamic=self._FIELDS_DYNAMIC,
1710                        selected=self.op.output_fields)
1711
1712     self.needed_locks = {}
1713     self.share_locks[locking.LEVEL_NODE] = 1
1714
1715     if self.op.names:
1716       self.wanted = _GetWantedNodes(self, self.op.names)
1717     else:
1718       self.wanted = locking.ALL_SET
1719
1720     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1721     if self.do_locking:
1722       # if we don't request only static fields, we need to lock the nodes
1723       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1724
1725
1726   def CheckPrereq(self):
1727     """Check prerequisites.
1728
1729     """
1730     # The validation of the node list is done in the _GetWantedNodes,
1731     # if non empty, and if empty, there's no validation to do
1732     pass
1733
1734   def Exec(self, feedback_fn):
1735     """Computes the list of nodes and their attributes.
1736
1737     """
1738     all_info = self.cfg.GetAllNodesInfo()
1739     if self.do_locking:
1740       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1741     elif self.wanted != locking.ALL_SET:
1742       nodenames = self.wanted
1743       missing = set(nodenames).difference(all_info.keys())
1744       if missing:
1745         raise errors.OpExecError(
1746           "Some nodes were removed before retrieving their data: %s" % missing)
1747     else:
1748       nodenames = all_info.keys()
1749
1750     nodenames = utils.NiceSort(nodenames)
1751     nodelist = [all_info[name] for name in nodenames]
1752
1753     # begin data gathering
1754
1755     if self.do_locking:
1756       live_data = {}
1757       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1758                                           self.cfg.GetHypervisorType())
1759       for name in nodenames:
1760         nodeinfo = node_data[name]
1761         if not nodeinfo.failed and nodeinfo.data:
1762           nodeinfo = nodeinfo.data
1763           fn = utils.TryConvert
1764           live_data[name] = {
1765             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1766             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1767             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1768             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1769             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1770             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1771             "bootid": nodeinfo.get('bootid', None),
1772             }
1773         else:
1774           live_data[name] = {}
1775     else:
1776       live_data = dict.fromkeys(nodenames, {})
1777
1778     node_to_primary = dict([(name, set()) for name in nodenames])
1779     node_to_secondary = dict([(name, set()) for name in nodenames])
1780
1781     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1782                              "sinst_cnt", "sinst_list"))
1783     if inst_fields & frozenset(self.op.output_fields):
1784       instancelist = self.cfg.GetInstanceList()
1785
1786       for instance_name in instancelist:
1787         inst = self.cfg.GetInstanceInfo(instance_name)
1788         if inst.primary_node in node_to_primary:
1789           node_to_primary[inst.primary_node].add(inst.name)
1790         for secnode in inst.secondary_nodes:
1791           if secnode in node_to_secondary:
1792             node_to_secondary[secnode].add(inst.name)
1793
1794     master_node = self.cfg.GetMasterNode()
1795
1796     # end data gathering
1797
1798     output = []
1799     for node in nodelist:
1800       node_output = []
1801       for field in self.op.output_fields:
1802         if field == "name":
1803           val = node.name
1804         elif field == "pinst_list":
1805           val = list(node_to_primary[node.name])
1806         elif field == "sinst_list":
1807           val = list(node_to_secondary[node.name])
1808         elif field == "pinst_cnt":
1809           val = len(node_to_primary[node.name])
1810         elif field == "sinst_cnt":
1811           val = len(node_to_secondary[node.name])
1812         elif field == "pip":
1813           val = node.primary_ip
1814         elif field == "sip":
1815           val = node.secondary_ip
1816         elif field == "tags":
1817           val = list(node.GetTags())
1818         elif field == "serial_no":
1819           val = node.serial_no
1820         elif field == "master_candidate":
1821           val = node.master_candidate
1822         elif field == "master":
1823           val = node.name == master_node
1824         elif field == "offline":
1825           val = node.offline
1826         elif self._FIELDS_DYNAMIC.Matches(field):
1827           val = live_data[node.name].get(field, None)
1828         else:
1829           raise errors.ParameterError(field)
1830         node_output.append(val)
1831       output.append(node_output)
1832
1833     return output
1834
1835
1836 class LUQueryNodeVolumes(NoHooksLU):
1837   """Logical unit for getting volumes on node(s).
1838
1839   """
1840   _OP_REQP = ["nodes", "output_fields"]
1841   REQ_BGL = False
1842   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1843   _FIELDS_STATIC = utils.FieldSet("node")
1844
1845   def ExpandNames(self):
1846     _CheckOutputFields(static=self._FIELDS_STATIC,
1847                        dynamic=self._FIELDS_DYNAMIC,
1848                        selected=self.op.output_fields)
1849
1850     self.needed_locks = {}
1851     self.share_locks[locking.LEVEL_NODE] = 1
1852     if not self.op.nodes:
1853       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1854     else:
1855       self.needed_locks[locking.LEVEL_NODE] = \
1856         _GetWantedNodes(self, self.op.nodes)
1857
1858   def CheckPrereq(self):
1859     """Check prerequisites.
1860
1861     This checks that the fields required are valid output fields.
1862
1863     """
1864     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1865
1866   def Exec(self, feedback_fn):
1867     """Computes the list of nodes and their attributes.
1868
1869     """
1870     nodenames = self.nodes
1871     volumes = self.rpc.call_node_volumes(nodenames)
1872
1873     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1874              in self.cfg.GetInstanceList()]
1875
1876     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1877
1878     output = []
1879     for node in nodenames:
1880       if node not in volumes or volumes[node].failed or not volumes[node].data:
1881         continue
1882
1883       node_vols = volumes[node].data[:]
1884       node_vols.sort(key=lambda vol: vol['dev'])
1885
1886       for vol in node_vols:
1887         node_output = []
1888         for field in self.op.output_fields:
1889           if field == "node":
1890             val = node
1891           elif field == "phys":
1892             val = vol['dev']
1893           elif field == "vg":
1894             val = vol['vg']
1895           elif field == "name":
1896             val = vol['name']
1897           elif field == "size":
1898             val = int(float(vol['size']))
1899           elif field == "instance":
1900             for inst in ilist:
1901               if node not in lv_by_node[inst]:
1902                 continue
1903               if vol['name'] in lv_by_node[inst][node]:
1904                 val = inst.name
1905                 break
1906             else:
1907               val = '-'
1908           else:
1909             raise errors.ParameterError(field)
1910           node_output.append(str(val))
1911
1912         output.append(node_output)
1913
1914     return output
1915
1916
1917 class LUAddNode(LogicalUnit):
1918   """Logical unit for adding node to the cluster.
1919
1920   """
1921   HPATH = "node-add"
1922   HTYPE = constants.HTYPE_NODE
1923   _OP_REQP = ["node_name"]
1924
1925   def BuildHooksEnv(self):
1926     """Build hooks env.
1927
1928     This will run on all nodes before, and on all nodes + the new node after.
1929
1930     """
1931     env = {
1932       "OP_TARGET": self.op.node_name,
1933       "NODE_NAME": self.op.node_name,
1934       "NODE_PIP": self.op.primary_ip,
1935       "NODE_SIP": self.op.secondary_ip,
1936       }
1937     nodes_0 = self.cfg.GetNodeList()
1938     nodes_1 = nodes_0 + [self.op.node_name, ]
1939     return env, nodes_0, nodes_1
1940
1941   def CheckPrereq(self):
1942     """Check prerequisites.
1943
1944     This checks:
1945      - the new node is not already in the config
1946      - it is resolvable
1947      - its parameters (single/dual homed) matches the cluster
1948
1949     Any errors are signalled by raising errors.OpPrereqError.
1950
1951     """
1952     node_name = self.op.node_name
1953     cfg = self.cfg
1954
1955     dns_data = utils.HostInfo(node_name)
1956
1957     node = dns_data.name
1958     primary_ip = self.op.primary_ip = dns_data.ip
1959     secondary_ip = getattr(self.op, "secondary_ip", None)
1960     if secondary_ip is None:
1961       secondary_ip = primary_ip
1962     if not utils.IsValidIP(secondary_ip):
1963       raise errors.OpPrereqError("Invalid secondary IP given")
1964     self.op.secondary_ip = secondary_ip
1965
1966     node_list = cfg.GetNodeList()
1967     if not self.op.readd and node in node_list:
1968       raise errors.OpPrereqError("Node %s is already in the configuration" %
1969                                  node)
1970     elif self.op.readd and node not in node_list:
1971       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1972
1973     for existing_node_name in node_list:
1974       existing_node = cfg.GetNodeInfo(existing_node_name)
1975
1976       if self.op.readd and node == existing_node_name:
1977         if (existing_node.primary_ip != primary_ip or
1978             existing_node.secondary_ip != secondary_ip):
1979           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1980                                      " address configuration as before")
1981         continue
1982
1983       if (existing_node.primary_ip == primary_ip or
1984           existing_node.secondary_ip == primary_ip or
1985           existing_node.primary_ip == secondary_ip or
1986           existing_node.secondary_ip == secondary_ip):
1987         raise errors.OpPrereqError("New node ip address(es) conflict with"
1988                                    " existing node %s" % existing_node.name)
1989
1990     # check that the type of the node (single versus dual homed) is the
1991     # same as for the master
1992     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1993     master_singlehomed = myself.secondary_ip == myself.primary_ip
1994     newbie_singlehomed = secondary_ip == primary_ip
1995     if master_singlehomed != newbie_singlehomed:
1996       if master_singlehomed:
1997         raise errors.OpPrereqError("The master has no private ip but the"
1998                                    " new node has one")
1999       else:
2000         raise errors.OpPrereqError("The master has a private ip but the"
2001                                    " new node doesn't have one")
2002
2003     # checks reachablity
2004     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2005       raise errors.OpPrereqError("Node not reachable by ping")
2006
2007     if not newbie_singlehomed:
2008       # check reachability from my secondary ip to newbie's secondary ip
2009       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2010                            source=myself.secondary_ip):
2011         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2012                                    " based ping to noded port")
2013
2014     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2015     mc_now, _ = self.cfg.GetMasterCandidateStats()
2016     master_candidate = mc_now < cp_size
2017
2018     self.new_node = objects.Node(name=node,
2019                                  primary_ip=primary_ip,
2020                                  secondary_ip=secondary_ip,
2021                                  master_candidate=master_candidate,
2022                                  offline=False)
2023
2024   def Exec(self, feedback_fn):
2025     """Adds the new node to the cluster.
2026
2027     """
2028     new_node = self.new_node
2029     node = new_node.name
2030
2031     # check connectivity
2032     result = self.rpc.call_version([node])[node]
2033     result.Raise()
2034     if result.data:
2035       if constants.PROTOCOL_VERSION == result.data:
2036         logging.info("Communication to node %s fine, sw version %s match",
2037                      node, result.data)
2038       else:
2039         raise errors.OpExecError("Version mismatch master version %s,"
2040                                  " node version %s" %
2041                                  (constants.PROTOCOL_VERSION, result.data))
2042     else:
2043       raise errors.OpExecError("Cannot get version from the new node")
2044
2045     # setup ssh on node
2046     logging.info("Copy ssh key to node %s", node)
2047     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2048     keyarray = []
2049     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2050                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2051                 priv_key, pub_key]
2052
2053     for i in keyfiles:
2054       f = open(i, 'r')
2055       try:
2056         keyarray.append(f.read())
2057       finally:
2058         f.close()
2059
2060     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2061                                     keyarray[2],
2062                                     keyarray[3], keyarray[4], keyarray[5])
2063
2064     if result.failed or not result.data:
2065       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2066
2067     # Add node to our /etc/hosts, and add key to known_hosts
2068     utils.AddHostToEtcHosts(new_node.name)
2069
2070     if new_node.secondary_ip != new_node.primary_ip:
2071       result = self.rpc.call_node_has_ip_address(new_node.name,
2072                                                  new_node.secondary_ip)
2073       if result.failed or not result.data:
2074         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2075                                  " you gave (%s). Please fix and re-run this"
2076                                  " command." % new_node.secondary_ip)
2077
2078     node_verify_list = [self.cfg.GetMasterNode()]
2079     node_verify_param = {
2080       'nodelist': [node],
2081       # TODO: do a node-net-test as well?
2082     }
2083
2084     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2085                                        self.cfg.GetClusterName())
2086     for verifier in node_verify_list:
2087       if result[verifier].failed or not result[verifier].data:
2088         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2089                                  " for remote verification" % verifier)
2090       if result[verifier].data['nodelist']:
2091         for failed in result[verifier].data['nodelist']:
2092           feedback_fn("ssh/hostname verification failed %s -> %s" %
2093                       (verifier, result[verifier]['nodelist'][failed]))
2094         raise errors.OpExecError("ssh/hostname verification failed.")
2095
2096     # Distribute updated /etc/hosts and known_hosts to all nodes,
2097     # including the node just added
2098     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2099     dist_nodes = self.cfg.GetNodeList()
2100     if not self.op.readd:
2101       dist_nodes.append(node)
2102     if myself.name in dist_nodes:
2103       dist_nodes.remove(myself.name)
2104
2105     logging.debug("Copying hosts and known_hosts to all nodes")
2106     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2107       result = self.rpc.call_upload_file(dist_nodes, fname)
2108       for to_node, to_result in result.iteritems():
2109         if to_result.failed or not to_result.data:
2110           logging.error("Copy of file %s to node %s failed", fname, to_node)
2111
2112     to_copy = []
2113     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2114       to_copy.append(constants.VNC_PASSWORD_FILE)
2115     for fname in to_copy:
2116       result = self.rpc.call_upload_file([node], fname)
2117       if result[node].failed or not result[node]:
2118         logging.error("Could not copy file %s to node %s", fname, node)
2119
2120     if self.op.readd:
2121       self.context.ReaddNode(new_node)
2122     else:
2123       self.context.AddNode(new_node)
2124
2125
2126 class LUSetNodeParams(LogicalUnit):
2127   """Modifies the parameters of a node.
2128
2129   """
2130   HPATH = "node-modify"
2131   HTYPE = constants.HTYPE_NODE
2132   _OP_REQP = ["node_name"]
2133   REQ_BGL = False
2134
2135   def CheckArguments(self):
2136     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2137     if node_name is None:
2138       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2139     self.op.node_name = node_name
2140     _CheckBooleanOpField(self.op, 'master_candidate')
2141     _CheckBooleanOpField(self.op, 'offline')
2142     if self.op.master_candidate is None and self.op.offline is None:
2143       raise errors.OpPrereqError("Please pass at least one modification")
2144     if self.op.offline == True and self.op.master_candidate == True:
2145       raise errors.OpPrereqError("Can't set the node into offline and"
2146                                  " master_candidate at the same time")
2147
2148   def ExpandNames(self):
2149     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2150
2151   def BuildHooksEnv(self):
2152     """Build hooks env.
2153
2154     This runs on the master node.
2155
2156     """
2157     env = {
2158       "OP_TARGET": self.op.node_name,
2159       "MASTER_CANDIDATE": str(self.op.master_candidate),
2160       "OFFLINE": str(self.op.offline),
2161       }
2162     nl = [self.cfg.GetMasterNode(),
2163           self.op.node_name]
2164     return env, nl, nl
2165
2166   def CheckPrereq(self):
2167     """Check prerequisites.
2168
2169     This only checks the instance list against the existing names.
2170
2171     """
2172     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2173
2174     if ((self.op.master_candidate == False or self.op.offline == True)
2175         and node.master_candidate):
2176       # we will demote the node from master_candidate
2177       if self.op.node_name == self.cfg.GetMasterNode():
2178         raise errors.OpPrereqError("The master node has to be a"
2179                                    " master candidate and online")
2180       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2181       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2182       if num_candidates <= cp_size:
2183         msg = ("Not enough master candidates (desired"
2184                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2185         if self.op.force:
2186           self.LogWarning(msg)
2187         else:
2188           raise errors.OpPrereqError(msg)
2189
2190     if (self.op.master_candidate == True and node.offline and
2191         not self.op.offline == False):
2192       raise errors.OpPrereqError("Can't set an offline node to"
2193                                  " master_candidate")
2194
2195     return
2196
2197   def Exec(self, feedback_fn):
2198     """Modifies a node.
2199
2200     """
2201     node = self.node
2202
2203     result = []
2204
2205     if self.op.offline is not None:
2206       node.offline = self.op.offline
2207       result.append(("offline", str(self.op.offline)))
2208       if self.op.offline == True and node.master_candidate:
2209         node.master_candidate = False
2210         result.append(("master_candidate", "auto-demotion due to offline"))
2211
2212     if self.op.master_candidate is not None:
2213       node.master_candidate = self.op.master_candidate
2214       result.append(("master_candidate", str(self.op.master_candidate)))
2215       if self.op.master_candidate == False:
2216         rrc = self.rpc.call_node_demote_from_mc(node.name)
2217         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2218             or len(rrc.data) != 2):
2219           self.LogWarning("Node rpc error: %s" % rrc.error)
2220         elif not rrc.data[0]:
2221           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2222
2223     # this will trigger configuration file update, if needed
2224     self.cfg.Update(node)
2225     # this will trigger job queue propagation or cleanup
2226     if self.op.node_name != self.cfg.GetMasterNode():
2227       self.context.ReaddNode(node)
2228
2229     return result
2230
2231
2232 class LUQueryClusterInfo(NoHooksLU):
2233   """Query cluster configuration.
2234
2235   """
2236   _OP_REQP = []
2237   REQ_BGL = False
2238
2239   def ExpandNames(self):
2240     self.needed_locks = {}
2241
2242   def CheckPrereq(self):
2243     """No prerequsites needed for this LU.
2244
2245     """
2246     pass
2247
2248   def Exec(self, feedback_fn):
2249     """Return cluster config.
2250
2251     """
2252     cluster = self.cfg.GetClusterInfo()
2253     result = {
2254       "software_version": constants.RELEASE_VERSION,
2255       "protocol_version": constants.PROTOCOL_VERSION,
2256       "config_version": constants.CONFIG_VERSION,
2257       "os_api_version": constants.OS_API_VERSION,
2258       "export_version": constants.EXPORT_VERSION,
2259       "architecture": (platform.architecture()[0], platform.machine()),
2260       "name": cluster.cluster_name,
2261       "master": cluster.master_node,
2262       "default_hypervisor": cluster.default_hypervisor,
2263       "enabled_hypervisors": cluster.enabled_hypervisors,
2264       "hvparams": cluster.hvparams,
2265       "beparams": cluster.beparams,
2266       "candidate_pool_size": cluster.candidate_pool_size,
2267       }
2268
2269     return result
2270
2271
2272 class LUQueryConfigValues(NoHooksLU):
2273   """Return configuration values.
2274
2275   """
2276   _OP_REQP = []
2277   REQ_BGL = False
2278   _FIELDS_DYNAMIC = utils.FieldSet()
2279   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2280
2281   def ExpandNames(self):
2282     self.needed_locks = {}
2283
2284     _CheckOutputFields(static=self._FIELDS_STATIC,
2285                        dynamic=self._FIELDS_DYNAMIC,
2286                        selected=self.op.output_fields)
2287
2288   def CheckPrereq(self):
2289     """No prerequisites.
2290
2291     """
2292     pass
2293
2294   def Exec(self, feedback_fn):
2295     """Dump a representation of the cluster config to the standard output.
2296
2297     """
2298     values = []
2299     for field in self.op.output_fields:
2300       if field == "cluster_name":
2301         entry = self.cfg.GetClusterName()
2302       elif field == "master_node":
2303         entry = self.cfg.GetMasterNode()
2304       elif field == "drain_flag":
2305         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2306       else:
2307         raise errors.ParameterError(field)
2308       values.append(entry)
2309     return values
2310
2311
2312 class LUActivateInstanceDisks(NoHooksLU):
2313   """Bring up an instance's disks.
2314
2315   """
2316   _OP_REQP = ["instance_name"]
2317   REQ_BGL = False
2318
2319   def ExpandNames(self):
2320     self._ExpandAndLockInstance()
2321     self.needed_locks[locking.LEVEL_NODE] = []
2322     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2323
2324   def DeclareLocks(self, level):
2325     if level == locking.LEVEL_NODE:
2326       self._LockInstancesNodes()
2327
2328   def CheckPrereq(self):
2329     """Check prerequisites.
2330
2331     This checks that the instance is in the cluster.
2332
2333     """
2334     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2335     assert self.instance is not None, \
2336       "Cannot retrieve locked instance %s" % self.op.instance_name
2337     _CheckNodeOnline(self, self.instance.primary_node)
2338
2339   def Exec(self, feedback_fn):
2340     """Activate the disks.
2341
2342     """
2343     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2344     if not disks_ok:
2345       raise errors.OpExecError("Cannot activate block devices")
2346
2347     return disks_info
2348
2349
2350 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2351   """Prepare the block devices for an instance.
2352
2353   This sets up the block devices on all nodes.
2354
2355   @type lu: L{LogicalUnit}
2356   @param lu: the logical unit on whose behalf we execute
2357   @type instance: L{objects.Instance}
2358   @param instance: the instance for whose disks we assemble
2359   @type ignore_secondaries: boolean
2360   @param ignore_secondaries: if true, errors on secondary nodes
2361       won't result in an error return from the function
2362   @return: False if the operation failed, otherwise a list of
2363       (host, instance_visible_name, node_visible_name)
2364       with the mapping from node devices to instance devices
2365
2366   """
2367   device_info = []
2368   disks_ok = True
2369   iname = instance.name
2370   # With the two passes mechanism we try to reduce the window of
2371   # opportunity for the race condition of switching DRBD to primary
2372   # before handshaking occured, but we do not eliminate it
2373
2374   # The proper fix would be to wait (with some limits) until the
2375   # connection has been made and drbd transitions from WFConnection
2376   # into any other network-connected state (Connected, SyncTarget,
2377   # SyncSource, etc.)
2378
2379   # 1st pass, assemble on all nodes in secondary mode
2380   for inst_disk in instance.disks:
2381     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2382       lu.cfg.SetDiskID(node_disk, node)
2383       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2384       if result.failed or not result:
2385         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2386                            " (is_primary=False, pass=1)",
2387                            inst_disk.iv_name, node)
2388         if not ignore_secondaries:
2389           disks_ok = False
2390
2391   # FIXME: race condition on drbd migration to primary
2392
2393   # 2nd pass, do only the primary node
2394   for inst_disk in instance.disks:
2395     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2396       if node != instance.primary_node:
2397         continue
2398       lu.cfg.SetDiskID(node_disk, node)
2399       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2400       if result.failed or not result:
2401         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2402                            " (is_primary=True, pass=2)",
2403                            inst_disk.iv_name, node)
2404         disks_ok = False
2405     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2406
2407   # leave the disks configured for the primary node
2408   # this is a workaround that would be fixed better by
2409   # improving the logical/physical id handling
2410   for disk in instance.disks:
2411     lu.cfg.SetDiskID(disk, instance.primary_node)
2412
2413   return disks_ok, device_info
2414
2415
2416 def _StartInstanceDisks(lu, instance, force):
2417   """Start the disks of an instance.
2418
2419   """
2420   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2421                                            ignore_secondaries=force)
2422   if not disks_ok:
2423     _ShutdownInstanceDisks(lu, instance)
2424     if force is not None and not force:
2425       lu.proc.LogWarning("", hint="If the message above refers to a"
2426                          " secondary node,"
2427                          " you can retry the operation using '--force'.")
2428     raise errors.OpExecError("Disk consistency error")
2429
2430
2431 class LUDeactivateInstanceDisks(NoHooksLU):
2432   """Shutdown an instance's disks.
2433
2434   """
2435   _OP_REQP = ["instance_name"]
2436   REQ_BGL = False
2437
2438   def ExpandNames(self):
2439     self._ExpandAndLockInstance()
2440     self.needed_locks[locking.LEVEL_NODE] = []
2441     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2442
2443   def DeclareLocks(self, level):
2444     if level == locking.LEVEL_NODE:
2445       self._LockInstancesNodes()
2446
2447   def CheckPrereq(self):
2448     """Check prerequisites.
2449
2450     This checks that the instance is in the cluster.
2451
2452     """
2453     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2454     assert self.instance is not None, \
2455       "Cannot retrieve locked instance %s" % self.op.instance_name
2456
2457   def Exec(self, feedback_fn):
2458     """Deactivate the disks
2459
2460     """
2461     instance = self.instance
2462     _SafeShutdownInstanceDisks(self, instance)
2463
2464
2465 def _SafeShutdownInstanceDisks(lu, instance):
2466   """Shutdown block devices of an instance.
2467
2468   This function checks if an instance is running, before calling
2469   _ShutdownInstanceDisks.
2470
2471   """
2472   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2473                                       [instance.hypervisor])
2474   ins_l = ins_l[instance.primary_node]
2475   if ins_l.failed or not isinstance(ins_l.data, list):
2476     raise errors.OpExecError("Can't contact node '%s'" %
2477                              instance.primary_node)
2478
2479   if instance.name in ins_l.data:
2480     raise errors.OpExecError("Instance is running, can't shutdown"
2481                              " block devices.")
2482
2483   _ShutdownInstanceDisks(lu, instance)
2484
2485
2486 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2487   """Shutdown block devices of an instance.
2488
2489   This does the shutdown on all nodes of the instance.
2490
2491   If the ignore_primary is false, errors on the primary node are
2492   ignored.
2493
2494   """
2495   result = True
2496   for disk in instance.disks:
2497     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2498       lu.cfg.SetDiskID(top_disk, node)
2499       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2500       if result.failed or not result.data:
2501         logging.error("Could not shutdown block device %s on node %s",
2502                       disk.iv_name, node)
2503         if not ignore_primary or node != instance.primary_node:
2504           result = False
2505   return result
2506
2507
2508 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2509   """Checks if a node has enough free memory.
2510
2511   This function check if a given node has the needed amount of free
2512   memory. In case the node has less memory or we cannot get the
2513   information from the node, this function raise an OpPrereqError
2514   exception.
2515
2516   @type lu: C{LogicalUnit}
2517   @param lu: a logical unit from which we get configuration data
2518   @type node: C{str}
2519   @param node: the node to check
2520   @type reason: C{str}
2521   @param reason: string to use in the error message
2522   @type requested: C{int}
2523   @param requested: the amount of memory in MiB to check for
2524   @type hypervisor_name: C{str}
2525   @param hypervisor_name: the hypervisor to ask for memory stats
2526   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2527       we cannot check the node
2528
2529   """
2530   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2531   nodeinfo[node].Raise()
2532   free_mem = nodeinfo[node].data.get('memory_free')
2533   if not isinstance(free_mem, int):
2534     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2535                              " was '%s'" % (node, free_mem))
2536   if requested > free_mem:
2537     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2538                              " needed %s MiB, available %s MiB" %
2539                              (node, reason, requested, free_mem))
2540
2541
2542 class LUStartupInstance(LogicalUnit):
2543   """Starts an instance.
2544
2545   """
2546   HPATH = "instance-start"
2547   HTYPE = constants.HTYPE_INSTANCE
2548   _OP_REQP = ["instance_name", "force"]
2549   REQ_BGL = False
2550
2551   def ExpandNames(self):
2552     self._ExpandAndLockInstance()
2553
2554   def BuildHooksEnv(self):
2555     """Build hooks env.
2556
2557     This runs on master, primary and secondary nodes of the instance.
2558
2559     """
2560     env = {
2561       "FORCE": self.op.force,
2562       }
2563     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2564     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2565           list(self.instance.secondary_nodes))
2566     return env, nl, nl
2567
2568   def CheckPrereq(self):
2569     """Check prerequisites.
2570
2571     This checks that the instance is in the cluster.
2572
2573     """
2574     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2575     assert self.instance is not None, \
2576       "Cannot retrieve locked instance %s" % self.op.instance_name
2577
2578     _CheckNodeOnline(self, instance.primary_node)
2579
2580     bep = self.cfg.GetClusterInfo().FillBE(instance)
2581     # check bridges existance
2582     _CheckInstanceBridgesExist(self, instance)
2583
2584     _CheckNodeFreeMemory(self, instance.primary_node,
2585                          "starting instance %s" % instance.name,
2586                          bep[constants.BE_MEMORY], instance.hypervisor)
2587
2588   def Exec(self, feedback_fn):
2589     """Start the instance.
2590
2591     """
2592     instance = self.instance
2593     force = self.op.force
2594     extra_args = getattr(self.op, "extra_args", "")
2595
2596     self.cfg.MarkInstanceUp(instance.name)
2597
2598     node_current = instance.primary_node
2599
2600     _StartInstanceDisks(self, instance, force)
2601
2602     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2603     if result.failed or not result.data:
2604       _ShutdownInstanceDisks(self, instance)
2605       raise errors.OpExecError("Could not start instance")
2606
2607
2608 class LURebootInstance(LogicalUnit):
2609   """Reboot an instance.
2610
2611   """
2612   HPATH = "instance-reboot"
2613   HTYPE = constants.HTYPE_INSTANCE
2614   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2615   REQ_BGL = False
2616
2617   def ExpandNames(self):
2618     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2619                                    constants.INSTANCE_REBOOT_HARD,
2620                                    constants.INSTANCE_REBOOT_FULL]:
2621       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2622                                   (constants.INSTANCE_REBOOT_SOFT,
2623                                    constants.INSTANCE_REBOOT_HARD,
2624                                    constants.INSTANCE_REBOOT_FULL))
2625     self._ExpandAndLockInstance()
2626
2627   def BuildHooksEnv(self):
2628     """Build hooks env.
2629
2630     This runs on master, primary and secondary nodes of the instance.
2631
2632     """
2633     env = {
2634       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2635       }
2636     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2637     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2638           list(self.instance.secondary_nodes))
2639     return env, nl, nl
2640
2641   def CheckPrereq(self):
2642     """Check prerequisites.
2643
2644     This checks that the instance is in the cluster.
2645
2646     """
2647     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2648     assert self.instance is not None, \
2649       "Cannot retrieve locked instance %s" % self.op.instance_name
2650
2651     _CheckNodeOnline(self, instance.primary_node)
2652
2653     # check bridges existance
2654     _CheckInstanceBridgesExist(self, instance)
2655
2656   def Exec(self, feedback_fn):
2657     """Reboot the instance.
2658
2659     """
2660     instance = self.instance
2661     ignore_secondaries = self.op.ignore_secondaries
2662     reboot_type = self.op.reboot_type
2663     extra_args = getattr(self.op, "extra_args", "")
2664
2665     node_current = instance.primary_node
2666
2667     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2668                        constants.INSTANCE_REBOOT_HARD]:
2669       result = self.rpc.call_instance_reboot(node_current, instance,
2670                                              reboot_type, extra_args)
2671       if result.failed or not result.data:
2672         raise errors.OpExecError("Could not reboot instance")
2673     else:
2674       if not self.rpc.call_instance_shutdown(node_current, instance):
2675         raise errors.OpExecError("could not shutdown instance for full reboot")
2676       _ShutdownInstanceDisks(self, instance)
2677       _StartInstanceDisks(self, instance, ignore_secondaries)
2678       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2679       if result.failed or not result.data:
2680         _ShutdownInstanceDisks(self, instance)
2681         raise errors.OpExecError("Could not start instance for full reboot")
2682
2683     self.cfg.MarkInstanceUp(instance.name)
2684
2685
2686 class LUShutdownInstance(LogicalUnit):
2687   """Shutdown an instance.
2688
2689   """
2690   HPATH = "instance-stop"
2691   HTYPE = constants.HTYPE_INSTANCE
2692   _OP_REQP = ["instance_name"]
2693   REQ_BGL = False
2694
2695   def ExpandNames(self):
2696     self._ExpandAndLockInstance()
2697
2698   def BuildHooksEnv(self):
2699     """Build hooks env.
2700
2701     This runs on master, primary and secondary nodes of the instance.
2702
2703     """
2704     env = _BuildInstanceHookEnvByObject(self, self.instance)
2705     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2706           list(self.instance.secondary_nodes))
2707     return env, nl, nl
2708
2709   def CheckPrereq(self):
2710     """Check prerequisites.
2711
2712     This checks that the instance is in the cluster.
2713
2714     """
2715     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2716     assert self.instance is not None, \
2717       "Cannot retrieve locked instance %s" % self.op.instance_name
2718     _CheckNodeOnline(self, self.instance.primary_node)
2719
2720   def Exec(self, feedback_fn):
2721     """Shutdown the instance.
2722
2723     """
2724     instance = self.instance
2725     node_current = instance.primary_node
2726     self.cfg.MarkInstanceDown(instance.name)
2727     result = self.rpc.call_instance_shutdown(node_current, instance)
2728     if result.failed or not result.data:
2729       self.proc.LogWarning("Could not shutdown instance")
2730
2731     _ShutdownInstanceDisks(self, instance)
2732
2733
2734 class LUReinstallInstance(LogicalUnit):
2735   """Reinstall an instance.
2736
2737   """
2738   HPATH = "instance-reinstall"
2739   HTYPE = constants.HTYPE_INSTANCE
2740   _OP_REQP = ["instance_name"]
2741   REQ_BGL = False
2742
2743   def ExpandNames(self):
2744     self._ExpandAndLockInstance()
2745
2746   def BuildHooksEnv(self):
2747     """Build hooks env.
2748
2749     This runs on master, primary and secondary nodes of the instance.
2750
2751     """
2752     env = _BuildInstanceHookEnvByObject(self, self.instance)
2753     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2754           list(self.instance.secondary_nodes))
2755     return env, nl, nl
2756
2757   def CheckPrereq(self):
2758     """Check prerequisites.
2759
2760     This checks that the instance is in the cluster and is not running.
2761
2762     """
2763     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2764     assert instance is not None, \
2765       "Cannot retrieve locked instance %s" % self.op.instance_name
2766     _CheckNodeOnline(self, instance.primary_node)
2767
2768     if instance.disk_template == constants.DT_DISKLESS:
2769       raise errors.OpPrereqError("Instance '%s' has no disks" %
2770                                  self.op.instance_name)
2771     if instance.status != "down":
2772       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2773                                  self.op.instance_name)
2774     remote_info = self.rpc.call_instance_info(instance.primary_node,
2775                                               instance.name,
2776                                               instance.hypervisor)
2777     if remote_info.failed or remote_info.data:
2778       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2779                                  (self.op.instance_name,
2780                                   instance.primary_node))
2781
2782     self.op.os_type = getattr(self.op, "os_type", None)
2783     if self.op.os_type is not None:
2784       # OS verification
2785       pnode = self.cfg.GetNodeInfo(
2786         self.cfg.ExpandNodeName(instance.primary_node))
2787       if pnode is None:
2788         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2789                                    self.op.pnode)
2790       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2791       result.Raise()
2792       if not isinstance(result.data, objects.OS):
2793         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2794                                    " primary node"  % self.op.os_type)
2795
2796     self.instance = instance
2797
2798   def Exec(self, feedback_fn):
2799     """Reinstall the instance.
2800
2801     """
2802     inst = self.instance
2803
2804     if self.op.os_type is not None:
2805       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2806       inst.os = self.op.os_type
2807       self.cfg.Update(inst)
2808
2809     _StartInstanceDisks(self, inst, None)
2810     try:
2811       feedback_fn("Running the instance OS create scripts...")
2812       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2813       result.Raise()
2814       if not result.data:
2815         raise errors.OpExecError("Could not install OS for instance %s"
2816                                  " on node %s" %
2817                                  (inst.name, inst.primary_node))
2818     finally:
2819       _ShutdownInstanceDisks(self, inst)
2820
2821
2822 class LURenameInstance(LogicalUnit):
2823   """Rename an instance.
2824
2825   """
2826   HPATH = "instance-rename"
2827   HTYPE = constants.HTYPE_INSTANCE
2828   _OP_REQP = ["instance_name", "new_name"]
2829
2830   def BuildHooksEnv(self):
2831     """Build hooks env.
2832
2833     This runs on master, primary and secondary nodes of the instance.
2834
2835     """
2836     env = _BuildInstanceHookEnvByObject(self, self.instance)
2837     env["INSTANCE_NEW_NAME"] = self.op.new_name
2838     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2839           list(self.instance.secondary_nodes))
2840     return env, nl, nl
2841
2842   def CheckPrereq(self):
2843     """Check prerequisites.
2844
2845     This checks that the instance is in the cluster and is not running.
2846
2847     """
2848     instance = self.cfg.GetInstanceInfo(
2849       self.cfg.ExpandInstanceName(self.op.instance_name))
2850     if instance is None:
2851       raise errors.OpPrereqError("Instance '%s' not known" %
2852                                  self.op.instance_name)
2853     _CheckNodeOnline(self, instance.primary_node)
2854
2855     if instance.status != "down":
2856       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2857                                  self.op.instance_name)
2858     remote_info = self.rpc.call_instance_info(instance.primary_node,
2859                                               instance.name,
2860                                               instance.hypervisor)
2861     remote_info.Raise()
2862     if remote_info.data:
2863       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2864                                  (self.op.instance_name,
2865                                   instance.primary_node))
2866     self.instance = instance
2867
2868     # new name verification
2869     name_info = utils.HostInfo(self.op.new_name)
2870
2871     self.op.new_name = new_name = name_info.name
2872     instance_list = self.cfg.GetInstanceList()
2873     if new_name in instance_list:
2874       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2875                                  new_name)
2876
2877     if not getattr(self.op, "ignore_ip", False):
2878       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2879         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2880                                    (name_info.ip, new_name))
2881
2882
2883   def Exec(self, feedback_fn):
2884     """Reinstall the instance.
2885
2886     """
2887     inst = self.instance
2888     old_name = inst.name
2889
2890     if inst.disk_template == constants.DT_FILE:
2891       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2892
2893     self.cfg.RenameInstance(inst.name, self.op.new_name)
2894     # Change the instance lock. This is definitely safe while we hold the BGL
2895     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2896     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2897
2898     # re-read the instance from the configuration after rename
2899     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2900
2901     if inst.disk_template == constants.DT_FILE:
2902       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2903       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2904                                                      old_file_storage_dir,
2905                                                      new_file_storage_dir)
2906       result.Raise()
2907       if not result.data:
2908         raise errors.OpExecError("Could not connect to node '%s' to rename"
2909                                  " directory '%s' to '%s' (but the instance"
2910                                  " has been renamed in Ganeti)" % (
2911                                  inst.primary_node, old_file_storage_dir,
2912                                  new_file_storage_dir))
2913
2914       if not result.data[0]:
2915         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2916                                  " (but the instance has been renamed in"
2917                                  " Ganeti)" % (old_file_storage_dir,
2918                                                new_file_storage_dir))
2919
2920     _StartInstanceDisks(self, inst, None)
2921     try:
2922       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2923                                                  old_name)
2924       if result.failed or not result.data:
2925         msg = ("Could not run OS rename script for instance %s on node %s"
2926                " (but the instance has been renamed in Ganeti)" %
2927                (inst.name, inst.primary_node))
2928         self.proc.LogWarning(msg)
2929     finally:
2930       _ShutdownInstanceDisks(self, inst)
2931
2932
2933 class LURemoveInstance(LogicalUnit):
2934   """Remove an instance.
2935
2936   """
2937   HPATH = "instance-remove"
2938   HTYPE = constants.HTYPE_INSTANCE
2939   _OP_REQP = ["instance_name", "ignore_failures"]
2940   REQ_BGL = False
2941
2942   def ExpandNames(self):
2943     self._ExpandAndLockInstance()
2944     self.needed_locks[locking.LEVEL_NODE] = []
2945     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2946
2947   def DeclareLocks(self, level):
2948     if level == locking.LEVEL_NODE:
2949       self._LockInstancesNodes()
2950
2951   def BuildHooksEnv(self):
2952     """Build hooks env.
2953
2954     This runs on master, primary and secondary nodes of the instance.
2955
2956     """
2957     env = _BuildInstanceHookEnvByObject(self, self.instance)
2958     nl = [self.cfg.GetMasterNode()]
2959     return env, nl, nl
2960
2961   def CheckPrereq(self):
2962     """Check prerequisites.
2963
2964     This checks that the instance is in the cluster.
2965
2966     """
2967     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2968     assert self.instance is not None, \
2969       "Cannot retrieve locked instance %s" % self.op.instance_name
2970
2971   def Exec(self, feedback_fn):
2972     """Remove the instance.
2973
2974     """
2975     instance = self.instance
2976     logging.info("Shutting down instance %s on node %s",
2977                  instance.name, instance.primary_node)
2978
2979     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2980     if result.failed or not result.data:
2981       if self.op.ignore_failures:
2982         feedback_fn("Warning: can't shutdown instance")
2983       else:
2984         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2985                                  (instance.name, instance.primary_node))
2986
2987     logging.info("Removing block devices for instance %s", instance.name)
2988
2989     if not _RemoveDisks(self, instance):
2990       if self.op.ignore_failures:
2991         feedback_fn("Warning: can't remove instance's disks")
2992       else:
2993         raise errors.OpExecError("Can't remove instance's disks")
2994
2995     logging.info("Removing instance %s out of cluster config", instance.name)
2996
2997     self.cfg.RemoveInstance(instance.name)
2998     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2999
3000
3001 class LUQueryInstances(NoHooksLU):
3002   """Logical unit for querying instances.
3003
3004   """
3005   _OP_REQP = ["output_fields", "names"]
3006   REQ_BGL = False
3007   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3008                                     "admin_state", "admin_ram",
3009                                     "disk_template", "ip", "mac", "bridge",
3010                                     "sda_size", "sdb_size", "vcpus", "tags",
3011                                     "network_port", "beparams",
3012                                     "(disk).(size)/([0-9]+)",
3013                                     "(disk).(sizes)",
3014                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3015                                     "(nic).(macs|ips|bridges)",
3016                                     "(disk|nic).(count)",
3017                                     "serial_no", "hypervisor", "hvparams",] +
3018                                   ["hv/%s" % name
3019                                    for name in constants.HVS_PARAMETERS] +
3020                                   ["be/%s" % name
3021                                    for name in constants.BES_PARAMETERS])
3022   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3023
3024
3025   def ExpandNames(self):
3026     _CheckOutputFields(static=self._FIELDS_STATIC,
3027                        dynamic=self._FIELDS_DYNAMIC,
3028                        selected=self.op.output_fields)
3029
3030     self.needed_locks = {}
3031     self.share_locks[locking.LEVEL_INSTANCE] = 1
3032     self.share_locks[locking.LEVEL_NODE] = 1
3033
3034     if self.op.names:
3035       self.wanted = _GetWantedInstances(self, self.op.names)
3036     else:
3037       self.wanted = locking.ALL_SET
3038
3039     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3040     if self.do_locking:
3041       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3042       self.needed_locks[locking.LEVEL_NODE] = []
3043       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3044
3045   def DeclareLocks(self, level):
3046     if level == locking.LEVEL_NODE and self.do_locking:
3047       self._LockInstancesNodes()
3048
3049   def CheckPrereq(self):
3050     """Check prerequisites.
3051
3052     """
3053     pass
3054
3055   def Exec(self, feedback_fn):
3056     """Computes the list of nodes and their attributes.
3057
3058     """
3059     all_info = self.cfg.GetAllInstancesInfo()
3060     if self.do_locking:
3061       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3062     elif self.wanted != locking.ALL_SET:
3063       instance_names = self.wanted
3064       missing = set(instance_names).difference(all_info.keys())
3065       if missing:
3066         raise errors.OpExecError(
3067           "Some instances were removed before retrieving their data: %s"
3068           % missing)
3069     else:
3070       instance_names = all_info.keys()
3071
3072     instance_names = utils.NiceSort(instance_names)
3073     instance_list = [all_info[iname] for iname in instance_names]
3074
3075     # begin data gathering
3076
3077     nodes = frozenset([inst.primary_node for inst in instance_list])
3078     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3079
3080     bad_nodes = []
3081     off_nodes = []
3082     if self.do_locking:
3083       live_data = {}
3084       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3085       for name in nodes:
3086         result = node_data[name]
3087         if result.offline:
3088           # offline nodes will be in both lists
3089           off_nodes.append(name)
3090         if result.failed:
3091           bad_nodes.append(name)
3092         else:
3093           if result.data:
3094             live_data.update(result.data)
3095             # else no instance is alive
3096     else:
3097       live_data = dict([(name, {}) for name in instance_names])
3098
3099     # end data gathering
3100
3101     HVPREFIX = "hv/"
3102     BEPREFIX = "be/"
3103     output = []
3104     for instance in instance_list:
3105       iout = []
3106       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3107       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3108       for field in self.op.output_fields:
3109         st_match = self._FIELDS_STATIC.Matches(field)
3110         if field == "name":
3111           val = instance.name
3112         elif field == "os":
3113           val = instance.os
3114         elif field == "pnode":
3115           val = instance.primary_node
3116         elif field == "snodes":
3117           val = list(instance.secondary_nodes)
3118         elif field == "admin_state":
3119           val = (instance.status != "down")
3120         elif field == "oper_state":
3121           if instance.primary_node in bad_nodes:
3122             val = None
3123           else:
3124             val = bool(live_data.get(instance.name))
3125         elif field == "status":
3126           if instance.primary_node in off_nodes:
3127             val = "ERROR_nodeoffline"
3128           elif instance.primary_node in bad_nodes:
3129             val = "ERROR_nodedown"
3130           else:
3131             running = bool(live_data.get(instance.name))
3132             if running:
3133               if instance.status != "down":
3134                 val = "running"
3135               else:
3136                 val = "ERROR_up"
3137             else:
3138               if instance.status != "down":
3139                 val = "ERROR_down"
3140               else:
3141                 val = "ADMIN_down"
3142         elif field == "oper_ram":
3143           if instance.primary_node in bad_nodes:
3144             val = None
3145           elif instance.name in live_data:
3146             val = live_data[instance.name].get("memory", "?")
3147           else:
3148             val = "-"
3149         elif field == "disk_template":
3150           val = instance.disk_template
3151         elif field == "ip":
3152           val = instance.nics[0].ip
3153         elif field == "bridge":
3154           val = instance.nics[0].bridge
3155         elif field == "mac":
3156           val = instance.nics[0].mac
3157         elif field == "sda_size" or field == "sdb_size":
3158           idx = ord(field[2]) - ord('a')
3159           try:
3160             val = instance.FindDisk(idx).size
3161           except errors.OpPrereqError:
3162             val = None
3163         elif field == "tags":
3164           val = list(instance.GetTags())
3165         elif field == "serial_no":
3166           val = instance.serial_no
3167         elif field == "network_port":
3168           val = instance.network_port
3169         elif field == "hypervisor":
3170           val = instance.hypervisor
3171         elif field == "hvparams":
3172           val = i_hv
3173         elif (field.startswith(HVPREFIX) and
3174               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3175           val = i_hv.get(field[len(HVPREFIX):], None)
3176         elif field == "beparams":
3177           val = i_be
3178         elif (field.startswith(BEPREFIX) and
3179               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3180           val = i_be.get(field[len(BEPREFIX):], None)
3181         elif st_match and st_match.groups():
3182           # matches a variable list
3183           st_groups = st_match.groups()
3184           if st_groups and st_groups[0] == "disk":
3185             if st_groups[1] == "count":
3186               val = len(instance.disks)
3187             elif st_groups[1] == "sizes":
3188               val = [disk.size for disk in instance.disks]
3189             elif st_groups[1] == "size":
3190               try:
3191                 val = instance.FindDisk(st_groups[2]).size
3192               except errors.OpPrereqError:
3193                 val = None
3194             else:
3195               assert False, "Unhandled disk parameter"
3196           elif st_groups[0] == "nic":
3197             if st_groups[1] == "count":
3198               val = len(instance.nics)
3199             elif st_groups[1] == "macs":
3200               val = [nic.mac for nic in instance.nics]
3201             elif st_groups[1] == "ips":
3202               val = [nic.ip for nic in instance.nics]
3203             elif st_groups[1] == "bridges":
3204               val = [nic.bridge for nic in instance.nics]
3205             else:
3206               # index-based item
3207               nic_idx = int(st_groups[2])
3208               if nic_idx >= len(instance.nics):
3209                 val = None
3210               else:
3211                 if st_groups[1] == "mac":
3212                   val = instance.nics[nic_idx].mac
3213                 elif st_groups[1] == "ip":
3214                   val = instance.nics[nic_idx].ip
3215                 elif st_groups[1] == "bridge":
3216                   val = instance.nics[nic_idx].bridge
3217                 else:
3218                   assert False, "Unhandled NIC parameter"
3219           else:
3220             assert False, "Unhandled variable parameter"
3221         else:
3222           raise errors.ParameterError(field)
3223         iout.append(val)
3224       output.append(iout)
3225
3226     return output
3227
3228
3229 class LUFailoverInstance(LogicalUnit):
3230   """Failover an instance.
3231
3232   """
3233   HPATH = "instance-failover"
3234   HTYPE = constants.HTYPE_INSTANCE
3235   _OP_REQP = ["instance_name", "ignore_consistency"]
3236   REQ_BGL = False
3237
3238   def ExpandNames(self):
3239     self._ExpandAndLockInstance()
3240     self.needed_locks[locking.LEVEL_NODE] = []
3241     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3242
3243   def DeclareLocks(self, level):
3244     if level == locking.LEVEL_NODE:
3245       self._LockInstancesNodes()
3246
3247   def BuildHooksEnv(self):
3248     """Build hooks env.
3249
3250     This runs on master, primary and secondary nodes of the instance.
3251
3252     """
3253     env = {
3254       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3255       }
3256     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3257     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3258     return env, nl, nl
3259
3260   def CheckPrereq(self):
3261     """Check prerequisites.
3262
3263     This checks that the instance is in the cluster.
3264
3265     """
3266     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3267     assert self.instance is not None, \
3268       "Cannot retrieve locked instance %s" % self.op.instance_name
3269
3270     bep = self.cfg.GetClusterInfo().FillBE(instance)
3271     if instance.disk_template not in constants.DTS_NET_MIRROR:
3272       raise errors.OpPrereqError("Instance's disk layout is not"
3273                                  " network mirrored, cannot failover.")
3274
3275     secondary_nodes = instance.secondary_nodes
3276     if not secondary_nodes:
3277       raise errors.ProgrammerError("no secondary node but using "
3278                                    "a mirrored disk template")
3279
3280     target_node = secondary_nodes[0]
3281     _CheckNodeOnline(self, target_node)
3282     # check memory requirements on the secondary node
3283     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3284                          instance.name, bep[constants.BE_MEMORY],
3285                          instance.hypervisor)
3286
3287     # check bridge existance
3288     brlist = [nic.bridge for nic in instance.nics]
3289     result = self.rpc.call_bridges_exist(target_node, brlist)
3290     result.Raise()
3291     if not result.data:
3292       raise errors.OpPrereqError("One or more target bridges %s does not"
3293                                  " exist on destination node '%s'" %
3294                                  (brlist, target_node))
3295
3296   def Exec(self, feedback_fn):
3297     """Failover an instance.
3298
3299     The failover is done by shutting it down on its present node and
3300     starting it on the secondary.
3301
3302     """
3303     instance = self.instance
3304
3305     source_node = instance.primary_node
3306     target_node = instance.secondary_nodes[0]
3307
3308     feedback_fn("* checking disk consistency between source and target")
3309     for dev in instance.disks:
3310       # for drbd, these are drbd over lvm
3311       if not _CheckDiskConsistency(self, dev, target_node, False):
3312         if instance.status == "up" and not self.op.ignore_consistency:
3313           raise errors.OpExecError("Disk %s is degraded on target node,"
3314                                    " aborting failover." % dev.iv_name)
3315
3316     feedback_fn("* shutting down instance on source node")
3317     logging.info("Shutting down instance %s on node %s",
3318                  instance.name, source_node)
3319
3320     result = self.rpc.call_instance_shutdown(source_node, instance)
3321     if result.failed or not result.data:
3322       if self.op.ignore_consistency:
3323         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3324                              " Proceeding"
3325                              " anyway. Please make sure node %s is down",
3326                              instance.name, source_node, source_node)
3327       else:
3328         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3329                                  (instance.name, source_node))
3330
3331     feedback_fn("* deactivating the instance's disks on source node")
3332     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3333       raise errors.OpExecError("Can't shut down the instance's disks.")
3334
3335     instance.primary_node = target_node
3336     # distribute new instance config to the other nodes
3337     self.cfg.Update(instance)
3338
3339     # Only start the instance if it's marked as up
3340     if instance.status == "up":
3341       feedback_fn("* activating the instance's disks on target node")
3342       logging.info("Starting instance %s on node %s",
3343                    instance.name, target_node)
3344
3345       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3346                                                ignore_secondaries=True)
3347       if not disks_ok:
3348         _ShutdownInstanceDisks(self, instance)
3349         raise errors.OpExecError("Can't activate the instance's disks")
3350
3351       feedback_fn("* starting the instance on the target node")
3352       result = self.rpc.call_instance_start(target_node, instance, None)
3353       if result.failed or not result.data:
3354         _ShutdownInstanceDisks(self, instance)
3355         raise errors.OpExecError("Could not start instance %s on node %s." %
3356                                  (instance.name, target_node))
3357
3358
3359 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3360   """Create a tree of block devices on the primary node.
3361
3362   This always creates all devices.
3363
3364   """
3365   if device.children:
3366     for child in device.children:
3367       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3368         return False
3369
3370   lu.cfg.SetDiskID(device, node)
3371   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3372                                        instance.name, True, info)
3373   if new_id.failed or not new_id.data:
3374     return False
3375   if device.physical_id is None:
3376     device.physical_id = new_id
3377   return True
3378
3379
3380 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3381   """Create a tree of block devices on a secondary node.
3382
3383   If this device type has to be created on secondaries, create it and
3384   all its children.
3385
3386   If not, just recurse to children keeping the same 'force' value.
3387
3388   """
3389   if device.CreateOnSecondary():
3390     force = True
3391   if device.children:
3392     for child in device.children:
3393       if not _CreateBlockDevOnSecondary(lu, node, instance,
3394                                         child, force, info):
3395         return False
3396
3397   if not force:
3398     return True
3399   lu.cfg.SetDiskID(device, node)
3400   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3401                                        instance.name, False, info)
3402   if new_id.failed or not new_id.data:
3403     return False
3404   if device.physical_id is None:
3405     device.physical_id = new_id
3406   return True
3407
3408
3409 def _GenerateUniqueNames(lu, exts):
3410   """Generate a suitable LV name.
3411
3412   This will generate a logical volume name for the given instance.
3413
3414   """
3415   results = []
3416   for val in exts:
3417     new_id = lu.cfg.GenerateUniqueID()
3418     results.append("%s%s" % (new_id, val))
3419   return results
3420
3421
3422 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3423                          p_minor, s_minor):
3424   """Generate a drbd8 device complete with its children.
3425
3426   """
3427   port = lu.cfg.AllocatePort()
3428   vgname = lu.cfg.GetVGName()
3429   shared_secret = lu.cfg.GenerateDRBDSecret()
3430   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3431                           logical_id=(vgname, names[0]))
3432   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3433                           logical_id=(vgname, names[1]))
3434   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3435                           logical_id=(primary, secondary, port,
3436                                       p_minor, s_minor,
3437                                       shared_secret),
3438                           children=[dev_data, dev_meta],
3439                           iv_name=iv_name)
3440   return drbd_dev
3441
3442
3443 def _GenerateDiskTemplate(lu, template_name,
3444                           instance_name, primary_node,
3445                           secondary_nodes, disk_info,
3446                           file_storage_dir, file_driver,
3447                           base_index):
3448   """Generate the entire disk layout for a given template type.
3449
3450   """
3451   #TODO: compute space requirements
3452
3453   vgname = lu.cfg.GetVGName()
3454   disk_count = len(disk_info)
3455   disks = []
3456   if template_name == constants.DT_DISKLESS:
3457     pass
3458   elif template_name == constants.DT_PLAIN:
3459     if len(secondary_nodes) != 0:
3460       raise errors.ProgrammerError("Wrong template configuration")
3461
3462     names = _GenerateUniqueNames(lu, [".disk%d" % i
3463                                       for i in range(disk_count)])
3464     for idx, disk in enumerate(disk_info):
3465       disk_index = idx + base_index
3466       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3467                               logical_id=(vgname, names[idx]),
3468                               iv_name="disk/%d" % disk_index)
3469       disks.append(disk_dev)
3470   elif template_name == constants.DT_DRBD8:
3471     if len(secondary_nodes) != 1:
3472       raise errors.ProgrammerError("Wrong template configuration")
3473     remote_node = secondary_nodes[0]
3474     minors = lu.cfg.AllocateDRBDMinor(
3475       [primary_node, remote_node] * len(disk_info), instance_name)
3476
3477     names = _GenerateUniqueNames(lu,
3478                                  [".disk%d_%s" % (i, s)
3479                                   for i in range(disk_count)
3480                                   for s in ("data", "meta")
3481                                   ])
3482     for idx, disk in enumerate(disk_info):
3483       disk_index = idx + base_index
3484       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3485                                       disk["size"], names[idx*2:idx*2+2],
3486                                       "disk/%d" % disk_index,
3487                                       minors[idx*2], minors[idx*2+1])
3488       disks.append(disk_dev)
3489   elif template_name == constants.DT_FILE:
3490     if len(secondary_nodes) != 0:
3491       raise errors.ProgrammerError("Wrong template configuration")
3492
3493     for idx, disk in enumerate(disk_info):
3494       disk_index = idx + base_index
3495       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3496                               iv_name="disk/%d" % disk_index,
3497                               logical_id=(file_driver,
3498                                           "%s/disk%d" % (file_storage_dir,
3499                                                          idx)))
3500       disks.append(disk_dev)
3501   else:
3502     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3503   return disks
3504
3505
3506 def _GetInstanceInfoText(instance):
3507   """Compute that text that should be added to the disk's metadata.
3508
3509   """
3510   return "originstname+%s" % instance.name
3511
3512
3513 def _CreateDisks(lu, instance):
3514   """Create all disks for an instance.
3515
3516   This abstracts away some work from AddInstance.
3517
3518   @type lu: L{LogicalUnit}
3519   @param lu: the logical unit on whose behalf we execute
3520   @type instance: L{objects.Instance}
3521   @param instance: the instance whose disks we should create
3522   @rtype: boolean
3523   @return: the success of the creation
3524
3525   """
3526   info = _GetInstanceInfoText(instance)
3527
3528   if instance.disk_template == constants.DT_FILE:
3529     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3530     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3531                                                  file_storage_dir)
3532
3533     if result.failed or not result.data:
3534       logging.error("Could not connect to node '%s'", instance.primary_node)
3535       return False
3536
3537     if not result.data[0]:
3538       logging.error("Failed to create directory '%s'", file_storage_dir)
3539       return False
3540
3541   # Note: this needs to be kept in sync with adding of disks in
3542   # LUSetInstanceParams
3543   for device in instance.disks:
3544     logging.info("Creating volume %s for instance %s",
3545                  device.iv_name, instance.name)
3546     #HARDCODE
3547     for secondary_node in instance.secondary_nodes:
3548       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3549                                         device, False, info):
3550         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3551                       device.iv_name, device, secondary_node)
3552         return False
3553     #HARDCODE
3554     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3555                                     instance, device, info):
3556       logging.error("Failed to create volume %s on primary!", device.iv_name)
3557       return False
3558
3559   return True
3560
3561
3562 def _RemoveDisks(lu, instance):
3563   """Remove all disks for an instance.
3564
3565   This abstracts away some work from `AddInstance()` and
3566   `RemoveInstance()`. Note that in case some of the devices couldn't
3567   be removed, the removal will continue with the other ones (compare
3568   with `_CreateDisks()`).
3569
3570   @type lu: L{LogicalUnit}
3571   @param lu: the logical unit on whose behalf we execute
3572   @type instance: L{objects.Instance}
3573   @param instance: the instance whose disks we should remove
3574   @rtype: boolean
3575   @return: the success of the removal
3576
3577   """
3578   logging.info("Removing block devices for instance %s", instance.name)
3579
3580   result = True
3581   for device in instance.disks:
3582     for node, disk in device.ComputeNodeTree(instance.primary_node):
3583       lu.cfg.SetDiskID(disk, node)
3584       result = lu.rpc.call_blockdev_remove(node, disk)
3585       if result.failed or not result.data:
3586         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3587                            " continuing anyway", device.iv_name, node)
3588         result = False
3589
3590   if instance.disk_template == constants.DT_FILE:
3591     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3592     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3593                                                  file_storage_dir)
3594     if result.failed or not result.data:
3595       logging.error("Could not remove directory '%s'", file_storage_dir)
3596       result = False
3597
3598   return result
3599
3600
3601 def _ComputeDiskSize(disk_template, disks):
3602   """Compute disk size requirements in the volume group
3603
3604   """
3605   # Required free disk space as a function of disk and swap space
3606   req_size_dict = {
3607     constants.DT_DISKLESS: None,
3608     constants.DT_PLAIN: sum(d["size"] for d in disks),
3609     # 128 MB are added for drbd metadata for each disk
3610     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3611     constants.DT_FILE: None,
3612   }
3613
3614   if disk_template not in req_size_dict:
3615     raise errors.ProgrammerError("Disk template '%s' size requirement"
3616                                  " is unknown" %  disk_template)
3617
3618   return req_size_dict[disk_template]
3619
3620
3621 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3622   """Hypervisor parameter validation.
3623
3624   This function abstract the hypervisor parameter validation to be
3625   used in both instance create and instance modify.
3626
3627   @type lu: L{LogicalUnit}
3628   @param lu: the logical unit for which we check
3629   @type nodenames: list
3630   @param nodenames: the list of nodes on which we should check
3631   @type hvname: string
3632   @param hvname: the name of the hypervisor we should use
3633   @type hvparams: dict
3634   @param hvparams: the parameters which we need to check
3635   @raise errors.OpPrereqError: if the parameters are not valid
3636
3637   """
3638   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3639                                                   hvname,
3640                                                   hvparams)
3641   for node in nodenames:
3642     info = hvinfo[node]
3643     info.Raise()
3644     if not info.data or not isinstance(info.data, (tuple, list)):
3645       raise errors.OpPrereqError("Cannot get current information"
3646                                  " from node '%s' (%s)" % (node, info.data))
3647     if not info.data[0]:
3648       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3649                                  " %s" % info.data[1])
3650
3651
3652 class LUCreateInstance(LogicalUnit):
3653   """Create an instance.
3654
3655   """
3656   HPATH = "instance-add"
3657   HTYPE = constants.HTYPE_INSTANCE
3658   _OP_REQP = ["instance_name", "disks", "disk_template",
3659               "mode", "start",
3660               "wait_for_sync", "ip_check", "nics",
3661               "hvparams", "beparams"]
3662   REQ_BGL = False
3663
3664   def _ExpandNode(self, node):
3665     """Expands and checks one node name.
3666
3667     """
3668     node_full = self.cfg.ExpandNodeName(node)
3669     if node_full is None:
3670       raise errors.OpPrereqError("Unknown node %s" % node)
3671     return node_full
3672
3673   def ExpandNames(self):
3674     """ExpandNames for CreateInstance.
3675
3676     Figure out the right locks for instance creation.
3677
3678     """
3679     self.needed_locks = {}
3680
3681     # set optional parameters to none if they don't exist
3682     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3683       if not hasattr(self.op, attr):
3684         setattr(self.op, attr, None)
3685
3686     # cheap checks, mostly valid constants given
3687
3688     # verify creation mode
3689     if self.op.mode not in (constants.INSTANCE_CREATE,
3690                             constants.INSTANCE_IMPORT):
3691       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3692                                  self.op.mode)
3693
3694     # disk template and mirror node verification
3695     if self.op.disk_template not in constants.DISK_TEMPLATES:
3696       raise errors.OpPrereqError("Invalid disk template name")
3697
3698     if self.op.hypervisor is None:
3699       self.op.hypervisor = self.cfg.GetHypervisorType()
3700
3701     cluster = self.cfg.GetClusterInfo()
3702     enabled_hvs = cluster.enabled_hypervisors
3703     if self.op.hypervisor not in enabled_hvs:
3704       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3705                                  " cluster (%s)" % (self.op.hypervisor,
3706                                   ",".join(enabled_hvs)))
3707
3708     # check hypervisor parameter syntax (locally)
3709
3710     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3711                                   self.op.hvparams)
3712     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3713     hv_type.CheckParameterSyntax(filled_hvp)
3714
3715     # fill and remember the beparams dict
3716     utils.CheckBEParams(self.op.beparams)
3717     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3718                                     self.op.beparams)
3719
3720     #### instance parameters check
3721
3722     # instance name verification
3723     hostname1 = utils.HostInfo(self.op.instance_name)
3724     self.op.instance_name = instance_name = hostname1.name
3725
3726     # this is just a preventive check, but someone might still add this
3727     # instance in the meantime, and creation will fail at lock-add time
3728     if instance_name in self.cfg.GetInstanceList():
3729       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3730                                  instance_name)
3731
3732     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3733
3734     # NIC buildup
3735     self.nics = []
3736     for nic in self.op.nics:
3737       # ip validity checks
3738       ip = nic.get("ip", None)
3739       if ip is None or ip.lower() == "none":
3740         nic_ip = None
3741       elif ip.lower() == constants.VALUE_AUTO:
3742         nic_ip = hostname1.ip
3743       else:
3744         if not utils.IsValidIP(ip):
3745           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3746                                      " like a valid IP" % ip)
3747         nic_ip = ip
3748
3749       # MAC address verification
3750       mac = nic.get("mac", constants.VALUE_AUTO)
3751       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3752         if not utils.IsValidMac(mac.lower()):
3753           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3754                                      mac)
3755       # bridge verification
3756       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3757       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3758
3759     # disk checks/pre-build
3760     self.disks = []
3761     for disk in self.op.disks:
3762       mode = disk.get("mode", constants.DISK_RDWR)
3763       if mode not in constants.DISK_ACCESS_SET:
3764         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3765                                    mode)
3766       size = disk.get("size", None)
3767       if size is None:
3768         raise errors.OpPrereqError("Missing disk size")
3769       try:
3770         size = int(size)
3771       except ValueError:
3772         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3773       self.disks.append({"size": size, "mode": mode})
3774
3775     # used in CheckPrereq for ip ping check
3776     self.check_ip = hostname1.ip
3777
3778     # file storage checks
3779     if (self.op.file_driver and
3780         not self.op.file_driver in constants.FILE_DRIVER):
3781       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3782                                  self.op.file_driver)
3783
3784     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3785       raise errors.OpPrereqError("File storage directory path not absolute")
3786
3787     ### Node/iallocator related checks
3788     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3789       raise errors.OpPrereqError("One and only one of iallocator and primary"
3790                                  " node must be given")
3791
3792     if self.op.iallocator:
3793       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3794     else:
3795       self.op.pnode = self._ExpandNode(self.op.pnode)
3796       nodelist = [self.op.pnode]
3797       if self.op.snode is not None:
3798         self.op.snode = self._ExpandNode(self.op.snode)
3799         nodelist.append(self.op.snode)
3800       self.needed_locks[locking.LEVEL_NODE] = nodelist
3801
3802     # in case of import lock the source node too
3803     if self.op.mode == constants.INSTANCE_IMPORT:
3804       src_node = getattr(self.op, "src_node", None)
3805       src_path = getattr(self.op, "src_path", None)
3806
3807       if src_path is None:
3808         self.op.src_path = src_path = self.op.instance_name
3809
3810       if src_node is None:
3811         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3812         self.op.src_node = None
3813         if os.path.isabs(src_path):
3814           raise errors.OpPrereqError("Importing an instance from an absolute"
3815                                      " path requires a source node option.")
3816       else:
3817         self.op.src_node = src_node = self._ExpandNode(src_node)
3818         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3819           self.needed_locks[locking.LEVEL_NODE].append(src_node)
3820         if not os.path.isabs(src_path):
3821           self.op.src_path = src_path = \
3822             os.path.join(constants.EXPORT_DIR, src_path)
3823
3824     else: # INSTANCE_CREATE
3825       if getattr(self.op, "os_type", None) is None:
3826         raise errors.OpPrereqError("No guest OS specified")
3827
3828   def _RunAllocator(self):
3829     """Run the allocator based on input opcode.
3830
3831     """
3832     nics = [n.ToDict() for n in self.nics]
3833     ial = IAllocator(self,
3834                      mode=constants.IALLOCATOR_MODE_ALLOC,
3835                      name=self.op.instance_name,
3836                      disk_template=self.op.disk_template,
3837                      tags=[],
3838                      os=self.op.os_type,
3839                      vcpus=self.be_full[constants.BE_VCPUS],
3840                      mem_size=self.be_full[constants.BE_MEMORY],
3841                      disks=self.disks,
3842                      nics=nics,
3843                      hypervisor=self.op.hypervisor,
3844                      )
3845
3846     ial.Run(self.op.iallocator)
3847
3848     if not ial.success:
3849       raise errors.OpPrereqError("Can't compute nodes using"
3850                                  " iallocator '%s': %s" % (self.op.iallocator,
3851                                                            ial.info))
3852     if len(ial.nodes) != ial.required_nodes:
3853       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3854                                  " of nodes (%s), required %s" %
3855                                  (self.op.iallocator, len(ial.nodes),
3856                                   ial.required_nodes))
3857     self.op.pnode = ial.nodes[0]
3858     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3859                  self.op.instance_name, self.op.iallocator,
3860                  ", ".join(ial.nodes))
3861     if ial.required_nodes == 2:
3862       self.op.snode = ial.nodes[1]
3863
3864   def BuildHooksEnv(self):
3865     """Build hooks env.
3866
3867     This runs on master, primary and secondary nodes of the instance.
3868
3869     """
3870     env = {
3871       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3872       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3873       "INSTANCE_ADD_MODE": self.op.mode,
3874       }
3875     if self.op.mode == constants.INSTANCE_IMPORT:
3876       env["INSTANCE_SRC_NODE"] = self.op.src_node
3877       env["INSTANCE_SRC_PATH"] = self.op.src_path
3878       env["INSTANCE_SRC_IMAGES"] = self.src_images
3879
3880     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3881       primary_node=self.op.pnode,
3882       secondary_nodes=self.secondaries,
3883       status=self.instance_status,
3884       os_type=self.op.os_type,
3885       memory=self.be_full[constants.BE_MEMORY],
3886       vcpus=self.be_full[constants.BE_VCPUS],
3887       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3888     ))
3889
3890     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3891           self.secondaries)
3892     return env, nl, nl
3893
3894
3895   def CheckPrereq(self):
3896     """Check prerequisites.
3897
3898     """
3899     if (not self.cfg.GetVGName() and
3900         self.op.disk_template not in constants.DTS_NOT_LVM):
3901       raise errors.OpPrereqError("Cluster does not support lvm-based"
3902                                  " instances")
3903
3904
3905     if self.op.mode == constants.INSTANCE_IMPORT:
3906       src_node = self.op.src_node
3907       src_path = self.op.src_path
3908
3909       if src_node is None:
3910         exp_list = self.rpc.call_export_list(
3911           self.acquired_locks[locking.LEVEL_NODE])
3912         found = False
3913         for node in exp_list:
3914           if not exp_list[node].failed and src_path in exp_list[node].data:
3915             found = True
3916             self.op.src_node = src_node = node
3917             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3918                                                        src_path)
3919             break
3920         if not found:
3921           raise errors.OpPrereqError("No export found for relative path %s" %
3922                                       src_path)
3923
3924       _CheckNodeOnline(self, src_node)
3925       result = self.rpc.call_export_info(src_node, src_path)
3926       result.Raise()
3927       if not result.data:
3928         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3929
3930       export_info = result.data
3931       if not export_info.has_section(constants.INISECT_EXP):
3932         raise errors.ProgrammerError("Corrupted export config")
3933
3934       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3935       if (int(ei_version) != constants.EXPORT_VERSION):
3936         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3937                                    (ei_version, constants.EXPORT_VERSION))
3938
3939       # Check that the new instance doesn't have less disks than the export
3940       instance_disks = len(self.disks)
3941       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3942       if instance_disks < export_disks:
3943         raise errors.OpPrereqError("Not enough disks to import."
3944                                    " (instance: %d, export: %d)" %
3945                                    (instance_disks, export_disks))
3946
3947       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3948       disk_images = []
3949       for idx in range(export_disks):
3950         option = 'disk%d_dump' % idx
3951         if export_info.has_option(constants.INISECT_INS, option):
3952           # FIXME: are the old os-es, disk sizes, etc. useful?
3953           export_name = export_info.get(constants.INISECT_INS, option)
3954           image = os.path.join(src_path, export_name)
3955           disk_images.append(image)
3956         else:
3957           disk_images.append(False)
3958
3959       self.src_images = disk_images
3960
3961       old_name = export_info.get(constants.INISECT_INS, 'name')
3962       # FIXME: int() here could throw a ValueError on broken exports
3963       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3964       if self.op.instance_name == old_name:
3965         for idx, nic in enumerate(self.nics):
3966           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3967             nic_mac_ini = 'nic%d_mac' % idx
3968             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3969
3970     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3971     if self.op.start and not self.op.ip_check:
3972       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3973                                  " adding an instance in start mode")
3974
3975     if self.op.ip_check:
3976       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3977         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3978                                    (self.check_ip, self.op.instance_name))
3979
3980     #### allocator run
3981
3982     if self.op.iallocator is not None:
3983       self._RunAllocator()
3984
3985     #### node related checks
3986
3987     # check primary node
3988     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3989     assert self.pnode is not None, \
3990       "Cannot retrieve locked node %s" % self.op.pnode
3991     if pnode.offline:
3992       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
3993                                  pnode.name)
3994
3995     self.secondaries = []
3996
3997     # mirror node verification
3998     if self.op.disk_template in constants.DTS_NET_MIRROR:
3999       if self.op.snode is None:
4000         raise errors.OpPrereqError("The networked disk templates need"
4001                                    " a mirror node")
4002       if self.op.snode == pnode.name:
4003         raise errors.OpPrereqError("The secondary node cannot be"
4004                                    " the primary node.")
4005       self.secondaries.append(self.op.snode)
4006       _CheckNodeOnline(self, self.op.snode)
4007
4008     nodenames = [pnode.name] + self.secondaries
4009
4010     req_size = _ComputeDiskSize(self.op.disk_template,
4011                                 self.disks)
4012
4013     # Check lv size requirements
4014     if req_size is not None:
4015       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4016                                          self.op.hypervisor)
4017       for node in nodenames:
4018         info = nodeinfo[node]
4019         info.Raise()
4020         info = info.data
4021         if not info:
4022           raise errors.OpPrereqError("Cannot get current information"
4023                                      " from node '%s'" % node)
4024         vg_free = info.get('vg_free', None)
4025         if not isinstance(vg_free, int):
4026           raise errors.OpPrereqError("Can't compute free disk space on"
4027                                      " node %s" % node)
4028         if req_size > info['vg_free']:
4029           raise errors.OpPrereqError("Not enough disk space on target node %s."
4030                                      " %d MB available, %d MB required" %
4031                                      (node, info['vg_free'], req_size))
4032
4033     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4034
4035     # os verification
4036     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4037     result.Raise()
4038     if not isinstance(result.data, objects.OS):
4039       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4040                                  " primary node"  % self.op.os_type)
4041
4042     # bridge check on primary node
4043     bridges = [n.bridge for n in self.nics]
4044     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4045     result.Raise()
4046     if not result.data:
4047       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4048                                  " exist on destination node '%s'" %
4049                                  (",".join(bridges), pnode.name))
4050
4051     # memory check on primary node
4052     if self.op.start:
4053       _CheckNodeFreeMemory(self, self.pnode.name,
4054                            "creating instance %s" % self.op.instance_name,
4055                            self.be_full[constants.BE_MEMORY],
4056                            self.op.hypervisor)
4057
4058     if self.op.start:
4059       self.instance_status = 'up'
4060     else:
4061       self.instance_status = 'down'
4062
4063   def Exec(self, feedback_fn):
4064     """Create and add the instance to the cluster.
4065
4066     """
4067     instance = self.op.instance_name
4068     pnode_name = self.pnode.name
4069
4070     for nic in self.nics:
4071       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4072         nic.mac = self.cfg.GenerateMAC()
4073
4074     ht_kind = self.op.hypervisor
4075     if ht_kind in constants.HTS_REQ_PORT:
4076       network_port = self.cfg.AllocatePort()
4077     else:
4078       network_port = None
4079
4080     ##if self.op.vnc_bind_address is None:
4081     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4082
4083     # this is needed because os.path.join does not accept None arguments
4084     if self.op.file_storage_dir is None:
4085       string_file_storage_dir = ""
4086     else:
4087       string_file_storage_dir = self.op.file_storage_dir
4088
4089     # build the full file storage dir path
4090     file_storage_dir = os.path.normpath(os.path.join(
4091                                         self.cfg.GetFileStorageDir(),
4092                                         string_file_storage_dir, instance))
4093
4094
4095     disks = _GenerateDiskTemplate(self,
4096                                   self.op.disk_template,
4097                                   instance, pnode_name,
4098                                   self.secondaries,
4099                                   self.disks,
4100                                   file_storage_dir,
4101                                   self.op.file_driver,
4102                                   0)
4103
4104     iobj = objects.Instance(name=instance, os=self.op.os_type,
4105                             primary_node=pnode_name,
4106                             nics=self.nics, disks=disks,
4107                             disk_template=self.op.disk_template,
4108                             status=self.instance_status,
4109                             network_port=network_port,
4110                             beparams=self.op.beparams,
4111                             hvparams=self.op.hvparams,
4112                             hypervisor=self.op.hypervisor,
4113                             )
4114
4115     feedback_fn("* creating instance disks...")
4116     if not _CreateDisks(self, iobj):
4117       _RemoveDisks(self, iobj)
4118       self.cfg.ReleaseDRBDMinors(instance)
4119       raise errors.OpExecError("Device creation failed, reverting...")
4120
4121     feedback_fn("adding instance %s to cluster config" % instance)
4122
4123     self.cfg.AddInstance(iobj)
4124     # Declare that we don't want to remove the instance lock anymore, as we've
4125     # added the instance to the config
4126     del self.remove_locks[locking.LEVEL_INSTANCE]
4127     # Remove the temp. assignements for the instance's drbds
4128     self.cfg.ReleaseDRBDMinors(instance)
4129     # Unlock all the nodes
4130     if self.op.mode == constants.INSTANCE_IMPORT:
4131       nodes_keep = [self.op.src_node]
4132       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4133                        if node != self.op.src_node]
4134       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4135       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4136     else:
4137       self.context.glm.release(locking.LEVEL_NODE)
4138       del self.acquired_locks[locking.LEVEL_NODE]
4139
4140     if self.op.wait_for_sync:
4141       disk_abort = not _WaitForSync(self, iobj)
4142     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4143       # make sure the disks are not degraded (still sync-ing is ok)
4144       time.sleep(15)
4145       feedback_fn("* checking mirrors status")
4146       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4147     else:
4148       disk_abort = False
4149
4150     if disk_abort:
4151       _RemoveDisks(self, iobj)
4152       self.cfg.RemoveInstance(iobj.name)
4153       # Make sure the instance lock gets removed
4154       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4155       raise errors.OpExecError("There are some degraded disks for"
4156                                " this instance")
4157
4158     feedback_fn("creating os for instance %s on node %s" %
4159                 (instance, pnode_name))
4160
4161     if iobj.disk_template != constants.DT_DISKLESS:
4162       if self.op.mode == constants.INSTANCE_CREATE:
4163         feedback_fn("* running the instance OS create scripts...")
4164         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4165         result.Raise()
4166         if not result.data:
4167           raise errors.OpExecError("Could not add os for instance %s"
4168                                    " on node %s" %
4169                                    (instance, pnode_name))
4170
4171       elif self.op.mode == constants.INSTANCE_IMPORT:
4172         feedback_fn("* running the instance OS import scripts...")
4173         src_node = self.op.src_node
4174         src_images = self.src_images
4175         cluster_name = self.cfg.GetClusterName()
4176         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4177                                                          src_node, src_images,
4178                                                          cluster_name)
4179         import_result.Raise()
4180         for idx, result in enumerate(import_result.data):
4181           if not result:
4182             self.LogWarning("Could not import the image %s for instance"
4183                             " %s, disk %d, on node %s" %
4184                             (src_images[idx], instance, idx, pnode_name))
4185       else:
4186         # also checked in the prereq part
4187         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4188                                      % self.op.mode)
4189
4190     if self.op.start:
4191       logging.info("Starting instance %s on node %s", instance, pnode_name)
4192       feedback_fn("* starting instance...")
4193       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4194       result.Raise()
4195       if not result.data:
4196         raise errors.OpExecError("Could not start instance")
4197
4198
4199 class LUConnectConsole(NoHooksLU):
4200   """Connect to an instance's console.
4201
4202   This is somewhat special in that it returns the command line that
4203   you need to run on the master node in order to connect to the
4204   console.
4205
4206   """
4207   _OP_REQP = ["instance_name"]
4208   REQ_BGL = False
4209
4210   def ExpandNames(self):
4211     self._ExpandAndLockInstance()
4212
4213   def CheckPrereq(self):
4214     """Check prerequisites.
4215
4216     This checks that the instance is in the cluster.
4217
4218     """
4219     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4220     assert self.instance is not None, \
4221       "Cannot retrieve locked instance %s" % self.op.instance_name
4222     _CheckNodeOnline(self, self.instance.primary_node)
4223
4224   def Exec(self, feedback_fn):
4225     """Connect to the console of an instance
4226
4227     """
4228     instance = self.instance
4229     node = instance.primary_node
4230
4231     node_insts = self.rpc.call_instance_list([node],
4232                                              [instance.hypervisor])[node]
4233     node_insts.Raise()
4234
4235     if instance.name not in node_insts.data:
4236       raise errors.OpExecError("Instance %s is not running." % instance.name)
4237
4238     logging.debug("Connecting to console of %s on %s", instance.name, node)
4239
4240     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4241     console_cmd = hyper.GetShellCommandForConsole(instance)
4242
4243     # build ssh cmdline
4244     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4245
4246
4247 class LUReplaceDisks(LogicalUnit):
4248   """Replace the disks of an instance.
4249
4250   """
4251   HPATH = "mirrors-replace"
4252   HTYPE = constants.HTYPE_INSTANCE
4253   _OP_REQP = ["instance_name", "mode", "disks"]
4254   REQ_BGL = False
4255
4256   def ExpandNames(self):
4257     self._ExpandAndLockInstance()
4258
4259     if not hasattr(self.op, "remote_node"):
4260       self.op.remote_node = None
4261
4262     ia_name = getattr(self.op, "iallocator", None)
4263     if ia_name is not None:
4264       if self.op.remote_node is not None:
4265         raise errors.OpPrereqError("Give either the iallocator or the new"
4266                                    " secondary, not both")
4267       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4268     elif self.op.remote_node is not None:
4269       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4270       if remote_node is None:
4271         raise errors.OpPrereqError("Node '%s' not known" %
4272                                    self.op.remote_node)
4273       self.op.remote_node = remote_node
4274       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4275       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4276     else:
4277       self.needed_locks[locking.LEVEL_NODE] = []
4278       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4279
4280   def DeclareLocks(self, level):
4281     # If we're not already locking all nodes in the set we have to declare the
4282     # instance's primary/secondary nodes.
4283     if (level == locking.LEVEL_NODE and
4284         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4285       self._LockInstancesNodes()
4286
4287   def _RunAllocator(self):
4288     """Compute a new secondary node using an IAllocator.
4289
4290     """
4291     ial = IAllocator(self,
4292                      mode=constants.IALLOCATOR_MODE_RELOC,
4293                      name=self.op.instance_name,
4294                      relocate_from=[self.sec_node])
4295
4296     ial.Run(self.op.iallocator)
4297
4298     if not ial.success:
4299       raise errors.OpPrereqError("Can't compute nodes using"
4300                                  " iallocator '%s': %s" % (self.op.iallocator,
4301                                                            ial.info))
4302     if len(ial.nodes) != ial.required_nodes:
4303       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4304                                  " of nodes (%s), required %s" %
4305                                  (len(ial.nodes), ial.required_nodes))
4306     self.op.remote_node = ial.nodes[0]
4307     self.LogInfo("Selected new secondary for the instance: %s",
4308                  self.op.remote_node)
4309
4310   def BuildHooksEnv(self):
4311     """Build hooks env.
4312
4313     This runs on the master, the primary and all the secondaries.
4314
4315     """
4316     env = {
4317       "MODE": self.op.mode,
4318       "NEW_SECONDARY": self.op.remote_node,
4319       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4320       }
4321     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4322     nl = [
4323       self.cfg.GetMasterNode(),
4324       self.instance.primary_node,
4325       ]
4326     if self.op.remote_node is not None:
4327       nl.append(self.op.remote_node)
4328     return env, nl, nl
4329
4330   def CheckPrereq(self):
4331     """Check prerequisites.
4332
4333     This checks that the instance is in the cluster.
4334
4335     """
4336     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4337     assert instance is not None, \
4338       "Cannot retrieve locked instance %s" % self.op.instance_name
4339     self.instance = instance
4340
4341     if instance.disk_template not in constants.DTS_NET_MIRROR:
4342       raise errors.OpPrereqError("Instance's disk layout is not"
4343                                  " network mirrored.")
4344
4345     if len(instance.secondary_nodes) != 1:
4346       raise errors.OpPrereqError("The instance has a strange layout,"
4347                                  " expected one secondary but found %d" %
4348                                  len(instance.secondary_nodes))
4349
4350     self.sec_node = instance.secondary_nodes[0]
4351
4352     ia_name = getattr(self.op, "iallocator", None)
4353     if ia_name is not None:
4354       self._RunAllocator()
4355
4356     remote_node = self.op.remote_node
4357     if remote_node is not None:
4358       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4359       assert self.remote_node_info is not None, \
4360         "Cannot retrieve locked node %s" % remote_node
4361     else:
4362       self.remote_node_info = None
4363     if remote_node == instance.primary_node:
4364       raise errors.OpPrereqError("The specified node is the primary node of"
4365                                  " the instance.")
4366     elif remote_node == self.sec_node:
4367       if self.op.mode == constants.REPLACE_DISK_SEC:
4368         # this is for DRBD8, where we can't execute the same mode of
4369         # replacement as for drbd7 (no different port allocated)
4370         raise errors.OpPrereqError("Same secondary given, cannot execute"
4371                                    " replacement")
4372     if instance.disk_template == constants.DT_DRBD8:
4373       if (self.op.mode == constants.REPLACE_DISK_ALL and
4374           remote_node is not None):
4375         # switch to replace secondary mode
4376         self.op.mode = constants.REPLACE_DISK_SEC
4377
4378       if self.op.mode == constants.REPLACE_DISK_ALL:
4379         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4380                                    " secondary disk replacement, not"
4381                                    " both at once")
4382       elif self.op.mode == constants.REPLACE_DISK_PRI:
4383         if remote_node is not None:
4384           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4385                                      " the secondary while doing a primary"
4386                                      " node disk replacement")
4387         self.tgt_node = instance.primary_node
4388         self.oth_node = instance.secondary_nodes[0]
4389         _CheckNodeOnline(self, self.tgt_node)
4390         _CheckNodeOnline(self, self.oth_node)
4391       elif self.op.mode == constants.REPLACE_DISK_SEC:
4392         self.new_node = remote_node # this can be None, in which case
4393                                     # we don't change the secondary
4394         self.tgt_node = instance.secondary_nodes[0]
4395         self.oth_node = instance.primary_node
4396         _CheckNodeOnline(self, self.oth_node)
4397         if self.new_node is not None:
4398           _CheckNodeOnline(self, self.new_node)
4399         else:
4400           _CheckNodeOnline(self, self.tgt_node)
4401       else:
4402         raise errors.ProgrammerError("Unhandled disk replace mode")
4403
4404     if not self.op.disks:
4405       self.op.disks = range(len(instance.disks))
4406
4407     for disk_idx in self.op.disks:
4408       instance.FindDisk(disk_idx)
4409
4410   def _ExecD8DiskOnly(self, feedback_fn):
4411     """Replace a disk on the primary or secondary for dbrd8.
4412
4413     The algorithm for replace is quite complicated:
4414
4415       1. for each disk to be replaced:
4416
4417         1. create new LVs on the target node with unique names
4418         1. detach old LVs from the drbd device
4419         1. rename old LVs to name_replaced.<time_t>
4420         1. rename new LVs to old LVs
4421         1. attach the new LVs (with the old names now) to the drbd device
4422
4423       1. wait for sync across all devices
4424
4425       1. for each modified disk:
4426
4427         1. remove old LVs (which have the name name_replaces.<time_t>)
4428
4429     Failures are not very well handled.
4430
4431     """
4432     steps_total = 6
4433     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4434     instance = self.instance
4435     iv_names = {}
4436     vgname = self.cfg.GetVGName()
4437     # start of work
4438     cfg = self.cfg
4439     tgt_node = self.tgt_node
4440     oth_node = self.oth_node
4441
4442     # Step: check device activation
4443     self.proc.LogStep(1, steps_total, "check device existence")
4444     info("checking volume groups")
4445     my_vg = cfg.GetVGName()
4446     results = self.rpc.call_vg_list([oth_node, tgt_node])
4447     if not results:
4448       raise errors.OpExecError("Can't list volume groups on the nodes")
4449     for node in oth_node, tgt_node:
4450       res = results[node]
4451       if res.failed or not res.data or my_vg not in res.data:
4452         raise errors.OpExecError("Volume group '%s' not found on %s" %
4453                                  (my_vg, node))
4454     for idx, dev in enumerate(instance.disks):
4455       if idx not in self.op.disks:
4456         continue
4457       for node in tgt_node, oth_node:
4458         info("checking disk/%d on %s" % (idx, node))
4459         cfg.SetDiskID(dev, node)
4460         if not self.rpc.call_blockdev_find(node, dev):
4461           raise errors.OpExecError("Can't find disk/%d on node %s" %
4462                                    (idx, node))
4463
4464     # Step: check other node consistency
4465     self.proc.LogStep(2, steps_total, "check peer consistency")
4466     for idx, dev in enumerate(instance.disks):
4467       if idx not in self.op.disks:
4468         continue
4469       info("checking disk/%d consistency on %s" % (idx, oth_node))
4470       if not _CheckDiskConsistency(self, dev, oth_node,
4471                                    oth_node==instance.primary_node):
4472         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4473                                  " to replace disks on this node (%s)" %
4474                                  (oth_node, tgt_node))
4475
4476     # Step: create new storage
4477     self.proc.LogStep(3, steps_total, "allocate new storage")
4478     for idx, dev in enumerate(instance.disks):
4479       if idx not in self.op.disks:
4480         continue
4481       size = dev.size
4482       cfg.SetDiskID(dev, tgt_node)
4483       lv_names = [".disk%d_%s" % (idx, suf)
4484                   for suf in ["data", "meta"]]
4485       names = _GenerateUniqueNames(self, lv_names)
4486       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4487                              logical_id=(vgname, names[0]))
4488       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4489                              logical_id=(vgname, names[1]))
4490       new_lvs = [lv_data, lv_meta]
4491       old_lvs = dev.children
4492       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4493       info("creating new local storage on %s for %s" %
4494            (tgt_node, dev.iv_name))
4495       # since we *always* want to create this LV, we use the
4496       # _Create...OnPrimary (which forces the creation), even if we
4497       # are talking about the secondary node
4498       for new_lv in new_lvs:
4499         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4500                                         _GetInstanceInfoText(instance)):
4501           raise errors.OpExecError("Failed to create new LV named '%s' on"
4502                                    " node '%s'" %
4503                                    (new_lv.logical_id[1], tgt_node))
4504
4505     # Step: for each lv, detach+rename*2+attach
4506     self.proc.LogStep(4, steps_total, "change drbd configuration")
4507     for dev, old_lvs, new_lvs in iv_names.itervalues():
4508       info("detaching %s drbd from local storage" % dev.iv_name)
4509       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4510       result.Raise()
4511       if not result.data:
4512         raise errors.OpExecError("Can't detach drbd from local storage on node"
4513                                  " %s for device %s" % (tgt_node, dev.iv_name))
4514       #dev.children = []
4515       #cfg.Update(instance)
4516
4517       # ok, we created the new LVs, so now we know we have the needed
4518       # storage; as such, we proceed on the target node to rename
4519       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4520       # using the assumption that logical_id == physical_id (which in
4521       # turn is the unique_id on that node)
4522
4523       # FIXME(iustin): use a better name for the replaced LVs
4524       temp_suffix = int(time.time())
4525       ren_fn = lambda d, suff: (d.physical_id[0],
4526                                 d.physical_id[1] + "_replaced-%s" % suff)
4527       # build the rename list based on what LVs exist on the node
4528       rlist = []
4529       for to_ren in old_lvs:
4530         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4531         if not find_res.failed and find_res.data is not None: # device exists
4532           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4533
4534       info("renaming the old LVs on the target node")
4535       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4536       result.Raise()
4537       if not result.data:
4538         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4539       # now we rename the new LVs to the old LVs
4540       info("renaming the new LVs on the target node")
4541       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4542       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4543       result.Raise()
4544       if not result.data:
4545         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4546
4547       for old, new in zip(old_lvs, new_lvs):
4548         new.logical_id = old.logical_id
4549         cfg.SetDiskID(new, tgt_node)
4550
4551       for disk in old_lvs:
4552         disk.logical_id = ren_fn(disk, temp_suffix)
4553         cfg.SetDiskID(disk, tgt_node)
4554
4555       # now that the new lvs have the old name, we can add them to the device
4556       info("adding new mirror component on %s" % tgt_node)
4557       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4558       if result.failed or not result.data:
4559         for new_lv in new_lvs:
4560           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4561           if result.failed or not result.data:
4562             warning("Can't rollback device %s", hint="manually cleanup unused"
4563                     " logical volumes")
4564         raise errors.OpExecError("Can't add local storage to drbd")
4565
4566       dev.children = new_lvs
4567       cfg.Update(instance)
4568
4569     # Step: wait for sync
4570
4571     # this can fail as the old devices are degraded and _WaitForSync
4572     # does a combined result over all disks, so we don't check its
4573     # return value
4574     self.proc.LogStep(5, steps_total, "sync devices")
4575     _WaitForSync(self, instance, unlock=True)
4576
4577     # so check manually all the devices
4578     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4579       cfg.SetDiskID(dev, instance.primary_node)
4580       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4581       if result.failed or result.data[5]:
4582         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4583
4584     # Step: remove old storage
4585     self.proc.LogStep(6, steps_total, "removing old storage")
4586     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4587       info("remove logical volumes for %s" % name)
4588       for lv in old_lvs:
4589         cfg.SetDiskID(lv, tgt_node)
4590         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4591         if result.failed or not result.data:
4592           warning("Can't remove old LV", hint="manually remove unused LVs")
4593           continue
4594
4595   def _ExecD8Secondary(self, feedback_fn):
4596     """Replace the secondary node for drbd8.
4597
4598     The algorithm for replace is quite complicated:
4599       - for all disks of the instance:
4600         - create new LVs on the new node with same names
4601         - shutdown the drbd device on the old secondary
4602         - disconnect the drbd network on the primary
4603         - create the drbd device on the new secondary
4604         - network attach the drbd on the primary, using an artifice:
4605           the drbd code for Attach() will connect to the network if it
4606           finds a device which is connected to the good local disks but
4607           not network enabled
4608       - wait for sync across all devices
4609       - remove all disks from the old secondary
4610
4611     Failures are not very well handled.
4612
4613     """
4614     steps_total = 6
4615     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4616     instance = self.instance
4617     iv_names = {}
4618     # start of work
4619     cfg = self.cfg
4620     old_node = self.tgt_node
4621     new_node = self.new_node
4622     pri_node = instance.primary_node
4623
4624     # Step: check device activation
4625     self.proc.LogStep(1, steps_total, "check device existence")
4626     info("checking volume groups")
4627     my_vg = cfg.GetVGName()
4628     results = self.rpc.call_vg_list([pri_node, new_node])
4629     for node in pri_node, new_node:
4630       res = results[node]
4631       if res.failed or not res.data or my_vg not in res.data:
4632         raise errors.OpExecError("Volume group '%s' not found on %s" %
4633                                  (my_vg, node))
4634     for idx, dev in enumerate(instance.disks):
4635       if idx not in self.op.disks:
4636         continue
4637       info("checking disk/%d on %s" % (idx, pri_node))
4638       cfg.SetDiskID(dev, pri_node)
4639       result = self.rpc.call_blockdev_find(pri_node, dev)
4640       result.Raise()
4641       if not result.data:
4642         raise errors.OpExecError("Can't find disk/%d on node %s" %
4643                                  (idx, pri_node))
4644
4645     # Step: check other node consistency
4646     self.proc.LogStep(2, steps_total, "check peer consistency")
4647     for idx, dev in enumerate(instance.disks):
4648       if idx not in self.op.disks:
4649         continue
4650       info("checking disk/%d consistency on %s" % (idx, pri_node))
4651       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4652         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4653                                  " unsafe to replace the secondary" %
4654                                  pri_node)
4655
4656     # Step: create new storage
4657     self.proc.LogStep(3, steps_total, "allocate new storage")
4658     for idx, dev in enumerate(instance.disks):
4659       info("adding new local storage on %s for disk/%d" %
4660            (new_node, idx))
4661       # since we *always* want to create this LV, we use the
4662       # _Create...OnPrimary (which forces the creation), even if we
4663       # are talking about the secondary node
4664       for new_lv in dev.children:
4665         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4666                                         _GetInstanceInfoText(instance)):
4667           raise errors.OpExecError("Failed to create new LV named '%s' on"
4668                                    " node '%s'" %
4669                                    (new_lv.logical_id[1], new_node))
4670
4671     # Step 4: dbrd minors and drbd setups changes
4672     # after this, we must manually remove the drbd minors on both the
4673     # error and the success paths
4674     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4675                                    instance.name)
4676     logging.debug("Allocated minors %s" % (minors,))
4677     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4678     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4679       size = dev.size
4680       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4681       # create new devices on new_node
4682       if pri_node == dev.logical_id[0]:
4683         new_logical_id = (pri_node, new_node,
4684                           dev.logical_id[2], dev.logical_id[3], new_minor,
4685                           dev.logical_id[5])
4686       else:
4687         new_logical_id = (new_node, pri_node,
4688                           dev.logical_id[2], new_minor, dev.logical_id[4],
4689                           dev.logical_id[5])
4690       iv_names[idx] = (dev, dev.children, new_logical_id)
4691       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4692                     new_logical_id)
4693       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4694                               logical_id=new_logical_id,
4695                               children=dev.children)
4696       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4697                                         new_drbd, False,
4698                                         _GetInstanceInfoText(instance)):
4699         self.cfg.ReleaseDRBDMinors(instance.name)
4700         raise errors.OpExecError("Failed to create new DRBD on"
4701                                  " node '%s'" % new_node)
4702
4703     for idx, dev in enumerate(instance.disks):
4704       # we have new devices, shutdown the drbd on the old secondary
4705       info("shutting down drbd for disk/%d on old node" % idx)
4706       cfg.SetDiskID(dev, old_node)
4707       result = self.rpc.call_blockdev_shutdown(old_node, dev)
4708       if result.failed or not result.data:
4709         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4710                 hint="Please cleanup this device manually as soon as possible")
4711
4712     info("detaching primary drbds from the network (=> standalone)")
4713     done = 0
4714     for idx, dev in enumerate(instance.disks):
4715       cfg.SetDiskID(dev, pri_node)
4716       # set the network part of the physical (unique in bdev terms) id
4717       # to None, meaning detach from network
4718       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4719       # and 'find' the device, which will 'fix' it to match the
4720       # standalone state
4721       result = self.rpc.call_blockdev_find(pri_node, dev)
4722       if not result.failed and result.data:
4723         done += 1
4724       else:
4725         warning("Failed to detach drbd disk/%d from network, unusual case" %
4726                 idx)
4727
4728     if not done:
4729       # no detaches succeeded (very unlikely)
4730       self.cfg.ReleaseDRBDMinors(instance.name)
4731       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4732
4733     # if we managed to detach at least one, we update all the disks of
4734     # the instance to point to the new secondary
4735     info("updating instance configuration")
4736     for dev, _, new_logical_id in iv_names.itervalues():
4737       dev.logical_id = new_logical_id
4738       cfg.SetDiskID(dev, pri_node)
4739     cfg.Update(instance)
4740     # we can remove now the temp minors as now the new values are
4741     # written to the config file (and therefore stable)
4742     self.cfg.ReleaseDRBDMinors(instance.name)
4743
4744     # and now perform the drbd attach
4745     info("attaching primary drbds to new secondary (standalone => connected)")
4746     for idx, dev in enumerate(instance.disks):
4747       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4748       # since the attach is smart, it's enough to 'find' the device,
4749       # it will automatically activate the network, if the physical_id
4750       # is correct
4751       cfg.SetDiskID(dev, pri_node)
4752       logging.debug("Disk to attach: %s", dev)
4753       result = self.rpc.call_blockdev_find(pri_node, dev)
4754       if result.failed or not result.data:
4755         warning("can't attach drbd disk/%d to new secondary!" % idx,
4756                 "please do a gnt-instance info to see the status of disks")
4757
4758     # this can fail as the old devices are degraded and _WaitForSync
4759     # does a combined result over all disks, so we don't check its
4760     # return value
4761     self.proc.LogStep(5, steps_total, "sync devices")
4762     _WaitForSync(self, instance, unlock=True)
4763
4764     # so check manually all the devices
4765     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4766       cfg.SetDiskID(dev, pri_node)
4767       result = self.rpc.call_blockdev_find(pri_node, dev)
4768       result.Raise()
4769       if result.data[5]:
4770         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4771
4772     self.proc.LogStep(6, steps_total, "removing old storage")
4773     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4774       info("remove logical volumes for disk/%d" % idx)
4775       for lv in old_lvs:
4776         cfg.SetDiskID(lv, old_node)
4777         result = self.rpc.call_blockdev_remove(old_node, lv)
4778         if result.failed or not result.data:
4779           warning("Can't remove LV on old secondary",
4780                   hint="Cleanup stale volumes by hand")
4781
4782   def Exec(self, feedback_fn):
4783     """Execute disk replacement.
4784
4785     This dispatches the disk replacement to the appropriate handler.
4786
4787     """
4788     instance = self.instance
4789
4790     # Activate the instance disks if we're replacing them on a down instance
4791     if instance.status == "down":
4792       _StartInstanceDisks(self, instance, True)
4793
4794     if instance.disk_template == constants.DT_DRBD8:
4795       if self.op.remote_node is None:
4796         fn = self._ExecD8DiskOnly
4797       else:
4798         fn = self._ExecD8Secondary
4799     else:
4800       raise errors.ProgrammerError("Unhandled disk replacement case")
4801
4802     ret = fn(feedback_fn)
4803
4804     # Deactivate the instance disks if we're replacing them on a down instance
4805     if instance.status == "down":
4806       _SafeShutdownInstanceDisks(self, instance)
4807
4808     return ret
4809
4810
4811 class LUGrowDisk(LogicalUnit):
4812   """Grow a disk of an instance.
4813
4814   """
4815   HPATH = "disk-grow"
4816   HTYPE = constants.HTYPE_INSTANCE
4817   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4818   REQ_BGL = False
4819
4820   def ExpandNames(self):
4821     self._ExpandAndLockInstance()
4822     self.needed_locks[locking.LEVEL_NODE] = []
4823     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4824
4825   def DeclareLocks(self, level):
4826     if level == locking.LEVEL_NODE:
4827       self._LockInstancesNodes()
4828
4829   def BuildHooksEnv(self):
4830     """Build hooks env.
4831
4832     This runs on the master, the primary and all the secondaries.
4833
4834     """
4835     env = {
4836       "DISK": self.op.disk,
4837       "AMOUNT": self.op.amount,
4838       }
4839     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4840     nl = [
4841       self.cfg.GetMasterNode(),
4842       self.instance.primary_node,
4843       ]
4844     return env, nl, nl
4845
4846   def CheckPrereq(self):
4847     """Check prerequisites.
4848
4849     This checks that the instance is in the cluster.
4850
4851     """
4852     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4853     assert instance is not None, \
4854       "Cannot retrieve locked instance %s" % self.op.instance_name
4855     _CheckNodeOnline(self, instance.primary_node)
4856     for node in instance.secondary_nodes:
4857       _CheckNodeOnline(self, node)
4858
4859
4860     self.instance = instance
4861
4862     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4863       raise errors.OpPrereqError("Instance's disk layout does not support"
4864                                  " growing.")
4865
4866     self.disk = instance.FindDisk(self.op.disk)
4867
4868     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4869     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4870                                        instance.hypervisor)
4871     for node in nodenames:
4872       info = nodeinfo[node]
4873       if info.failed or not info.data:
4874         raise errors.OpPrereqError("Cannot get current information"
4875                                    " from node '%s'" % node)
4876       vg_free = info.data.get('vg_free', None)
4877       if not isinstance(vg_free, int):
4878         raise errors.OpPrereqError("Can't compute free disk space on"
4879                                    " node %s" % node)
4880       if self.op.amount > vg_free:
4881         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4882                                    " %d MiB available, %d MiB required" %
4883                                    (node, vg_free, self.op.amount))
4884
4885   def Exec(self, feedback_fn):
4886     """Execute disk grow.
4887
4888     """
4889     instance = self.instance
4890     disk = self.disk
4891     for node in (instance.secondary_nodes + (instance.primary_node,)):
4892       self.cfg.SetDiskID(disk, node)
4893       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4894       result.Raise()
4895       if (not result.data or not isinstance(result.data, (list, tuple)) or
4896           len(result.data) != 2):
4897         raise errors.OpExecError("Grow request failed to node %s" % node)
4898       elif not result.data[0]:
4899         raise errors.OpExecError("Grow request failed to node %s: %s" %
4900                                  (node, result.data[1]))
4901     disk.RecordGrow(self.op.amount)
4902     self.cfg.Update(instance)
4903     if self.op.wait_for_sync:
4904       disk_abort = not _WaitForSync(self, instance)
4905       if disk_abort:
4906         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4907                              " status.\nPlease check the instance.")
4908
4909
4910 class LUQueryInstanceData(NoHooksLU):
4911   """Query runtime instance data.
4912
4913   """
4914   _OP_REQP = ["instances", "static"]
4915   REQ_BGL = False
4916
4917   def ExpandNames(self):
4918     self.needed_locks = {}
4919     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4920
4921     if not isinstance(self.op.instances, list):
4922       raise errors.OpPrereqError("Invalid argument type 'instances'")
4923
4924     if self.op.instances:
4925       self.wanted_names = []
4926       for name in self.op.instances:
4927         full_name = self.cfg.ExpandInstanceName(name)
4928         if full_name is None:
4929           raise errors.OpPrereqError("Instance '%s' not known" %
4930                                      self.op.instance_name)
4931         self.wanted_names.append(full_name)
4932       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4933     else:
4934       self.wanted_names = None
4935       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4936
4937     self.needed_locks[locking.LEVEL_NODE] = []
4938     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4939
4940   def DeclareLocks(self, level):
4941     if level == locking.LEVEL_NODE:
4942       self._LockInstancesNodes()
4943
4944   def CheckPrereq(self):
4945     """Check prerequisites.
4946
4947     This only checks the optional instance list against the existing names.
4948
4949     """
4950     if self.wanted_names is None:
4951       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4952
4953     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4954                              in self.wanted_names]
4955     return
4956
4957   def _ComputeDiskStatus(self, instance, snode, dev):
4958     """Compute block device status.
4959
4960     """
4961     static = self.op.static
4962     if not static:
4963       self.cfg.SetDiskID(dev, instance.primary_node)
4964       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4965       dev_pstatus.Raise()
4966       dev_pstatus = dev_pstatus.data
4967     else:
4968       dev_pstatus = None
4969
4970     if dev.dev_type in constants.LDS_DRBD:
4971       # we change the snode then (otherwise we use the one passed in)
4972       if dev.logical_id[0] == instance.primary_node:
4973         snode = dev.logical_id[1]
4974       else:
4975         snode = dev.logical_id[0]
4976
4977     if snode and not static:
4978       self.cfg.SetDiskID(dev, snode)
4979       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4980       dev_sstatus.Raise()
4981       dev_sstatus = dev_sstatus.data
4982     else:
4983       dev_sstatus = None
4984
4985     if dev.children:
4986       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4987                       for child in dev.children]
4988     else:
4989       dev_children = []
4990
4991     data = {
4992       "iv_name": dev.iv_name,
4993       "dev_type": dev.dev_type,
4994       "logical_id": dev.logical_id,
4995       "physical_id": dev.physical_id,
4996       "pstatus": dev_pstatus,
4997       "sstatus": dev_sstatus,
4998       "children": dev_children,
4999       "mode": dev.mode,
5000       }
5001
5002     return data
5003
5004   def Exec(self, feedback_fn):
5005     """Gather and return data"""
5006     result = {}
5007
5008     cluster = self.cfg.GetClusterInfo()
5009
5010     for instance in self.wanted_instances:
5011       if not self.op.static:
5012         remote_info = self.rpc.call_instance_info(instance.primary_node,
5013                                                   instance.name,
5014                                                   instance.hypervisor)
5015         remote_info.Raise()
5016         remote_info = remote_info.data
5017         if remote_info and "state" in remote_info:
5018           remote_state = "up"
5019         else:
5020           remote_state = "down"
5021       else:
5022         remote_state = None
5023       if instance.status == "down":
5024         config_state = "down"
5025       else:
5026         config_state = "up"
5027
5028       disks = [self._ComputeDiskStatus(instance, None, device)
5029                for device in instance.disks]
5030
5031       idict = {
5032         "name": instance.name,
5033         "config_state": config_state,
5034         "run_state": remote_state,
5035         "pnode": instance.primary_node,
5036         "snodes": instance.secondary_nodes,
5037         "os": instance.os,
5038         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5039         "disks": disks,
5040         "hypervisor": instance.hypervisor,
5041         "network_port": instance.network_port,
5042         "hv_instance": instance.hvparams,
5043         "hv_actual": cluster.FillHV(instance),
5044         "be_instance": instance.beparams,
5045         "be_actual": cluster.FillBE(instance),
5046         }
5047
5048       result[instance.name] = idict
5049
5050     return result
5051
5052
5053 class LUSetInstanceParams(LogicalUnit):
5054   """Modifies an instances's parameters.
5055
5056   """
5057   HPATH = "instance-modify"
5058   HTYPE = constants.HTYPE_INSTANCE
5059   _OP_REQP = ["instance_name"]
5060   REQ_BGL = False
5061
5062   def CheckArguments(self):
5063     if not hasattr(self.op, 'nics'):
5064       self.op.nics = []
5065     if not hasattr(self.op, 'disks'):
5066       self.op.disks = []
5067     if not hasattr(self.op, 'beparams'):
5068       self.op.beparams = {}
5069     if not hasattr(self.op, 'hvparams'):
5070       self.op.hvparams = {}
5071     self.op.force = getattr(self.op, "force", False)
5072     if not (self.op.nics or self.op.disks or
5073             self.op.hvparams or self.op.beparams):
5074       raise errors.OpPrereqError("No changes submitted")
5075
5076     utils.CheckBEParams(self.op.beparams)
5077
5078     # Disk validation
5079     disk_addremove = 0
5080     for disk_op, disk_dict in self.op.disks:
5081       if disk_op == constants.DDM_REMOVE:
5082         disk_addremove += 1
5083         continue
5084       elif disk_op == constants.DDM_ADD:
5085         disk_addremove += 1
5086       else:
5087         if not isinstance(disk_op, int):
5088           raise errors.OpPrereqError("Invalid disk index")
5089       if disk_op == constants.DDM_ADD:
5090         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5091         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5092           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5093         size = disk_dict.get('size', None)
5094         if size is None:
5095           raise errors.OpPrereqError("Required disk parameter size missing")
5096         try:
5097           size = int(size)
5098         except ValueError, err:
5099           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5100                                      str(err))
5101         disk_dict['size'] = size
5102       else:
5103         # modification of disk
5104         if 'size' in disk_dict:
5105           raise errors.OpPrereqError("Disk size change not possible, use"
5106                                      " grow-disk")
5107
5108     if disk_addremove > 1:
5109       raise errors.OpPrereqError("Only one disk add or remove operation"
5110                                  " supported at a time")
5111
5112     # NIC validation
5113     nic_addremove = 0
5114     for nic_op, nic_dict in self.op.nics:
5115       if nic_op == constants.DDM_REMOVE:
5116         nic_addremove += 1
5117         continue
5118       elif nic_op == constants.DDM_ADD:
5119         nic_addremove += 1
5120       else:
5121         if not isinstance(nic_op, int):
5122           raise errors.OpPrereqError("Invalid nic index")
5123
5124       # nic_dict should be a dict
5125       nic_ip = nic_dict.get('ip', None)
5126       if nic_ip is not None:
5127         if nic_ip.lower() == "none":
5128           nic_dict['ip'] = None
5129         else:
5130           if not utils.IsValidIP(nic_ip):
5131             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5132       # we can only check None bridges and assign the default one
5133       nic_bridge = nic_dict.get('bridge', None)
5134       if nic_bridge is None:
5135         nic_dict['bridge'] = self.cfg.GetDefBridge()
5136       # but we can validate MACs
5137       nic_mac = nic_dict.get('mac', None)
5138       if nic_mac is not None:
5139         if self.cfg.IsMacInUse(nic_mac):
5140           raise errors.OpPrereqError("MAC address %s already in use"
5141                                      " in cluster" % nic_mac)
5142         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5143           if not utils.IsValidMac(nic_mac):
5144             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5145     if nic_addremove > 1:
5146       raise errors.OpPrereqError("Only one NIC add or remove operation"
5147                                  " supported at a time")
5148
5149   def ExpandNames(self):
5150     self._ExpandAndLockInstance()
5151     self.needed_locks[locking.LEVEL_NODE] = []
5152     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5153
5154   def DeclareLocks(self, level):
5155     if level == locking.LEVEL_NODE:
5156       self._LockInstancesNodes()
5157
5158   def BuildHooksEnv(self):
5159     """Build hooks env.
5160
5161     This runs on the master, primary and secondaries.
5162
5163     """
5164     args = dict()
5165     if constants.BE_MEMORY in self.be_new:
5166       args['memory'] = self.be_new[constants.BE_MEMORY]
5167     if constants.BE_VCPUS in self.be_new:
5168       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5169     # FIXME: readd disk/nic changes
5170     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5171     nl = [self.cfg.GetMasterNode(),
5172           self.instance.primary_node] + list(self.instance.secondary_nodes)
5173     return env, nl, nl
5174
5175   def CheckPrereq(self):
5176     """Check prerequisites.
5177
5178     This only checks the instance list against the existing names.
5179
5180     """
5181     force = self.force = self.op.force
5182
5183     # checking the new params on the primary/secondary nodes
5184
5185     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5186     assert self.instance is not None, \
5187       "Cannot retrieve locked instance %s" % self.op.instance_name
5188     pnode = self.instance.primary_node
5189     nodelist = [pnode]
5190     nodelist.extend(instance.secondary_nodes)
5191
5192     # hvparams processing
5193     if self.op.hvparams:
5194       i_hvdict = copy.deepcopy(instance.hvparams)
5195       for key, val in self.op.hvparams.iteritems():
5196         if val == constants.VALUE_DEFAULT:
5197           try:
5198             del i_hvdict[key]
5199           except KeyError:
5200             pass
5201         elif val == constants.VALUE_NONE:
5202           i_hvdict[key] = None
5203         else:
5204           i_hvdict[key] = val
5205       cluster = self.cfg.GetClusterInfo()
5206       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5207                                 i_hvdict)
5208       # local check
5209       hypervisor.GetHypervisor(
5210         instance.hypervisor).CheckParameterSyntax(hv_new)
5211       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5212       self.hv_new = hv_new # the new actual values
5213       self.hv_inst = i_hvdict # the new dict (without defaults)
5214     else:
5215       self.hv_new = self.hv_inst = {}
5216
5217     # beparams processing
5218     if self.op.beparams:
5219       i_bedict = copy.deepcopy(instance.beparams)
5220       for key, val in self.op.beparams.iteritems():
5221         if val == constants.VALUE_DEFAULT:
5222           try:
5223             del i_bedict[key]
5224           except KeyError:
5225             pass
5226         else:
5227           i_bedict[key] = val
5228       cluster = self.cfg.GetClusterInfo()
5229       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5230                                 i_bedict)
5231       self.be_new = be_new # the new actual values
5232       self.be_inst = i_bedict # the new dict (without defaults)
5233     else:
5234       self.be_new = self.be_inst = {}
5235
5236     self.warn = []
5237
5238     if constants.BE_MEMORY in self.op.beparams and not self.force:
5239       mem_check_list = [pnode]
5240       if be_new[constants.BE_AUTO_BALANCE]:
5241         # either we changed auto_balance to yes or it was from before
5242         mem_check_list.extend(instance.secondary_nodes)
5243       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5244                                                   instance.hypervisor)
5245       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5246                                          instance.hypervisor)
5247       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5248         # Assume the primary node is unreachable and go ahead
5249         self.warn.append("Can't get info from primary node %s" % pnode)
5250       else:
5251         if not instance_info.failed and instance_info.data:
5252           current_mem = instance_info.data['memory']
5253         else:
5254           # Assume instance not running
5255           # (there is a slight race condition here, but it's not very probable,
5256           # and we have no other way to check)
5257           current_mem = 0
5258         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5259                     nodeinfo[pnode].data['memory_free'])
5260         if miss_mem > 0:
5261           raise errors.OpPrereqError("This change will prevent the instance"
5262                                      " from starting, due to %d MB of memory"
5263                                      " missing on its primary node" % miss_mem)
5264
5265       if be_new[constants.BE_AUTO_BALANCE]:
5266         for node, nres in instance.secondary_nodes.iteritems():
5267           if nres.failed or not isinstance(nres.data, dict):
5268             self.warn.append("Can't get info from secondary node %s" % node)
5269           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5270             self.warn.append("Not enough memory to failover instance to"
5271                              " secondary node %s" % node)
5272
5273     # NIC processing
5274     for nic_op, nic_dict in self.op.nics:
5275       if nic_op == constants.DDM_REMOVE:
5276         if not instance.nics:
5277           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5278         continue
5279       if nic_op != constants.DDM_ADD:
5280         # an existing nic
5281         if nic_op < 0 or nic_op >= len(instance.nics):
5282           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5283                                      " are 0 to %d" %
5284                                      (nic_op, len(instance.nics)))
5285       nic_bridge = nic_dict.get('bridge', None)
5286       if nic_bridge is not None:
5287         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5288           msg = ("Bridge '%s' doesn't exist on one of"
5289                  " the instance nodes" % nic_bridge)
5290           if self.force:
5291             self.warn.append(msg)
5292           else:
5293             raise errors.OpPrereqError(msg)
5294
5295     # DISK processing
5296     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5297       raise errors.OpPrereqError("Disk operations not supported for"
5298                                  " diskless instances")
5299     for disk_op, disk_dict in self.op.disks:
5300       if disk_op == constants.DDM_REMOVE:
5301         if len(instance.disks) == 1:
5302           raise errors.OpPrereqError("Cannot remove the last disk of"
5303                                      " an instance")
5304         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5305         ins_l = ins_l[pnode]
5306         if not type(ins_l) is list:
5307           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5308         if instance.name in ins_l:
5309           raise errors.OpPrereqError("Instance is running, can't remove"
5310                                      " disks.")
5311
5312       if (disk_op == constants.DDM_ADD and
5313           len(instance.nics) >= constants.MAX_DISKS):
5314         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5315                                    " add more" % constants.MAX_DISKS)
5316       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5317         # an existing disk
5318         if disk_op < 0 or disk_op >= len(instance.disks):
5319           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5320                                      " are 0 to %d" %
5321                                      (disk_op, len(instance.disks)))
5322
5323     return
5324
5325   def Exec(self, feedback_fn):
5326     """Modifies an instance.
5327
5328     All parameters take effect only at the next restart of the instance.
5329
5330     """
5331     # Process here the warnings from CheckPrereq, as we don't have a
5332     # feedback_fn there.
5333     for warn in self.warn:
5334       feedback_fn("WARNING: %s" % warn)
5335
5336     result = []
5337     instance = self.instance
5338     # disk changes
5339     for disk_op, disk_dict in self.op.disks:
5340       if disk_op == constants.DDM_REMOVE:
5341         # remove the last disk
5342         device = instance.disks.pop()
5343         device_idx = len(instance.disks)
5344         for node, disk in device.ComputeNodeTree(instance.primary_node):
5345           self.cfg.SetDiskID(disk, node)
5346           result = self.rpc.call_blockdev_remove(node, disk)
5347           if result.failed or not result.data:
5348             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5349                                  " continuing anyway", device_idx, node)
5350         result.append(("disk/%d" % device_idx, "remove"))
5351       elif disk_op == constants.DDM_ADD:
5352         # add a new disk
5353         if instance.disk_template == constants.DT_FILE:
5354           file_driver, file_path = instance.disks[0].logical_id
5355           file_path = os.path.dirname(file_path)
5356         else:
5357           file_driver = file_path = None
5358         disk_idx_base = len(instance.disks)
5359         new_disk = _GenerateDiskTemplate(self,
5360                                          instance.disk_template,
5361                                          instance, instance.primary_node,
5362                                          instance.secondary_nodes,
5363                                          [disk_dict],
5364                                          file_path,
5365                                          file_driver,
5366                                          disk_idx_base)[0]
5367         new_disk.mode = disk_dict['mode']
5368         instance.disks.append(new_disk)
5369         info = _GetInstanceInfoText(instance)
5370
5371         logging.info("Creating volume %s for instance %s",
5372                      new_disk.iv_name, instance.name)
5373         # Note: this needs to be kept in sync with _CreateDisks
5374         #HARDCODE
5375         for secondary_node in instance.secondary_nodes:
5376           if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5377                                             new_disk, False, info):
5378             self.LogWarning("Failed to create volume %s (%s) on"
5379                             " secondary node %s!",
5380                             new_disk.iv_name, new_disk, secondary_node)
5381         #HARDCODE
5382         if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5383                                         instance, new_disk, info):
5384           self.LogWarning("Failed to create volume %s on primary!",
5385                           new_disk.iv_name)
5386         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5387                        (new_disk.size, new_disk.mode)))
5388       else:
5389         # change a given disk
5390         instance.disks[disk_op].mode = disk_dict['mode']
5391         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5392     # NIC changes
5393     for nic_op, nic_dict in self.op.nics:
5394       if nic_op == constants.DDM_REMOVE:
5395         # remove the last nic
5396         del instance.nics[-1]
5397         result.append(("nic.%d" % len(instance.nics), "remove"))
5398       elif nic_op == constants.DDM_ADD:
5399         # add a new nic
5400         if 'mac' not in nic_dict:
5401           mac = constants.VALUE_GENERATE
5402         else:
5403           mac = nic_dict['mac']
5404         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5405           mac = self.cfg.GenerateMAC()
5406         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5407                               bridge=nic_dict.get('bridge', None))
5408         instance.nics.append(new_nic)
5409         result.append(("nic.%d" % (len(instance.nics) - 1),
5410                        "add:mac=%s,ip=%s,bridge=%s" %
5411                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5412       else:
5413         # change a given nic
5414         for key in 'mac', 'ip', 'bridge':
5415           if key in nic_dict:
5416             setattr(instance.nics[nic_op], key, nic_dict[key])
5417             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5418
5419     # hvparams changes
5420     if self.op.hvparams:
5421       instance.hvparams = self.hv_new
5422       for key, val in self.op.hvparams.iteritems():
5423         result.append(("hv/%s" % key, val))
5424
5425     # beparams changes
5426     if self.op.beparams:
5427       instance.beparams = self.be_inst
5428       for key, val in self.op.beparams.iteritems():
5429         result.append(("be/%s" % key, val))
5430
5431     self.cfg.Update(instance)
5432
5433     return result
5434
5435
5436 class LUQueryExports(NoHooksLU):
5437   """Query the exports list
5438
5439   """
5440   _OP_REQP = ['nodes']
5441   REQ_BGL = False
5442
5443   def ExpandNames(self):
5444     self.needed_locks = {}
5445     self.share_locks[locking.LEVEL_NODE] = 1
5446     if not self.op.nodes:
5447       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5448     else:
5449       self.needed_locks[locking.LEVEL_NODE] = \
5450         _GetWantedNodes(self, self.op.nodes)
5451
5452   def CheckPrereq(self):
5453     """Check prerequisites.
5454
5455     """
5456     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5457
5458   def Exec(self, feedback_fn):
5459     """Compute the list of all the exported system images.
5460
5461     @rtype: dict
5462     @return: a dictionary with the structure node->(export-list)
5463         where export-list is a list of the instances exported on
5464         that node.
5465
5466     """
5467     rpcresult = self.rpc.call_export_list(self.nodes)
5468     result = {}
5469     for node in rpcresult:
5470       if rpcresult[node].failed:
5471         result[node] = False
5472       else:
5473         result[node] = rpcresult[node].data
5474
5475     return result
5476
5477
5478 class LUExportInstance(LogicalUnit):
5479   """Export an instance to an image in the cluster.
5480
5481   """
5482   HPATH = "instance-export"
5483   HTYPE = constants.HTYPE_INSTANCE
5484   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5485   REQ_BGL = False
5486
5487   def ExpandNames(self):
5488     self._ExpandAndLockInstance()
5489     # FIXME: lock only instance primary and destination node
5490     #
5491     # Sad but true, for now we have do lock all nodes, as we don't know where
5492     # the previous export might be, and and in this LU we search for it and
5493     # remove it from its current node. In the future we could fix this by:
5494     #  - making a tasklet to search (share-lock all), then create the new one,
5495     #    then one to remove, after
5496     #  - removing the removal operation altoghether
5497     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5498
5499   def DeclareLocks(self, level):
5500     """Last minute lock declaration."""
5501     # All nodes are locked anyway, so nothing to do here.
5502
5503   def BuildHooksEnv(self):
5504     """Build hooks env.
5505
5506     This will run on the master, primary node and target node.
5507
5508     """
5509     env = {
5510       "EXPORT_NODE": self.op.target_node,
5511       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5512       }
5513     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5514     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5515           self.op.target_node]
5516     return env, nl, nl
5517
5518   def CheckPrereq(self):
5519     """Check prerequisites.
5520
5521     This checks that the instance and node names are valid.
5522
5523     """
5524     instance_name = self.op.instance_name
5525     self.instance = self.cfg.GetInstanceInfo(instance_name)
5526     assert self.instance is not None, \
5527           "Cannot retrieve locked instance %s" % self.op.instance_name
5528     _CheckNodeOnline(self, self.instance.primary_node)
5529
5530     self.dst_node = self.cfg.GetNodeInfo(
5531       self.cfg.ExpandNodeName(self.op.target_node))
5532
5533     if self.dst_node is None:
5534       # This is wrong node name, not a non-locked node
5535       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5536     _CheckNodeOnline(self, self.op.target_node)
5537
5538     # instance disk type verification
5539     for disk in self.instance.disks:
5540       if disk.dev_type == constants.LD_FILE:
5541         raise errors.OpPrereqError("Export not supported for instances with"
5542                                    " file-based disks")
5543
5544   def Exec(self, feedback_fn):
5545     """Export an instance to an image in the cluster.
5546
5547     """
5548     instance = self.instance
5549     dst_node = self.dst_node
5550     src_node = instance.primary_node
5551     if self.op.shutdown:
5552       # shutdown the instance, but not the disks
5553       result = self.rpc.call_instance_shutdown(src_node, instance)
5554       result.Raise()
5555       if not result.data:
5556         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5557                                  (instance.name, src_node))
5558
5559     vgname = self.cfg.GetVGName()
5560
5561     snap_disks = []
5562
5563     try:
5564       for disk in instance.disks:
5565         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5566         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5567         if new_dev_name.failed or not new_dev_name.data:
5568           self.LogWarning("Could not snapshot block device %s on node %s",
5569                           disk.logical_id[1], src_node)
5570           snap_disks.append(False)
5571         else:
5572           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5573                                  logical_id=(vgname, new_dev_name.data),
5574                                  physical_id=(vgname, new_dev_name.data),
5575                                  iv_name=disk.iv_name)
5576           snap_disks.append(new_dev)
5577
5578     finally:
5579       if self.op.shutdown and instance.status == "up":
5580         result = self.rpc.call_instance_start(src_node, instance, None)
5581         if result.failed or not result.data:
5582           _ShutdownInstanceDisks(self, instance)
5583           raise errors.OpExecError("Could not start instance")
5584
5585     # TODO: check for size
5586
5587     cluster_name = self.cfg.GetClusterName()
5588     for idx, dev in enumerate(snap_disks):
5589       if dev:
5590         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5591                                                instance, cluster_name, idx)
5592         if result.failed or not result.data:
5593           self.LogWarning("Could not export block device %s from node %s to"
5594                           " node %s", dev.logical_id[1], src_node,
5595                           dst_node.name)
5596         result = self.rpc.call_blockdev_remove(src_node, dev)
5597         if result.failed or not result.data:
5598           self.LogWarning("Could not remove snapshot block device %s from node"
5599                           " %s", dev.logical_id[1], src_node)
5600
5601     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5602     if result.failed or not result.data:
5603       self.LogWarning("Could not finalize export for instance %s on node %s",
5604                       instance.name, dst_node.name)
5605
5606     nodelist = self.cfg.GetNodeList()
5607     nodelist.remove(dst_node.name)
5608
5609     # on one-node clusters nodelist will be empty after the removal
5610     # if we proceed the backup would be removed because OpQueryExports
5611     # substitutes an empty list with the full cluster node list.
5612     if nodelist:
5613       exportlist = self.rpc.call_export_list(nodelist)
5614       for node in exportlist:
5615         if exportlist[node].failed:
5616           continue
5617         if instance.name in exportlist[node].data:
5618           if not self.rpc.call_export_remove(node, instance.name):
5619             self.LogWarning("Could not remove older export for instance %s"
5620                             " on node %s", instance.name, node)
5621
5622
5623 class LURemoveExport(NoHooksLU):
5624   """Remove exports related to the named instance.
5625
5626   """
5627   _OP_REQP = ["instance_name"]
5628   REQ_BGL = False
5629
5630   def ExpandNames(self):
5631     self.needed_locks = {}
5632     # We need all nodes to be locked in order for RemoveExport to work, but we
5633     # don't need to lock the instance itself, as nothing will happen to it (and
5634     # we can remove exports also for a removed instance)
5635     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5636
5637   def CheckPrereq(self):
5638     """Check prerequisites.
5639     """
5640     pass
5641
5642   def Exec(self, feedback_fn):
5643     """Remove any export.
5644
5645     """
5646     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5647     # If the instance was not found we'll try with the name that was passed in.
5648     # This will only work if it was an FQDN, though.
5649     fqdn_warn = False
5650     if not instance_name:
5651       fqdn_warn = True
5652       instance_name = self.op.instance_name
5653
5654     exportlist = self.rpc.call_export_list(self.acquired_locks[
5655       locking.LEVEL_NODE])
5656     found = False
5657     for node in exportlist:
5658       if exportlist[node].failed:
5659         self.LogWarning("Failed to query node %s, continuing" % node)
5660         continue
5661       if instance_name in exportlist[node].data:
5662         found = True
5663         result = self.rpc.call_export_remove(node, instance_name)
5664         if result.failed or not result.data:
5665           logging.error("Could not remove export for instance %s"
5666                         " on node %s", instance_name, node)
5667
5668     if fqdn_warn and not found:
5669       feedback_fn("Export not found. If trying to remove an export belonging"
5670                   " to a deleted instance please use its Fully Qualified"
5671                   " Domain Name.")
5672
5673
5674 class TagsLU(NoHooksLU):
5675   """Generic tags LU.
5676
5677   This is an abstract class which is the parent of all the other tags LUs.
5678
5679   """
5680
5681   def ExpandNames(self):
5682     self.needed_locks = {}
5683     if self.op.kind == constants.TAG_NODE:
5684       name = self.cfg.ExpandNodeName(self.op.name)
5685       if name is None:
5686         raise errors.OpPrereqError("Invalid node name (%s)" %
5687                                    (self.op.name,))
5688       self.op.name = name
5689       self.needed_locks[locking.LEVEL_NODE] = name
5690     elif self.op.kind == constants.TAG_INSTANCE:
5691       name = self.cfg.ExpandInstanceName(self.op.name)
5692       if name is None:
5693         raise errors.OpPrereqError("Invalid instance name (%s)" %
5694                                    (self.op.name,))
5695       self.op.name = name
5696       self.needed_locks[locking.LEVEL_INSTANCE] = name
5697
5698   def CheckPrereq(self):
5699     """Check prerequisites.
5700
5701     """
5702     if self.op.kind == constants.TAG_CLUSTER:
5703       self.target = self.cfg.GetClusterInfo()
5704     elif self.op.kind == constants.TAG_NODE:
5705       self.target = self.cfg.GetNodeInfo(self.op.name)
5706     elif self.op.kind == constants.TAG_INSTANCE:
5707       self.target = self.cfg.GetInstanceInfo(self.op.name)
5708     else:
5709       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5710                                  str(self.op.kind))
5711
5712
5713 class LUGetTags(TagsLU):
5714   """Returns the tags of a given object.
5715
5716   """
5717   _OP_REQP = ["kind", "name"]
5718   REQ_BGL = False
5719
5720   def Exec(self, feedback_fn):
5721     """Returns the tag list.
5722
5723     """
5724     return list(self.target.GetTags())
5725
5726
5727 class LUSearchTags(NoHooksLU):
5728   """Searches the tags for a given pattern.
5729
5730   """
5731   _OP_REQP = ["pattern"]
5732   REQ_BGL = False
5733
5734   def ExpandNames(self):
5735     self.needed_locks = {}
5736
5737   def CheckPrereq(self):
5738     """Check prerequisites.
5739
5740     This checks the pattern passed for validity by compiling it.
5741
5742     """
5743     try:
5744       self.re = re.compile(self.op.pattern)
5745     except re.error, err:
5746       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5747                                  (self.op.pattern, err))
5748
5749   def Exec(self, feedback_fn):
5750     """Returns the tag list.
5751
5752     """
5753     cfg = self.cfg
5754     tgts = [("/cluster", cfg.GetClusterInfo())]
5755     ilist = cfg.GetAllInstancesInfo().values()
5756     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5757     nlist = cfg.GetAllNodesInfo().values()
5758     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5759     results = []
5760     for path, target in tgts:
5761       for tag in target.GetTags():
5762         if self.re.search(tag):
5763           results.append((path, tag))
5764     return results
5765
5766
5767 class LUAddTags(TagsLU):
5768   """Sets a tag on a given object.
5769
5770   """
5771   _OP_REQP = ["kind", "name", "tags"]
5772   REQ_BGL = False
5773
5774   def CheckPrereq(self):
5775     """Check prerequisites.
5776
5777     This checks the type and length of the tag name and value.
5778
5779     """
5780     TagsLU.CheckPrereq(self)
5781     for tag in self.op.tags:
5782       objects.TaggableObject.ValidateTag(tag)
5783
5784   def Exec(self, feedback_fn):
5785     """Sets the tag.
5786
5787     """
5788     try:
5789       for tag in self.op.tags:
5790         self.target.AddTag(tag)
5791     except errors.TagError, err:
5792       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5793     try:
5794       self.cfg.Update(self.target)
5795     except errors.ConfigurationError:
5796       raise errors.OpRetryError("There has been a modification to the"
5797                                 " config file and the operation has been"
5798                                 " aborted. Please retry.")
5799
5800
5801 class LUDelTags(TagsLU):
5802   """Delete a list of tags from a given object.
5803
5804   """
5805   _OP_REQP = ["kind", "name", "tags"]
5806   REQ_BGL = False
5807
5808   def CheckPrereq(self):
5809     """Check prerequisites.
5810
5811     This checks that we have the given tag.
5812
5813     """
5814     TagsLU.CheckPrereq(self)
5815     for tag in self.op.tags:
5816       objects.TaggableObject.ValidateTag(tag)
5817     del_tags = frozenset(self.op.tags)
5818     cur_tags = self.target.GetTags()
5819     if not del_tags <= cur_tags:
5820       diff_tags = del_tags - cur_tags
5821       diff_names = ["'%s'" % tag for tag in diff_tags]
5822       diff_names.sort()
5823       raise errors.OpPrereqError("Tag(s) %s not found" %
5824                                  (",".join(diff_names)))
5825
5826   def Exec(self, feedback_fn):
5827     """Remove the tag from the object.
5828
5829     """
5830     for tag in self.op.tags:
5831       self.target.RemoveTag(tag)
5832     try:
5833       self.cfg.Update(self.target)
5834     except errors.ConfigurationError:
5835       raise errors.OpRetryError("There has been a modification to the"
5836                                 " config file and the operation has been"
5837                                 " aborted. Please retry.")
5838
5839
5840 class LUTestDelay(NoHooksLU):
5841   """Sleep for a specified amount of time.
5842
5843   This LU sleeps on the master and/or nodes for a specified amount of
5844   time.
5845
5846   """
5847   _OP_REQP = ["duration", "on_master", "on_nodes"]
5848   REQ_BGL = False
5849
5850   def ExpandNames(self):
5851     """Expand names and set required locks.
5852
5853     This expands the node list, if any.
5854
5855     """
5856     self.needed_locks = {}
5857     if self.op.on_nodes:
5858       # _GetWantedNodes can be used here, but is not always appropriate to use
5859       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5860       # more information.
5861       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5862       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5863
5864   def CheckPrereq(self):
5865     """Check prerequisites.
5866
5867     """
5868
5869   def Exec(self, feedback_fn):
5870     """Do the actual sleep.
5871
5872     """
5873     if self.op.on_master:
5874       if not utils.TestDelay(self.op.duration):
5875         raise errors.OpExecError("Error during master delay test")
5876     if self.op.on_nodes:
5877       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5878       if not result:
5879         raise errors.OpExecError("Complete failure from rpc call")
5880       for node, node_result in result.items():
5881         node_result.Raise()
5882         if not node_result.data:
5883           raise errors.OpExecError("Failure during rpc call to node %s,"
5884                                    " result: %s" % (node, node_result.data))
5885
5886
5887 class IAllocator(object):
5888   """IAllocator framework.
5889
5890   An IAllocator instance has three sets of attributes:
5891     - cfg that is needed to query the cluster
5892     - input data (all members of the _KEYS class attribute are required)
5893     - four buffer attributes (in|out_data|text), that represent the
5894       input (to the external script) in text and data structure format,
5895       and the output from it, again in two formats
5896     - the result variables from the script (success, info, nodes) for
5897       easy usage
5898
5899   """
5900   _ALLO_KEYS = [
5901     "mem_size", "disks", "disk_template",
5902     "os", "tags", "nics", "vcpus", "hypervisor",
5903     ]
5904   _RELO_KEYS = [
5905     "relocate_from",
5906     ]
5907
5908   def __init__(self, lu, mode, name, **kwargs):
5909     self.lu = lu
5910     # init buffer variables
5911     self.in_text = self.out_text = self.in_data = self.out_data = None
5912     # init all input fields so that pylint is happy
5913     self.mode = mode
5914     self.name = name
5915     self.mem_size = self.disks = self.disk_template = None
5916     self.os = self.tags = self.nics = self.vcpus = None
5917     self.hypervisor = None
5918     self.relocate_from = None
5919     # computed fields
5920     self.required_nodes = None
5921     # init result fields
5922     self.success = self.info = self.nodes = None
5923     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5924       keyset = self._ALLO_KEYS
5925     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5926       keyset = self._RELO_KEYS
5927     else:
5928       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5929                                    " IAllocator" % self.mode)
5930     for key in kwargs:
5931       if key not in keyset:
5932         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5933                                      " IAllocator" % key)
5934       setattr(self, key, kwargs[key])
5935     for key in keyset:
5936       if key not in kwargs:
5937         raise errors.ProgrammerError("Missing input parameter '%s' to"
5938                                      " IAllocator" % key)
5939     self._BuildInputData()
5940
5941   def _ComputeClusterData(self):
5942     """Compute the generic allocator input data.
5943
5944     This is the data that is independent of the actual operation.
5945
5946     """
5947     cfg = self.lu.cfg
5948     cluster_info = cfg.GetClusterInfo()
5949     # cluster data
5950     data = {
5951       "version": 1,
5952       "cluster_name": cfg.GetClusterName(),
5953       "cluster_tags": list(cluster_info.GetTags()),
5954       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5955       # we don't have job IDs
5956       }
5957     iinfo = cfg.GetAllInstancesInfo().values()
5958     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5959
5960     # node data
5961     node_results = {}
5962     node_list = cfg.GetNodeList()
5963
5964     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5965       hypervisor_name = self.hypervisor
5966     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5967       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
5968
5969     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5970                                            hypervisor_name)
5971     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5972                        cluster_info.enabled_hypervisors)
5973     for nname in node_list:
5974       ninfo = cfg.GetNodeInfo(nname)
5975       node_data[nname].Raise()
5976       if not isinstance(node_data[nname].data, dict):
5977         raise errors.OpExecError("Can't get data for node %s" % nname)
5978       remote_info = node_data[nname].data
5979       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5980                    'vg_size', 'vg_free', 'cpu_total']:
5981         if attr not in remote_info:
5982           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5983                                    (nname, attr))
5984         try:
5985           remote_info[attr] = int(remote_info[attr])
5986         except ValueError, err:
5987           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5988                                    " %s" % (nname, attr, str(err)))
5989       # compute memory used by primary instances
5990       i_p_mem = i_p_up_mem = 0
5991       for iinfo, beinfo in i_list:
5992         if iinfo.primary_node == nname:
5993           i_p_mem += beinfo[constants.BE_MEMORY]
5994           if iinfo.name not in node_iinfo[nname]:
5995             i_used_mem = 0
5996           else:
5997             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5998           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5999           remote_info['memory_free'] -= max(0, i_mem_diff)
6000
6001           if iinfo.status == "up":
6002             i_p_up_mem += beinfo[constants.BE_MEMORY]
6003
6004       # compute memory used by instances
6005       pnr = {
6006         "tags": list(ninfo.GetTags()),
6007         "total_memory": remote_info['memory_total'],
6008         "reserved_memory": remote_info['memory_dom0'],
6009         "free_memory": remote_info['memory_free'],
6010         "i_pri_memory": i_p_mem,
6011         "i_pri_up_memory": i_p_up_mem,
6012         "total_disk": remote_info['vg_size'],
6013         "free_disk": remote_info['vg_free'],
6014         "primary_ip": ninfo.primary_ip,
6015         "secondary_ip": ninfo.secondary_ip,
6016         "total_cpus": remote_info['cpu_total'],
6017         "offline": ninfo.offline,
6018         }
6019       node_results[nname] = pnr
6020     data["nodes"] = node_results
6021
6022     # instance data
6023     instance_data = {}
6024     for iinfo, beinfo in i_list:
6025       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6026                   for n in iinfo.nics]
6027       pir = {
6028         "tags": list(iinfo.GetTags()),
6029         "should_run": iinfo.status == "up",
6030         "vcpus": beinfo[constants.BE_VCPUS],
6031         "memory": beinfo[constants.BE_MEMORY],
6032         "os": iinfo.os,
6033         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6034         "nics": nic_data,
6035         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
6036         "disk_template": iinfo.disk_template,
6037         "hypervisor": iinfo.hypervisor,
6038         }
6039       instance_data[iinfo.name] = pir
6040
6041     data["instances"] = instance_data
6042
6043     self.in_data = data
6044
6045   def _AddNewInstance(self):
6046     """Add new instance data to allocator structure.
6047
6048     This in combination with _AllocatorGetClusterData will create the
6049     correct structure needed as input for the allocator.
6050
6051     The checks for the completeness of the opcode must have already been
6052     done.
6053
6054     """
6055     data = self.in_data
6056     if len(self.disks) != 2:
6057       raise errors.OpExecError("Only two-disk configurations supported")
6058
6059     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6060
6061     if self.disk_template in constants.DTS_NET_MIRROR:
6062       self.required_nodes = 2
6063     else:
6064       self.required_nodes = 1
6065     request = {
6066       "type": "allocate",
6067       "name": self.name,
6068       "disk_template": self.disk_template,
6069       "tags": self.tags,
6070       "os": self.os,
6071       "vcpus": self.vcpus,
6072       "memory": self.mem_size,
6073       "disks": self.disks,
6074       "disk_space_total": disk_space,
6075       "nics": self.nics,
6076       "required_nodes": self.required_nodes,
6077       }
6078     data["request"] = request
6079
6080   def _AddRelocateInstance(self):
6081     """Add relocate instance data to allocator structure.
6082
6083     This in combination with _IAllocatorGetClusterData will create the
6084     correct structure needed as input for the allocator.
6085
6086     The checks for the completeness of the opcode must have already been
6087     done.
6088
6089     """
6090     instance = self.lu.cfg.GetInstanceInfo(self.name)
6091     if instance is None:
6092       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6093                                    " IAllocator" % self.name)
6094
6095     if instance.disk_template not in constants.DTS_NET_MIRROR:
6096       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6097
6098     if len(instance.secondary_nodes) != 1:
6099       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6100
6101     self.required_nodes = 1
6102     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6103     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6104
6105     request = {
6106       "type": "relocate",
6107       "name": self.name,
6108       "disk_space_total": disk_space,
6109       "required_nodes": self.required_nodes,
6110       "relocate_from": self.relocate_from,
6111       }
6112     self.in_data["request"] = request
6113
6114   def _BuildInputData(self):
6115     """Build input data structures.
6116
6117     """
6118     self._ComputeClusterData()
6119
6120     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6121       self._AddNewInstance()
6122     else:
6123       self._AddRelocateInstance()
6124
6125     self.in_text = serializer.Dump(self.in_data)
6126
6127   def Run(self, name, validate=True, call_fn=None):
6128     """Run an instance allocator and return the results.
6129
6130     """
6131     if call_fn is None:
6132       call_fn = self.lu.rpc.call_iallocator_runner
6133     data = self.in_text
6134
6135     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6136     result.Raise()
6137
6138     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6139       raise errors.OpExecError("Invalid result from master iallocator runner")
6140
6141     rcode, stdout, stderr, fail = result.data
6142
6143     if rcode == constants.IARUN_NOTFOUND:
6144       raise errors.OpExecError("Can't find allocator '%s'" % name)
6145     elif rcode == constants.IARUN_FAILURE:
6146       raise errors.OpExecError("Instance allocator call failed: %s,"
6147                                " output: %s" % (fail, stdout+stderr))
6148     self.out_text = stdout
6149     if validate:
6150       self._ValidateResult()
6151
6152   def _ValidateResult(self):
6153     """Process the allocator results.
6154
6155     This will process and if successful save the result in
6156     self.out_data and the other parameters.
6157
6158     """
6159     try:
6160       rdict = serializer.Load(self.out_text)
6161     except Exception, err:
6162       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6163
6164     if not isinstance(rdict, dict):
6165       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6166
6167     for key in "success", "info", "nodes":
6168       if key not in rdict:
6169         raise errors.OpExecError("Can't parse iallocator results:"
6170                                  " missing key '%s'" % key)
6171       setattr(self, key, rdict[key])
6172
6173     if not isinstance(rdict["nodes"], list):
6174       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6175                                " is not a list")
6176     self.out_data = rdict
6177
6178
6179 class LUTestAllocator(NoHooksLU):
6180   """Run allocator tests.
6181
6182   This LU runs the allocator tests
6183
6184   """
6185   _OP_REQP = ["direction", "mode", "name"]
6186
6187   def CheckPrereq(self):
6188     """Check prerequisites.
6189
6190     This checks the opcode parameters depending on the director and mode test.
6191
6192     """
6193     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6194       for attr in ["name", "mem_size", "disks", "disk_template",
6195                    "os", "tags", "nics", "vcpus"]:
6196         if not hasattr(self.op, attr):
6197           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6198                                      attr)
6199       iname = self.cfg.ExpandInstanceName(self.op.name)
6200       if iname is not None:
6201         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6202                                    iname)
6203       if not isinstance(self.op.nics, list):
6204         raise errors.OpPrereqError("Invalid parameter 'nics'")
6205       for row in self.op.nics:
6206         if (not isinstance(row, dict) or
6207             "mac" not in row or
6208             "ip" not in row or
6209             "bridge" not in row):
6210           raise errors.OpPrereqError("Invalid contents of the"
6211                                      " 'nics' parameter")
6212       if not isinstance(self.op.disks, list):
6213         raise errors.OpPrereqError("Invalid parameter 'disks'")
6214       if len(self.op.disks) != 2:
6215         raise errors.OpPrereqError("Only two-disk configurations supported")
6216       for row in self.op.disks:
6217         if (not isinstance(row, dict) or
6218             "size" not in row or
6219             not isinstance(row["size"], int) or
6220             "mode" not in row or
6221             row["mode"] not in ['r', 'w']):
6222           raise errors.OpPrereqError("Invalid contents of the"
6223                                      " 'disks' parameter")
6224       if self.op.hypervisor is None:
6225         self.op.hypervisor = self.cfg.GetHypervisorType()
6226     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6227       if not hasattr(self.op, "name"):
6228         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6229       fname = self.cfg.ExpandInstanceName(self.op.name)
6230       if fname is None:
6231         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6232                                    self.op.name)
6233       self.op.name = fname
6234       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6235     else:
6236       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6237                                  self.op.mode)
6238
6239     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6240       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6241         raise errors.OpPrereqError("Missing allocator name")
6242     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6243       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6244                                  self.op.direction)
6245
6246   def Exec(self, feedback_fn):
6247     """Run the allocator test.
6248
6249     """
6250     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6251       ial = IAllocator(self,
6252                        mode=self.op.mode,
6253                        name=self.op.name,
6254                        mem_size=self.op.mem_size,
6255                        disks=self.op.disks,
6256                        disk_template=self.op.disk_template,
6257                        os=self.op.os,
6258                        tags=self.op.tags,
6259                        nics=self.op.nics,
6260                        vcpus=self.op.vcpus,
6261                        hypervisor=self.op.hypervisor,
6262                        )
6263     else:
6264       ial = IAllocator(self,
6265                        mode=self.op.mode,
6266                        name=self.op.name,
6267                        relocate_from=list(self.relocate_from),
6268                        )
6269
6270     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6271       result = ial.in_text
6272     else:
6273       ial.Run(self.op.allocator, validate=False)
6274       result = ial.out_text
6275     return result