Release ganeti 2.0~alpha1
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the nodes is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445                           memory, vcpus, nics):
446   """Builds instance related env variables for hooks
447
448   This builds the hook environment from individual variables.
449
450   @type name: string
451   @param name: the name of the instance
452   @type primary_node: string
453   @param primary_node: the name of the instance's primary node
454   @type secondary_nodes: list
455   @param secondary_nodes: list of secondary nodes as strings
456   @type os_type: string
457   @param os_type: the name of the instance's OS
458   @type status: string
459   @param status: the desired status of the instances
460   @type memory: string
461   @param memory: the memory size of the instance
462   @type vcpus: string
463   @param vcpus: the count of VCPUs the instance has
464   @type nics: list
465   @param nics: list of tuples (ip, bridge, mac) representing
466       the NICs the instance  has
467   @rtype: dict
468   @return: the hook environment for this instance
469
470   """
471   env = {
472     "OP_TARGET": name,
473     "INSTANCE_NAME": name,
474     "INSTANCE_PRIMARY": primary_node,
475     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
476     "INSTANCE_OS_TYPE": os_type,
477     "INSTANCE_STATUS": status,
478     "INSTANCE_MEMORY": memory,
479     "INSTANCE_VCPUS": vcpus,
480   }
481
482   if nics:
483     nic_count = len(nics)
484     for idx, (ip, bridge, mac) in enumerate(nics):
485       if ip is None:
486         ip = ""
487       env["INSTANCE_NIC%d_IP" % idx] = ip
488       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
489       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
490   else:
491     nic_count = 0
492
493   env["INSTANCE_NIC_COUNT"] = nic_count
494
495   return env
496
497
498 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
499   """Builds instance related env variables for hooks from an object.
500
501   @type lu: L{LogicalUnit}
502   @param lu: the logical unit on whose behalf we execute
503   @type instance: L{objects.Instance}
504   @param instance: the instance for which we should build the
505       environment
506   @type override: dict
507   @param override: dictionary with key/values that will override
508       our values
509   @rtype: dict
510   @return: the hook environment dictionary
511
512   """
513   bep = lu.cfg.GetClusterInfo().FillBE(instance)
514   args = {
515     'name': instance.name,
516     'primary_node': instance.primary_node,
517     'secondary_nodes': instance.secondary_nodes,
518     'os_type': instance.os,
519     'status': instance.os,
520     'memory': bep[constants.BE_MEMORY],
521     'vcpus': bep[constants.BE_VCPUS],
522     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
523   }
524   if override:
525     args.update(override)
526   return _BuildInstanceHookEnv(**args)
527
528
529 def _AdjustCandidatePool(lu):
530   """Adjust the candidate pool after node operations.
531
532   """
533   mod_list = lu.cfg.MaintainCandidatePool()
534   if mod_list:
535     lu.LogInfo("Promoted nodes to master candidate role: %s",
536                ", ".join(node.name for node in mod_list))
537     for name in mod_list:
538       lu.context.ReaddNode(name)
539   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
540   if mc_now > mc_max:
541     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
542                (mc_now, mc_max))
543
544
545 def _CheckInstanceBridgesExist(lu, instance):
546   """Check that the brigdes needed by an instance exist.
547
548   """
549   # check bridges existance
550   brlist = [nic.bridge for nic in instance.nics]
551   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
552   result.Raise()
553   if not result.data:
554     raise errors.OpPrereqError("One or more target bridges %s does not"
555                                " exist on destination node '%s'" %
556                                (brlist, instance.primary_node))
557
558
559 class LUDestroyCluster(NoHooksLU):
560   """Logical unit for destroying the cluster.
561
562   """
563   _OP_REQP = []
564
565   def CheckPrereq(self):
566     """Check prerequisites.
567
568     This checks whether the cluster is empty.
569
570     Any errors are signalled by raising errors.OpPrereqError.
571
572     """
573     master = self.cfg.GetMasterNode()
574
575     nodelist = self.cfg.GetNodeList()
576     if len(nodelist) != 1 or nodelist[0] != master:
577       raise errors.OpPrereqError("There are still %d node(s) in"
578                                  " this cluster." % (len(nodelist) - 1))
579     instancelist = self.cfg.GetInstanceList()
580     if instancelist:
581       raise errors.OpPrereqError("There are still %d instance(s) in"
582                                  " this cluster." % len(instancelist))
583
584   def Exec(self, feedback_fn):
585     """Destroys the cluster.
586
587     """
588     master = self.cfg.GetMasterNode()
589     result = self.rpc.call_node_stop_master(master, False)
590     result.Raise()
591     if not result.data:
592       raise errors.OpExecError("Could not disable the master role")
593     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594     utils.CreateBackup(priv_key)
595     utils.CreateBackup(pub_key)
596     return master
597
598
599 class LUVerifyCluster(LogicalUnit):
600   """Verifies the cluster status.
601
602   """
603   HPATH = "cluster-verify"
604   HTYPE = constants.HTYPE_CLUSTER
605   _OP_REQP = ["skip_checks"]
606   REQ_BGL = False
607
608   def ExpandNames(self):
609     self.needed_locks = {
610       locking.LEVEL_NODE: locking.ALL_SET,
611       locking.LEVEL_INSTANCE: locking.ALL_SET,
612     }
613     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
614
615   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
616                   node_result, feedback_fn, master_files):
617     """Run multiple tests against a node.
618
619     Test list:
620
621       - compares ganeti version
622       - checks vg existance and size > 20G
623       - checks config file checksum
624       - checks ssh to other nodes
625
626     @type nodeinfo: L{objects.Node}
627     @param nodeinfo: the node to check
628     @param file_list: required list of files
629     @param local_cksum: dictionary of local files and their checksums
630     @param node_result: the results from the node
631     @param feedback_fn: function used to accumulate results
632     @param master_files: list of files that only masters should have
633
634     """
635     node = nodeinfo.name
636
637     # main result, node_result should be a non-empty dict
638     if not node_result or not isinstance(node_result, dict):
639       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
640       return True
641
642     # compares ganeti version
643     local_version = constants.PROTOCOL_VERSION
644     remote_version = node_result.get('version', None)
645     if not remote_version:
646       feedback_fn("  - ERROR: connection to %s failed" % (node))
647       return True
648
649     if local_version != remote_version:
650       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
651                       (local_version, node, remote_version))
652       return True
653
654     # checks vg existance and size > 20G
655
656     bad = False
657     vglist = node_result.get(constants.NV_VGLIST, None)
658     if not vglist:
659       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
660                       (node,))
661       bad = True
662     else:
663       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
664                                             constants.MIN_VG_SIZE)
665       if vgstatus:
666         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
667         bad = True
668
669     # checks config file checksum
670
671     remote_cksum = node_result.get(constants.NV_FILELIST, None)
672     if not isinstance(remote_cksum, dict):
673       bad = True
674       feedback_fn("  - ERROR: node hasn't returned file checksum data")
675     else:
676       for file_name in file_list:
677         node_is_mc = nodeinfo.master_candidate
678         must_have_file = file_name not in master_files
679         if file_name not in remote_cksum:
680           if node_is_mc or must_have_file:
681             bad = True
682             feedback_fn("  - ERROR: file '%s' missing" % file_name)
683         elif remote_cksum[file_name] != local_cksum[file_name]:
684           if node_is_mc or must_have_file:
685             bad = True
686             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
687           else:
688             # not candidate and this is not a must-have file
689             bad = True
690             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
691                         " '%s'" % file_name)
692         else:
693           # all good, except non-master/non-must have combination
694           if not node_is_mc and not must_have_file:
695             feedback_fn("  - ERROR: file '%s' should not exist on non master"
696                         " candidates" % file_name)
697
698     # checks ssh to any
699
700     if constants.NV_NODELIST not in node_result:
701       bad = True
702       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
703     else:
704       if node_result[constants.NV_NODELIST]:
705         bad = True
706         for node in node_result[constants.NV_NODELIST]:
707           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
708                           (node, node_result[constants.NV_NODELIST][node]))
709
710     if constants.NV_NODENETTEST not in node_result:
711       bad = True
712       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
713     else:
714       if node_result[constants.NV_NODENETTEST]:
715         bad = True
716         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
717         for node in nlist:
718           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
719                           (node, node_result[constants.NV_NODENETTEST][node]))
720
721     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
722     if isinstance(hyp_result, dict):
723       for hv_name, hv_result in hyp_result.iteritems():
724         if hv_result is not None:
725           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
726                       (hv_name, hv_result))
727     return bad
728
729   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
730                       node_instance, feedback_fn, n_offline):
731     """Verify an instance.
732
733     This function checks to see if the required block devices are
734     available on the instance's node.
735
736     """
737     bad = False
738
739     node_current = instanceconfig.primary_node
740
741     node_vol_should = {}
742     instanceconfig.MapLVsByNode(node_vol_should)
743
744     for node in node_vol_should:
745       if node in n_offline:
746         # ignore missing volumes on offline nodes
747         continue
748       for volume in node_vol_should[node]:
749         if node not in node_vol_is or volume not in node_vol_is[node]:
750           feedback_fn("  - ERROR: volume %s missing on node %s" %
751                           (volume, node))
752           bad = True
753
754     if not instanceconfig.status == 'down':
755       if ((node_current not in node_instance or
756           not instance in node_instance[node_current]) and
757           node_current not in n_offline):
758         feedback_fn("  - ERROR: instance %s not running on node %s" %
759                         (instance, node_current))
760         bad = True
761
762     for node in node_instance:
763       if (not node == node_current):
764         if instance in node_instance[node]:
765           feedback_fn("  - ERROR: instance %s should not run on node %s" %
766                           (instance, node))
767           bad = True
768
769     return bad
770
771   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772     """Verify if there are any unknown volumes in the cluster.
773
774     The .os, .swap and backup volumes are ignored. All other volumes are
775     reported as unknown.
776
777     """
778     bad = False
779
780     for node in node_vol_is:
781       for volume in node_vol_is[node]:
782         if node not in node_vol_should or volume not in node_vol_should[node]:
783           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
784                       (volume, node))
785           bad = True
786     return bad
787
788   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789     """Verify the list of running instances.
790
791     This checks what instances are running but unknown to the cluster.
792
793     """
794     bad = False
795     for node in node_instance:
796       for runninginstance in node_instance[node]:
797         if runninginstance not in instancelist:
798           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
799                           (runninginstance, node))
800           bad = True
801     return bad
802
803   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
804     """Verify N+1 Memory Resilience.
805
806     Check that if one single node dies we can still start all the instances it
807     was primary for.
808
809     """
810     bad = False
811
812     for node, nodeinfo in node_info.iteritems():
813       # This code checks that every node which is now listed as secondary has
814       # enough memory to host all instances it is supposed to should a single
815       # other node in the cluster fail.
816       # FIXME: not ready for failover to an arbitrary node
817       # FIXME: does not support file-backed instances
818       # WARNING: we currently take into account down instances as well as up
819       # ones, considering that even if they're down someone might want to start
820       # them even in the event of a node failure.
821       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
822         needed_mem = 0
823         for instance in instances:
824           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
825           if bep[constants.BE_AUTO_BALANCE]:
826             needed_mem += bep[constants.BE_MEMORY]
827         if nodeinfo['mfree'] < needed_mem:
828           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
829                       " failovers should node %s fail" % (node, prinode))
830           bad = True
831     return bad
832
833   def CheckPrereq(self):
834     """Check prerequisites.
835
836     Transform the list of checks we're going to skip into a set and check that
837     all its members are valid.
838
839     """
840     self.skip_set = frozenset(self.op.skip_checks)
841     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
842       raise errors.OpPrereqError("Invalid checks to be skipped specified")
843
844   def BuildHooksEnv(self):
845     """Build hooks env.
846
847     Cluster-Verify hooks just rone in the post phase and their failure makes
848     the output be logged in the verify output and the verification to fail.
849
850     """
851     all_nodes = self.cfg.GetNodeList()
852     # TODO: populate the environment with useful information for verify hooks
853     env = {}
854     return env, [], all_nodes
855
856   def Exec(self, feedback_fn):
857     """Verify integrity of cluster, performing various test on nodes.
858
859     """
860     bad = False
861     feedback_fn("* Verifying global settings")
862     for msg in self.cfg.VerifyConfig():
863       feedback_fn("  - ERROR: %s" % msg)
864
865     vg_name = self.cfg.GetVGName()
866     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
867     nodelist = utils.NiceSort(self.cfg.GetNodeList())
868     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
869     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
870     i_non_redundant = [] # Non redundant instances
871     i_non_a_balanced = [] # Non auto-balanced instances
872     n_offline = [] # List of offline nodes
873     node_volume = {}
874     node_instance = {}
875     node_info = {}
876     instance_cfg = {}
877
878     # FIXME: verify OS list
879     # do local checksums
880     master_files = [constants.CLUSTER_CONF_FILE]
881
882     file_names = ssconf.SimpleStore().GetFileList()
883     file_names.append(constants.SSL_CERT_FILE)
884     file_names.extend(master_files)
885
886     local_checksums = utils.FingerprintFiles(file_names)
887
888     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
889     node_verify_param = {
890       constants.NV_FILELIST: file_names,
891       constants.NV_NODELIST: nodelist,
892       constants.NV_HYPERVISOR: hypervisors,
893       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
894                                   node.secondary_ip) for node in nodeinfo],
895       constants.NV_LVLIST: vg_name,
896       constants.NV_INSTANCELIST: hypervisors,
897       constants.NV_VGLIST: None,
898       constants.NV_VERSION: None,
899       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
900       }
901     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
902                                            self.cfg.GetClusterName())
903
904     cluster = self.cfg.GetClusterInfo()
905     master_node = self.cfg.GetMasterNode()
906     for node_i in nodeinfo:
907       node = node_i.name
908       nresult = all_nvinfo[node].data
909
910       if node_i.offline:
911         feedback_fn("* Skipping offline node %s" % (node,))
912         n_offline.append(node)
913         continue
914
915       if node == master_node:
916         ntype = "master"
917       elif node_i.master_candidate:
918         ntype = "master candidate"
919       else:
920         ntype = "regular"
921       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
922
923       if all_nvinfo[node].failed or not isinstance(nresult, dict):
924         feedback_fn("  - ERROR: connection to %s failed" % (node,))
925         bad = True
926         continue
927
928       result = self._VerifyNode(node_i, file_names, local_checksums,
929                                 nresult, feedback_fn, master_files)
930       bad = bad or result
931
932       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
933       if isinstance(lvdata, basestring):
934         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
935                     (node, lvdata.encode('string_escape')))
936         bad = True
937         node_volume[node] = {}
938       elif not isinstance(lvdata, dict):
939         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
940         bad = True
941         continue
942       else:
943         node_volume[node] = lvdata
944
945       # node_instance
946       idata = nresult.get(constants.NV_INSTANCELIST, None)
947       if not isinstance(idata, list):
948         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
949                     (node,))
950         bad = True
951         continue
952
953       node_instance[node] = idata
954
955       # node_info
956       nodeinfo = nresult.get(constants.NV_HVINFO, None)
957       if not isinstance(nodeinfo, dict):
958         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
959         bad = True
960         continue
961
962       try:
963         node_info[node] = {
964           "mfree": int(nodeinfo['memory_free']),
965           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
966           "pinst": [],
967           "sinst": [],
968           # dictionary holding all instances this node is secondary for,
969           # grouped by their primary node. Each key is a cluster node, and each
970           # value is a list of instances which have the key as primary and the
971           # current node as secondary.  this is handy to calculate N+1 memory
972           # availability if you can only failover from a primary to its
973           # secondary.
974           "sinst-by-pnode": {},
975         }
976       except ValueError:
977         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
978         bad = True
979         continue
980
981     node_vol_should = {}
982
983     for instance in instancelist:
984       feedback_fn("* Verifying instance %s" % instance)
985       inst_config = self.cfg.GetInstanceInfo(instance)
986       result =  self._VerifyInstance(instance, inst_config, node_volume,
987                                      node_instance, feedback_fn, n_offline)
988       bad = bad or result
989       inst_nodes_offline = []
990
991       inst_config.MapLVsByNode(node_vol_should)
992
993       instance_cfg[instance] = inst_config
994
995       pnode = inst_config.primary_node
996       if pnode in node_info:
997         node_info[pnode]['pinst'].append(instance)
998       elif pnode not in n_offline:
999         feedback_fn("  - ERROR: instance %s, connection to primary node"
1000                     " %s failed" % (instance, pnode))
1001         bad = True
1002
1003       if pnode in n_offline:
1004         inst_nodes_offline.append(pnode)
1005
1006       # If the instance is non-redundant we cannot survive losing its primary
1007       # node, so we are not N+1 compliant. On the other hand we have no disk
1008       # templates with more than one secondary so that situation is not well
1009       # supported either.
1010       # FIXME: does not support file-backed instances
1011       if len(inst_config.secondary_nodes) == 0:
1012         i_non_redundant.append(instance)
1013       elif len(inst_config.secondary_nodes) > 1:
1014         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1015                     % instance)
1016
1017       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1018         i_non_a_balanced.append(instance)
1019
1020       for snode in inst_config.secondary_nodes:
1021         if snode in node_info:
1022           node_info[snode]['sinst'].append(instance)
1023           if pnode not in node_info[snode]['sinst-by-pnode']:
1024             node_info[snode]['sinst-by-pnode'][pnode] = []
1025           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1026         elif snode not in n_offline:
1027           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1028                       " %s failed" % (instance, snode))
1029           bad = True
1030         if snode in n_offline:
1031           inst_nodes_offline.append(snode)
1032
1033       if inst_nodes_offline:
1034         # warn that the instance lives on offline nodes, and set bad=True
1035         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1036                     ", ".join(inst_nodes_offline))
1037         bad = True
1038
1039     feedback_fn("* Verifying orphan volumes")
1040     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1041                                        feedback_fn)
1042     bad = bad or result
1043
1044     feedback_fn("* Verifying remaining instances")
1045     result = self._VerifyOrphanInstances(instancelist, node_instance,
1046                                          feedback_fn)
1047     bad = bad or result
1048
1049     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1050       feedback_fn("* Verifying N+1 Memory redundancy")
1051       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1052       bad = bad or result
1053
1054     feedback_fn("* Other Notes")
1055     if i_non_redundant:
1056       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1057                   % len(i_non_redundant))
1058
1059     if i_non_a_balanced:
1060       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1061                   % len(i_non_a_balanced))
1062
1063     if n_offline:
1064       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1065
1066     return not bad
1067
1068   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1069     """Analize the post-hooks' result
1070
1071     This method analyses the hook result, handles it, and sends some
1072     nicely-formatted feedback back to the user.
1073
1074     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1075         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1076     @param hooks_results: the results of the multi-node hooks rpc call
1077     @param feedback_fn: function used send feedback back to the caller
1078     @param lu_result: previous Exec result
1079     @return: the new Exec result, based on the previous result
1080         and hook results
1081
1082     """
1083     # We only really run POST phase hooks, and are only interested in
1084     # their results
1085     if phase == constants.HOOKS_PHASE_POST:
1086       # Used to change hooks' output to proper indentation
1087       indent_re = re.compile('^', re.M)
1088       feedback_fn("* Hooks Results")
1089       if not hooks_results:
1090         feedback_fn("  - ERROR: general communication failure")
1091         lu_result = 1
1092       else:
1093         for node_name in hooks_results:
1094           show_node_header = True
1095           res = hooks_results[node_name]
1096           if res.failed or res.data is False or not isinstance(res.data, list):
1097             if res.offline:
1098               # no need to warn or set fail return value
1099               continue
1100             feedback_fn("    Communication failure in hooks execution")
1101             lu_result = 1
1102             continue
1103           for script, hkr, output in res.data:
1104             if hkr == constants.HKR_FAIL:
1105               # The node header is only shown once, if there are
1106               # failing hooks on that node
1107               if show_node_header:
1108                 feedback_fn("  Node %s:" % node_name)
1109                 show_node_header = False
1110               feedback_fn("    ERROR: Script %s failed, output:" % script)
1111               output = indent_re.sub('      ', output)
1112               feedback_fn("%s" % output)
1113               lu_result = 1
1114
1115       return lu_result
1116
1117
1118 class LUVerifyDisks(NoHooksLU):
1119   """Verifies the cluster disks status.
1120
1121   """
1122   _OP_REQP = []
1123   REQ_BGL = False
1124
1125   def ExpandNames(self):
1126     self.needed_locks = {
1127       locking.LEVEL_NODE: locking.ALL_SET,
1128       locking.LEVEL_INSTANCE: locking.ALL_SET,
1129     }
1130     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1131
1132   def CheckPrereq(self):
1133     """Check prerequisites.
1134
1135     This has no prerequisites.
1136
1137     """
1138     pass
1139
1140   def Exec(self, feedback_fn):
1141     """Verify integrity of cluster disks.
1142
1143     """
1144     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1145
1146     vg_name = self.cfg.GetVGName()
1147     nodes = utils.NiceSort(self.cfg.GetNodeList())
1148     instances = [self.cfg.GetInstanceInfo(name)
1149                  for name in self.cfg.GetInstanceList()]
1150
1151     nv_dict = {}
1152     for inst in instances:
1153       inst_lvs = {}
1154       if (inst.status != "up" or
1155           inst.disk_template not in constants.DTS_NET_MIRROR):
1156         continue
1157       inst.MapLVsByNode(inst_lvs)
1158       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1159       for node, vol_list in inst_lvs.iteritems():
1160         for vol in vol_list:
1161           nv_dict[(node, vol)] = inst
1162
1163     if not nv_dict:
1164       return result
1165
1166     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1167
1168     to_act = set()
1169     for node in nodes:
1170       # node_volume
1171       lvs = node_lvs[node]
1172       if lvs.failed:
1173         if not lvs.offline:
1174           self.LogWarning("Connection to node %s failed: %s" %
1175                           (node, lvs.data))
1176         continue
1177       lvs = lvs.data
1178       if isinstance(lvs, basestring):
1179         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1180         res_nlvm[node] = lvs
1181       elif not isinstance(lvs, dict):
1182         logging.warning("Connection to node %s failed or invalid data"
1183                         " returned", node)
1184         res_nodes.append(node)
1185         continue
1186
1187       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1188         inst = nv_dict.pop((node, lv_name), None)
1189         if (not lv_online and inst is not None
1190             and inst.name not in res_instances):
1191           res_instances.append(inst.name)
1192
1193     # any leftover items in nv_dict are missing LVs, let's arrange the
1194     # data better
1195     for key, inst in nv_dict.iteritems():
1196       if inst.name not in res_missing:
1197         res_missing[inst.name] = []
1198       res_missing[inst.name].append(key)
1199
1200     return result
1201
1202
1203 class LURenameCluster(LogicalUnit):
1204   """Rename the cluster.
1205
1206   """
1207   HPATH = "cluster-rename"
1208   HTYPE = constants.HTYPE_CLUSTER
1209   _OP_REQP = ["name"]
1210
1211   def BuildHooksEnv(self):
1212     """Build hooks env.
1213
1214     """
1215     env = {
1216       "OP_TARGET": self.cfg.GetClusterName(),
1217       "NEW_NAME": self.op.name,
1218       }
1219     mn = self.cfg.GetMasterNode()
1220     return env, [mn], [mn]
1221
1222   def CheckPrereq(self):
1223     """Verify that the passed name is a valid one.
1224
1225     """
1226     hostname = utils.HostInfo(self.op.name)
1227
1228     new_name = hostname.name
1229     self.ip = new_ip = hostname.ip
1230     old_name = self.cfg.GetClusterName()
1231     old_ip = self.cfg.GetMasterIP()
1232     if new_name == old_name and new_ip == old_ip:
1233       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1234                                  " cluster has changed")
1235     if new_ip != old_ip:
1236       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1237         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1238                                    " reachable on the network. Aborting." %
1239                                    new_ip)
1240
1241     self.op.name = new_name
1242
1243   def Exec(self, feedback_fn):
1244     """Rename the cluster.
1245
1246     """
1247     clustername = self.op.name
1248     ip = self.ip
1249
1250     # shutdown the master IP
1251     master = self.cfg.GetMasterNode()
1252     result = self.rpc.call_node_stop_master(master, False)
1253     if result.failed or not result.data:
1254       raise errors.OpExecError("Could not disable the master role")
1255
1256     try:
1257       cluster = self.cfg.GetClusterInfo()
1258       cluster.cluster_name = clustername
1259       cluster.master_ip = ip
1260       self.cfg.Update(cluster)
1261
1262       # update the known hosts file
1263       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1264       node_list = self.cfg.GetNodeList()
1265       try:
1266         node_list.remove(master)
1267       except ValueError:
1268         pass
1269       result = self.rpc.call_upload_file(node_list,
1270                                          constants.SSH_KNOWN_HOSTS_FILE)
1271       for to_node, to_result in result.iteritems():
1272         if to_result.failed or not to_result.data:
1273           logging.error("Copy of file %s to node %s failed",
1274                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1275
1276     finally:
1277       result = self.rpc.call_node_start_master(master, False)
1278       if result.failed or not result.data:
1279         self.LogWarning("Could not re-enable the master role on"
1280                         " the master, please restart manually.")
1281
1282
1283 def _RecursiveCheckIfLVMBased(disk):
1284   """Check if the given disk or its children are lvm-based.
1285
1286   @type disk: L{objects.Disk}
1287   @param disk: the disk to check
1288   @rtype: booleean
1289   @return: boolean indicating whether a LD_LV dev_type was found or not
1290
1291   """
1292   if disk.children:
1293     for chdisk in disk.children:
1294       if _RecursiveCheckIfLVMBased(chdisk):
1295         return True
1296   return disk.dev_type == constants.LD_LV
1297
1298
1299 class LUSetClusterParams(LogicalUnit):
1300   """Change the parameters of the cluster.
1301
1302   """
1303   HPATH = "cluster-modify"
1304   HTYPE = constants.HTYPE_CLUSTER
1305   _OP_REQP = []
1306   REQ_BGL = False
1307
1308   def CheckParameters(self):
1309     """Check parameters
1310
1311     """
1312     if not hasattr(self.op, "candidate_pool_size"):
1313       self.op.candidate_pool_size = None
1314     if self.op.candidate_pool_size is not None:
1315       try:
1316         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1317       except ValueError, err:
1318         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1319                                    str(err))
1320       if self.op.candidate_pool_size < 1:
1321         raise errors.OpPrereqError("At least one master candidate needed")
1322
1323   def ExpandNames(self):
1324     # FIXME: in the future maybe other cluster params won't require checking on
1325     # all nodes to be modified.
1326     self.needed_locks = {
1327       locking.LEVEL_NODE: locking.ALL_SET,
1328     }
1329     self.share_locks[locking.LEVEL_NODE] = 1
1330
1331   def BuildHooksEnv(self):
1332     """Build hooks env.
1333
1334     """
1335     env = {
1336       "OP_TARGET": self.cfg.GetClusterName(),
1337       "NEW_VG_NAME": self.op.vg_name,
1338       }
1339     mn = self.cfg.GetMasterNode()
1340     return env, [mn], [mn]
1341
1342   def CheckPrereq(self):
1343     """Check prerequisites.
1344
1345     This checks whether the given params don't conflict and
1346     if the given volume group is valid.
1347
1348     """
1349     # FIXME: This only works because there is only one parameter that can be
1350     # changed or removed.
1351     if self.op.vg_name is not None and not self.op.vg_name:
1352       instances = self.cfg.GetAllInstancesInfo().values()
1353       for inst in instances:
1354         for disk in inst.disks:
1355           if _RecursiveCheckIfLVMBased(disk):
1356             raise errors.OpPrereqError("Cannot disable lvm storage while"
1357                                        " lvm-based instances exist")
1358
1359     node_list = self.acquired_locks[locking.LEVEL_NODE]
1360
1361     # if vg_name not None, checks given volume group on all nodes
1362     if self.op.vg_name:
1363       vglist = self.rpc.call_vg_list(node_list)
1364       for node in node_list:
1365         if vglist[node].failed:
1366           # ignoring down node
1367           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1368           continue
1369         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1370                                               self.op.vg_name,
1371                                               constants.MIN_VG_SIZE)
1372         if vgstatus:
1373           raise errors.OpPrereqError("Error on node '%s': %s" %
1374                                      (node, vgstatus))
1375
1376     self.cluster = cluster = self.cfg.GetClusterInfo()
1377     # validate beparams changes
1378     if self.op.beparams:
1379       utils.CheckBEParams(self.op.beparams)
1380       self.new_beparams = cluster.FillDict(
1381         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1382
1383     # hypervisor list/parameters
1384     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1385     if self.op.hvparams:
1386       if not isinstance(self.op.hvparams, dict):
1387         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1388       for hv_name, hv_dict in self.op.hvparams.items():
1389         if hv_name not in self.new_hvparams:
1390           self.new_hvparams[hv_name] = hv_dict
1391         else:
1392           self.new_hvparams[hv_name].update(hv_dict)
1393
1394     if self.op.enabled_hypervisors is not None:
1395       self.hv_list = self.op.enabled_hypervisors
1396     else:
1397       self.hv_list = cluster.enabled_hypervisors
1398
1399     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1400       # either the enabled list has changed, or the parameters have, validate
1401       for hv_name, hv_params in self.new_hvparams.items():
1402         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1403             (self.op.enabled_hypervisors and
1404              hv_name in self.op.enabled_hypervisors)):
1405           # either this is a new hypervisor, or its parameters have changed
1406           hv_class = hypervisor.GetHypervisor(hv_name)
1407           hv_class.CheckParameterSyntax(hv_params)
1408           _CheckHVParams(self, node_list, hv_name, hv_params)
1409
1410   def Exec(self, feedback_fn):
1411     """Change the parameters of the cluster.
1412
1413     """
1414     if self.op.vg_name is not None:
1415       if self.op.vg_name != self.cfg.GetVGName():
1416         self.cfg.SetVGName(self.op.vg_name)
1417       else:
1418         feedback_fn("Cluster LVM configuration already in desired"
1419                     " state, not changing")
1420     if self.op.hvparams:
1421       self.cluster.hvparams = self.new_hvparams
1422     if self.op.enabled_hypervisors is not None:
1423       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1424     if self.op.beparams:
1425       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1426     if self.op.candidate_pool_size is not None:
1427       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1428
1429     self.cfg.Update(self.cluster)
1430
1431     # we want to update nodes after the cluster so that if any errors
1432     # happen, we have recorded and saved the cluster info
1433     if self.op.candidate_pool_size is not None:
1434       _AdjustCandidatePool(self)
1435
1436
1437 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1438   """Sleep and poll for an instance's disk to sync.
1439
1440   """
1441   if not instance.disks:
1442     return True
1443
1444   if not oneshot:
1445     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1446
1447   node = instance.primary_node
1448
1449   for dev in instance.disks:
1450     lu.cfg.SetDiskID(dev, node)
1451
1452   retries = 0
1453   while True:
1454     max_time = 0
1455     done = True
1456     cumul_degraded = False
1457     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1458     if rstats.failed or not rstats.data:
1459       lu.LogWarning("Can't get any data from node %s", node)
1460       retries += 1
1461       if retries >= 10:
1462         raise errors.RemoteError("Can't contact node %s for mirror data,"
1463                                  " aborting." % node)
1464       time.sleep(6)
1465       continue
1466     rstats = rstats.data
1467     retries = 0
1468     for i in range(len(rstats)):
1469       mstat = rstats[i]
1470       if mstat is None:
1471         lu.LogWarning("Can't compute data for node %s/%s",
1472                            node, instance.disks[i].iv_name)
1473         continue
1474       # we ignore the ldisk parameter
1475       perc_done, est_time, is_degraded, _ = mstat
1476       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1477       if perc_done is not None:
1478         done = False
1479         if est_time is not None:
1480           rem_time = "%d estimated seconds remaining" % est_time
1481           max_time = est_time
1482         else:
1483           rem_time = "no time estimate"
1484         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1485                         (instance.disks[i].iv_name, perc_done, rem_time))
1486     if done or oneshot:
1487       break
1488
1489     time.sleep(min(60, max_time))
1490
1491   if done:
1492     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1493   return not cumul_degraded
1494
1495
1496 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1497   """Check that mirrors are not degraded.
1498
1499   The ldisk parameter, if True, will change the test from the
1500   is_degraded attribute (which represents overall non-ok status for
1501   the device(s)) to the ldisk (representing the local storage status).
1502
1503   """
1504   lu.cfg.SetDiskID(dev, node)
1505   if ldisk:
1506     idx = 6
1507   else:
1508     idx = 5
1509
1510   result = True
1511   if on_primary or dev.AssembleOnSecondary():
1512     rstats = lu.rpc.call_blockdev_find(node, dev)
1513     if rstats.failed or not rstats.data:
1514       logging.warning("Node %s: disk degraded, not found or node down", node)
1515       result = False
1516     else:
1517       result = result and (not rstats.data[idx])
1518   if dev.children:
1519     for child in dev.children:
1520       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1521
1522   return result
1523
1524
1525 class LUDiagnoseOS(NoHooksLU):
1526   """Logical unit for OS diagnose/query.
1527
1528   """
1529   _OP_REQP = ["output_fields", "names"]
1530   REQ_BGL = False
1531   _FIELDS_STATIC = utils.FieldSet()
1532   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1533
1534   def ExpandNames(self):
1535     if self.op.names:
1536       raise errors.OpPrereqError("Selective OS query not supported")
1537
1538     _CheckOutputFields(static=self._FIELDS_STATIC,
1539                        dynamic=self._FIELDS_DYNAMIC,
1540                        selected=self.op.output_fields)
1541
1542     # Lock all nodes, in shared mode
1543     self.needed_locks = {}
1544     self.share_locks[locking.LEVEL_NODE] = 1
1545     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1546
1547   def CheckPrereq(self):
1548     """Check prerequisites.
1549
1550     """
1551
1552   @staticmethod
1553   def _DiagnoseByOS(node_list, rlist):
1554     """Remaps a per-node return list into an a per-os per-node dictionary
1555
1556     @param node_list: a list with the names of all nodes
1557     @param rlist: a map with node names as keys and OS objects as values
1558
1559     @rtype: dict
1560     @returns: a dictionary with osnames as keys and as value another map, with
1561         nodes as keys and list of OS objects as values, eg::
1562
1563           {"debian-etch": {"node1": [<object>,...],
1564                            "node2": [<object>,]}
1565           }
1566
1567     """
1568     all_os = {}
1569     for node_name, nr in rlist.iteritems():
1570       if nr.failed or not nr.data:
1571         continue
1572       for os_obj in nr.data:
1573         if os_obj.name not in all_os:
1574           # build a list of nodes for this os containing empty lists
1575           # for each node in node_list
1576           all_os[os_obj.name] = {}
1577           for nname in node_list:
1578             all_os[os_obj.name][nname] = []
1579         all_os[os_obj.name][node_name].append(os_obj)
1580     return all_os
1581
1582   def Exec(self, feedback_fn):
1583     """Compute the list of OSes.
1584
1585     """
1586     node_list = self.acquired_locks[locking.LEVEL_NODE]
1587     node_data = self.rpc.call_os_diagnose(node_list)
1588     if node_data == False:
1589       raise errors.OpExecError("Can't gather the list of OSes")
1590     pol = self._DiagnoseByOS(node_list, node_data)
1591     output = []
1592     for os_name, os_data in pol.iteritems():
1593       row = []
1594       for field in self.op.output_fields:
1595         if field == "name":
1596           val = os_name
1597         elif field == "valid":
1598           val = utils.all([osl and osl[0] for osl in os_data.values()])
1599         elif field == "node_status":
1600           val = {}
1601           for node_name, nos_list in os_data.iteritems():
1602             val[node_name] = [(v.status, v.path) for v in nos_list]
1603         else:
1604           raise errors.ParameterError(field)
1605         row.append(val)
1606       output.append(row)
1607
1608     return output
1609
1610
1611 class LURemoveNode(LogicalUnit):
1612   """Logical unit for removing a node.
1613
1614   """
1615   HPATH = "node-remove"
1616   HTYPE = constants.HTYPE_NODE
1617   _OP_REQP = ["node_name"]
1618
1619   def BuildHooksEnv(self):
1620     """Build hooks env.
1621
1622     This doesn't run on the target node in the pre phase as a failed
1623     node would then be impossible to remove.
1624
1625     """
1626     env = {
1627       "OP_TARGET": self.op.node_name,
1628       "NODE_NAME": self.op.node_name,
1629       }
1630     all_nodes = self.cfg.GetNodeList()
1631     all_nodes.remove(self.op.node_name)
1632     return env, all_nodes, all_nodes
1633
1634   def CheckPrereq(self):
1635     """Check prerequisites.
1636
1637     This checks:
1638      - the node exists in the configuration
1639      - it does not have primary or secondary instances
1640      - it's not the master
1641
1642     Any errors are signalled by raising errors.OpPrereqError.
1643
1644     """
1645     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1646     if node is None:
1647       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1648
1649     instance_list = self.cfg.GetInstanceList()
1650
1651     masternode = self.cfg.GetMasterNode()
1652     if node.name == masternode:
1653       raise errors.OpPrereqError("Node is the master node,"
1654                                  " you need to failover first.")
1655
1656     for instance_name in instance_list:
1657       instance = self.cfg.GetInstanceInfo(instance_name)
1658       if node.name == instance.primary_node:
1659         raise errors.OpPrereqError("Instance %s still running on the node,"
1660                                    " please remove first." % instance_name)
1661       if node.name in instance.secondary_nodes:
1662         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1663                                    " please remove first." % instance_name)
1664     self.op.node_name = node.name
1665     self.node = node
1666
1667   def Exec(self, feedback_fn):
1668     """Removes the node from the cluster.
1669
1670     """
1671     node = self.node
1672     logging.info("Stopping the node daemon and removing configs from node %s",
1673                  node.name)
1674
1675     self.context.RemoveNode(node.name)
1676
1677     self.rpc.call_node_leave_cluster(node.name)
1678
1679     # Promote nodes to master candidate as needed
1680     _AdjustCandidatePool(self)
1681
1682
1683 class LUQueryNodes(NoHooksLU):
1684   """Logical unit for querying nodes.
1685
1686   """
1687   _OP_REQP = ["output_fields", "names"]
1688   REQ_BGL = False
1689   _FIELDS_DYNAMIC = utils.FieldSet(
1690     "dtotal", "dfree",
1691     "mtotal", "mnode", "mfree",
1692     "bootid",
1693     "ctotal",
1694     )
1695
1696   _FIELDS_STATIC = utils.FieldSet(
1697     "name", "pinst_cnt", "sinst_cnt",
1698     "pinst_list", "sinst_list",
1699     "pip", "sip", "tags",
1700     "serial_no",
1701     "master_candidate",
1702     "master",
1703     "offline",
1704     )
1705
1706   def ExpandNames(self):
1707     _CheckOutputFields(static=self._FIELDS_STATIC,
1708                        dynamic=self._FIELDS_DYNAMIC,
1709                        selected=self.op.output_fields)
1710
1711     self.needed_locks = {}
1712     self.share_locks[locking.LEVEL_NODE] = 1
1713
1714     if self.op.names:
1715       self.wanted = _GetWantedNodes(self, self.op.names)
1716     else:
1717       self.wanted = locking.ALL_SET
1718
1719     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1720     if self.do_locking:
1721       # if we don't request only static fields, we need to lock the nodes
1722       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1723
1724
1725   def CheckPrereq(self):
1726     """Check prerequisites.
1727
1728     """
1729     # The validation of the node list is done in the _GetWantedNodes,
1730     # if non empty, and if empty, there's no validation to do
1731     pass
1732
1733   def Exec(self, feedback_fn):
1734     """Computes the list of nodes and their attributes.
1735
1736     """
1737     all_info = self.cfg.GetAllNodesInfo()
1738     if self.do_locking:
1739       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1740     elif self.wanted != locking.ALL_SET:
1741       nodenames = self.wanted
1742       missing = set(nodenames).difference(all_info.keys())
1743       if missing:
1744         raise errors.OpExecError(
1745           "Some nodes were removed before retrieving their data: %s" % missing)
1746     else:
1747       nodenames = all_info.keys()
1748
1749     nodenames = utils.NiceSort(nodenames)
1750     nodelist = [all_info[name] for name in nodenames]
1751
1752     # begin data gathering
1753
1754     if self.do_locking:
1755       live_data = {}
1756       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1757                                           self.cfg.GetHypervisorType())
1758       for name in nodenames:
1759         nodeinfo = node_data[name]
1760         if not nodeinfo.failed and nodeinfo.data:
1761           nodeinfo = nodeinfo.data
1762           fn = utils.TryConvert
1763           live_data[name] = {
1764             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1765             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1766             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1767             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1768             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1769             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1770             "bootid": nodeinfo.get('bootid', None),
1771             }
1772         else:
1773           live_data[name] = {}
1774     else:
1775       live_data = dict.fromkeys(nodenames, {})
1776
1777     node_to_primary = dict([(name, set()) for name in nodenames])
1778     node_to_secondary = dict([(name, set()) for name in nodenames])
1779
1780     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1781                              "sinst_cnt", "sinst_list"))
1782     if inst_fields & frozenset(self.op.output_fields):
1783       instancelist = self.cfg.GetInstanceList()
1784
1785       for instance_name in instancelist:
1786         inst = self.cfg.GetInstanceInfo(instance_name)
1787         if inst.primary_node in node_to_primary:
1788           node_to_primary[inst.primary_node].add(inst.name)
1789         for secnode in inst.secondary_nodes:
1790           if secnode in node_to_secondary:
1791             node_to_secondary[secnode].add(inst.name)
1792
1793     master_node = self.cfg.GetMasterNode()
1794
1795     # end data gathering
1796
1797     output = []
1798     for node in nodelist:
1799       node_output = []
1800       for field in self.op.output_fields:
1801         if field == "name":
1802           val = node.name
1803         elif field == "pinst_list":
1804           val = list(node_to_primary[node.name])
1805         elif field == "sinst_list":
1806           val = list(node_to_secondary[node.name])
1807         elif field == "pinst_cnt":
1808           val = len(node_to_primary[node.name])
1809         elif field == "sinst_cnt":
1810           val = len(node_to_secondary[node.name])
1811         elif field == "pip":
1812           val = node.primary_ip
1813         elif field == "sip":
1814           val = node.secondary_ip
1815         elif field == "tags":
1816           val = list(node.GetTags())
1817         elif field == "serial_no":
1818           val = node.serial_no
1819         elif field == "master_candidate":
1820           val = node.master_candidate
1821         elif field == "master":
1822           val = node.name == master_node
1823         elif field == "offline":
1824           val = node.offline
1825         elif self._FIELDS_DYNAMIC.Matches(field):
1826           val = live_data[node.name].get(field, None)
1827         else:
1828           raise errors.ParameterError(field)
1829         node_output.append(val)
1830       output.append(node_output)
1831
1832     return output
1833
1834
1835 class LUQueryNodeVolumes(NoHooksLU):
1836   """Logical unit for getting volumes on node(s).
1837
1838   """
1839   _OP_REQP = ["nodes", "output_fields"]
1840   REQ_BGL = False
1841   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1842   _FIELDS_STATIC = utils.FieldSet("node")
1843
1844   def ExpandNames(self):
1845     _CheckOutputFields(static=self._FIELDS_STATIC,
1846                        dynamic=self._FIELDS_DYNAMIC,
1847                        selected=self.op.output_fields)
1848
1849     self.needed_locks = {}
1850     self.share_locks[locking.LEVEL_NODE] = 1
1851     if not self.op.nodes:
1852       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1853     else:
1854       self.needed_locks[locking.LEVEL_NODE] = \
1855         _GetWantedNodes(self, self.op.nodes)
1856
1857   def CheckPrereq(self):
1858     """Check prerequisites.
1859
1860     This checks that the fields required are valid output fields.
1861
1862     """
1863     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1864
1865   def Exec(self, feedback_fn):
1866     """Computes the list of nodes and their attributes.
1867
1868     """
1869     nodenames = self.nodes
1870     volumes = self.rpc.call_node_volumes(nodenames)
1871
1872     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1873              in self.cfg.GetInstanceList()]
1874
1875     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1876
1877     output = []
1878     for node in nodenames:
1879       if node not in volumes or volumes[node].failed or not volumes[node].data:
1880         continue
1881
1882       node_vols = volumes[node].data[:]
1883       node_vols.sort(key=lambda vol: vol['dev'])
1884
1885       for vol in node_vols:
1886         node_output = []
1887         for field in self.op.output_fields:
1888           if field == "node":
1889             val = node
1890           elif field == "phys":
1891             val = vol['dev']
1892           elif field == "vg":
1893             val = vol['vg']
1894           elif field == "name":
1895             val = vol['name']
1896           elif field == "size":
1897             val = int(float(vol['size']))
1898           elif field == "instance":
1899             for inst in ilist:
1900               if node not in lv_by_node[inst]:
1901                 continue
1902               if vol['name'] in lv_by_node[inst][node]:
1903                 val = inst.name
1904                 break
1905             else:
1906               val = '-'
1907           else:
1908             raise errors.ParameterError(field)
1909           node_output.append(str(val))
1910
1911         output.append(node_output)
1912
1913     return output
1914
1915
1916 class LUAddNode(LogicalUnit):
1917   """Logical unit for adding node to the cluster.
1918
1919   """
1920   HPATH = "node-add"
1921   HTYPE = constants.HTYPE_NODE
1922   _OP_REQP = ["node_name"]
1923
1924   def BuildHooksEnv(self):
1925     """Build hooks env.
1926
1927     This will run on all nodes before, and on all nodes + the new node after.
1928
1929     """
1930     env = {
1931       "OP_TARGET": self.op.node_name,
1932       "NODE_NAME": self.op.node_name,
1933       "NODE_PIP": self.op.primary_ip,
1934       "NODE_SIP": self.op.secondary_ip,
1935       }
1936     nodes_0 = self.cfg.GetNodeList()
1937     nodes_1 = nodes_0 + [self.op.node_name, ]
1938     return env, nodes_0, nodes_1
1939
1940   def CheckPrereq(self):
1941     """Check prerequisites.
1942
1943     This checks:
1944      - the new node is not already in the config
1945      - it is resolvable
1946      - its parameters (single/dual homed) matches the cluster
1947
1948     Any errors are signalled by raising errors.OpPrereqError.
1949
1950     """
1951     node_name = self.op.node_name
1952     cfg = self.cfg
1953
1954     dns_data = utils.HostInfo(node_name)
1955
1956     node = dns_data.name
1957     primary_ip = self.op.primary_ip = dns_data.ip
1958     secondary_ip = getattr(self.op, "secondary_ip", None)
1959     if secondary_ip is None:
1960       secondary_ip = primary_ip
1961     if not utils.IsValidIP(secondary_ip):
1962       raise errors.OpPrereqError("Invalid secondary IP given")
1963     self.op.secondary_ip = secondary_ip
1964
1965     node_list = cfg.GetNodeList()
1966     if not self.op.readd and node in node_list:
1967       raise errors.OpPrereqError("Node %s is already in the configuration" %
1968                                  node)
1969     elif self.op.readd and node not in node_list:
1970       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1971
1972     for existing_node_name in node_list:
1973       existing_node = cfg.GetNodeInfo(existing_node_name)
1974
1975       if self.op.readd and node == existing_node_name:
1976         if (existing_node.primary_ip != primary_ip or
1977             existing_node.secondary_ip != secondary_ip):
1978           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1979                                      " address configuration as before")
1980         continue
1981
1982       if (existing_node.primary_ip == primary_ip or
1983           existing_node.secondary_ip == primary_ip or
1984           existing_node.primary_ip == secondary_ip or
1985           existing_node.secondary_ip == secondary_ip):
1986         raise errors.OpPrereqError("New node ip address(es) conflict with"
1987                                    " existing node %s" % existing_node.name)
1988
1989     # check that the type of the node (single versus dual homed) is the
1990     # same as for the master
1991     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1992     master_singlehomed = myself.secondary_ip == myself.primary_ip
1993     newbie_singlehomed = secondary_ip == primary_ip
1994     if master_singlehomed != newbie_singlehomed:
1995       if master_singlehomed:
1996         raise errors.OpPrereqError("The master has no private ip but the"
1997                                    " new node has one")
1998       else:
1999         raise errors.OpPrereqError("The master has a private ip but the"
2000                                    " new node doesn't have one")
2001
2002     # checks reachablity
2003     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2004       raise errors.OpPrereqError("Node not reachable by ping")
2005
2006     if not newbie_singlehomed:
2007       # check reachability from my secondary ip to newbie's secondary ip
2008       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2009                            source=myself.secondary_ip):
2010         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2011                                    " based ping to noded port")
2012
2013     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2014     mc_now, _ = self.cfg.GetMasterCandidateStats()
2015     master_candidate = mc_now < cp_size
2016
2017     self.new_node = objects.Node(name=node,
2018                                  primary_ip=primary_ip,
2019                                  secondary_ip=secondary_ip,
2020                                  master_candidate=master_candidate,
2021                                  offline=False)
2022
2023   def Exec(self, feedback_fn):
2024     """Adds the new node to the cluster.
2025
2026     """
2027     new_node = self.new_node
2028     node = new_node.name
2029
2030     # check connectivity
2031     result = self.rpc.call_version([node])[node]
2032     result.Raise()
2033     if result.data:
2034       if constants.PROTOCOL_VERSION == result.data:
2035         logging.info("Communication to node %s fine, sw version %s match",
2036                      node, result.data)
2037       else:
2038         raise errors.OpExecError("Version mismatch master version %s,"
2039                                  " node version %s" %
2040                                  (constants.PROTOCOL_VERSION, result.data))
2041     else:
2042       raise errors.OpExecError("Cannot get version from the new node")
2043
2044     # setup ssh on node
2045     logging.info("Copy ssh key to node %s", node)
2046     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2047     keyarray = []
2048     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2049                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2050                 priv_key, pub_key]
2051
2052     for i in keyfiles:
2053       f = open(i, 'r')
2054       try:
2055         keyarray.append(f.read())
2056       finally:
2057         f.close()
2058
2059     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2060                                     keyarray[2],
2061                                     keyarray[3], keyarray[4], keyarray[5])
2062
2063     if result.failed or not result.data:
2064       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2065
2066     # Add node to our /etc/hosts, and add key to known_hosts
2067     utils.AddHostToEtcHosts(new_node.name)
2068
2069     if new_node.secondary_ip != new_node.primary_ip:
2070       result = self.rpc.call_node_has_ip_address(new_node.name,
2071                                                  new_node.secondary_ip)
2072       if result.failed or not result.data:
2073         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2074                                  " you gave (%s). Please fix and re-run this"
2075                                  " command." % new_node.secondary_ip)
2076
2077     node_verify_list = [self.cfg.GetMasterNode()]
2078     node_verify_param = {
2079       'nodelist': [node],
2080       # TODO: do a node-net-test as well?
2081     }
2082
2083     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2084                                        self.cfg.GetClusterName())
2085     for verifier in node_verify_list:
2086       if result[verifier].failed or not result[verifier].data:
2087         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2088                                  " for remote verification" % verifier)
2089       if result[verifier].data['nodelist']:
2090         for failed in result[verifier].data['nodelist']:
2091           feedback_fn("ssh/hostname verification failed %s -> %s" %
2092                       (verifier, result[verifier]['nodelist'][failed]))
2093         raise errors.OpExecError("ssh/hostname verification failed.")
2094
2095     # Distribute updated /etc/hosts and known_hosts to all nodes,
2096     # including the node just added
2097     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2098     dist_nodes = self.cfg.GetNodeList()
2099     if not self.op.readd:
2100       dist_nodes.append(node)
2101     if myself.name in dist_nodes:
2102       dist_nodes.remove(myself.name)
2103
2104     logging.debug("Copying hosts and known_hosts to all nodes")
2105     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2106       result = self.rpc.call_upload_file(dist_nodes, fname)
2107       for to_node, to_result in result.iteritems():
2108         if to_result.failed or not to_result.data:
2109           logging.error("Copy of file %s to node %s failed", fname, to_node)
2110
2111     to_copy = []
2112     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2113       to_copy.append(constants.VNC_PASSWORD_FILE)
2114     for fname in to_copy:
2115       result = self.rpc.call_upload_file([node], fname)
2116       if result[node].failed or not result[node]:
2117         logging.error("Could not copy file %s to node %s", fname, node)
2118
2119     if self.op.readd:
2120       self.context.ReaddNode(new_node)
2121     else:
2122       self.context.AddNode(new_node)
2123
2124
2125 class LUSetNodeParams(LogicalUnit):
2126   """Modifies the parameters of a node.
2127
2128   """
2129   HPATH = "node-modify"
2130   HTYPE = constants.HTYPE_NODE
2131   _OP_REQP = ["node_name"]
2132   REQ_BGL = False
2133
2134   def CheckArguments(self):
2135     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2136     if node_name is None:
2137       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2138     self.op.node_name = node_name
2139     _CheckBooleanOpField(self.op, 'master_candidate')
2140     _CheckBooleanOpField(self.op, 'offline')
2141     if self.op.master_candidate is None and self.op.offline is None:
2142       raise errors.OpPrereqError("Please pass at least one modification")
2143     if self.op.offline == True and self.op.master_candidate == True:
2144       raise errors.OpPrereqError("Can't set the node into offline and"
2145                                  " master_candidate at the same time")
2146
2147   def ExpandNames(self):
2148     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2149
2150   def BuildHooksEnv(self):
2151     """Build hooks env.
2152
2153     This runs on the master node.
2154
2155     """
2156     env = {
2157       "OP_TARGET": self.op.node_name,
2158       "MASTER_CANDIDATE": str(self.op.master_candidate),
2159       "OFFLINE": str(self.op.offline),
2160       }
2161     nl = [self.cfg.GetMasterNode(),
2162           self.op.node_name]
2163     return env, nl, nl
2164
2165   def CheckPrereq(self):
2166     """Check prerequisites.
2167
2168     This only checks the instance list against the existing names.
2169
2170     """
2171     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2172
2173     if ((self.op.master_candidate == False or self.op.offline == True)
2174         and node.master_candidate):
2175       # we will demote the node from master_candidate
2176       if self.op.node_name == self.cfg.GetMasterNode():
2177         raise errors.OpPrereqError("The master node has to be a"
2178                                    " master candidate and online")
2179       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2180       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2181       if num_candidates <= cp_size:
2182         msg = ("Not enough master candidates (desired"
2183                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2184         if self.op.force:
2185           self.LogWarning(msg)
2186         else:
2187           raise errors.OpPrereqError(msg)
2188
2189     if (self.op.master_candidate == True and node.offline and
2190         not self.op.offline == False):
2191       raise errors.OpPrereqError("Can't set an offline node to"
2192                                  " master_candidate")
2193
2194     return
2195
2196   def Exec(self, feedback_fn):
2197     """Modifies a node.
2198
2199     """
2200     node = self.node
2201
2202     result = []
2203
2204     if self.op.offline is not None:
2205       node.offline = self.op.offline
2206       result.append(("offline", str(self.op.offline)))
2207       if self.op.offline == True and node.master_candidate:
2208         node.master_candidate = False
2209         result.append(("master_candidate", "auto-demotion due to offline"))
2210
2211     if self.op.master_candidate is not None:
2212       node.master_candidate = self.op.master_candidate
2213       result.append(("master_candidate", str(self.op.master_candidate)))
2214       if self.op.master_candidate == False:
2215         rrc = self.rpc.call_node_demote_from_mc(node.name)
2216         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2217             or len(rrc.data) != 2):
2218           self.LogWarning("Node rpc error: %s" % rrc.error)
2219         elif not rrc.data[0]:
2220           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2221
2222     # this will trigger configuration file update, if needed
2223     self.cfg.Update(node)
2224     # this will trigger job queue propagation or cleanup
2225     if self.op.node_name != self.cfg.GetMasterNode():
2226       self.context.ReaddNode(node)
2227
2228     return result
2229
2230
2231 class LUQueryClusterInfo(NoHooksLU):
2232   """Query cluster configuration.
2233
2234   """
2235   _OP_REQP = []
2236   REQ_BGL = False
2237
2238   def ExpandNames(self):
2239     self.needed_locks = {}
2240
2241   def CheckPrereq(self):
2242     """No prerequsites needed for this LU.
2243
2244     """
2245     pass
2246
2247   def Exec(self, feedback_fn):
2248     """Return cluster config.
2249
2250     """
2251     cluster = self.cfg.GetClusterInfo()
2252     result = {
2253       "software_version": constants.RELEASE_VERSION,
2254       "protocol_version": constants.PROTOCOL_VERSION,
2255       "config_version": constants.CONFIG_VERSION,
2256       "os_api_version": constants.OS_API_VERSION,
2257       "export_version": constants.EXPORT_VERSION,
2258       "architecture": (platform.architecture()[0], platform.machine()),
2259       "name": cluster.cluster_name,
2260       "master": cluster.master_node,
2261       "default_hypervisor": cluster.default_hypervisor,
2262       "enabled_hypervisors": cluster.enabled_hypervisors,
2263       "hvparams": cluster.hvparams,
2264       "beparams": cluster.beparams,
2265       "candidate_pool_size": cluster.candidate_pool_size,
2266       }
2267
2268     return result
2269
2270
2271 class LUQueryConfigValues(NoHooksLU):
2272   """Return configuration values.
2273
2274   """
2275   _OP_REQP = []
2276   REQ_BGL = False
2277   _FIELDS_DYNAMIC = utils.FieldSet()
2278   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2279
2280   def ExpandNames(self):
2281     self.needed_locks = {}
2282
2283     _CheckOutputFields(static=self._FIELDS_STATIC,
2284                        dynamic=self._FIELDS_DYNAMIC,
2285                        selected=self.op.output_fields)
2286
2287   def CheckPrereq(self):
2288     """No prerequisites.
2289
2290     """
2291     pass
2292
2293   def Exec(self, feedback_fn):
2294     """Dump a representation of the cluster config to the standard output.
2295
2296     """
2297     values = []
2298     for field in self.op.output_fields:
2299       if field == "cluster_name":
2300         entry = self.cfg.GetClusterName()
2301       elif field == "master_node":
2302         entry = self.cfg.GetMasterNode()
2303       elif field == "drain_flag":
2304         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2305       else:
2306         raise errors.ParameterError(field)
2307       values.append(entry)
2308     return values
2309
2310
2311 class LUActivateInstanceDisks(NoHooksLU):
2312   """Bring up an instance's disks.
2313
2314   """
2315   _OP_REQP = ["instance_name"]
2316   REQ_BGL = False
2317
2318   def ExpandNames(self):
2319     self._ExpandAndLockInstance()
2320     self.needed_locks[locking.LEVEL_NODE] = []
2321     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2322
2323   def DeclareLocks(self, level):
2324     if level == locking.LEVEL_NODE:
2325       self._LockInstancesNodes()
2326
2327   def CheckPrereq(self):
2328     """Check prerequisites.
2329
2330     This checks that the instance is in the cluster.
2331
2332     """
2333     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2334     assert self.instance is not None, \
2335       "Cannot retrieve locked instance %s" % self.op.instance_name
2336     _CheckNodeOnline(self, self.instance.primary_node)
2337
2338   def Exec(self, feedback_fn):
2339     """Activate the disks.
2340
2341     """
2342     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2343     if not disks_ok:
2344       raise errors.OpExecError("Cannot activate block devices")
2345
2346     return disks_info
2347
2348
2349 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2350   """Prepare the block devices for an instance.
2351
2352   This sets up the block devices on all nodes.
2353
2354   @type lu: L{LogicalUnit}
2355   @param lu: the logical unit on whose behalf we execute
2356   @type instance: L{objects.Instance}
2357   @param instance: the instance for whose disks we assemble
2358   @type ignore_secondaries: boolean
2359   @param ignore_secondaries: if true, errors on secondary nodes
2360       won't result in an error return from the function
2361   @return: False if the operation failed, otherwise a list of
2362       (host, instance_visible_name, node_visible_name)
2363       with the mapping from node devices to instance devices
2364
2365   """
2366   device_info = []
2367   disks_ok = True
2368   iname = instance.name
2369   # With the two passes mechanism we try to reduce the window of
2370   # opportunity for the race condition of switching DRBD to primary
2371   # before handshaking occured, but we do not eliminate it
2372
2373   # The proper fix would be to wait (with some limits) until the
2374   # connection has been made and drbd transitions from WFConnection
2375   # into any other network-connected state (Connected, SyncTarget,
2376   # SyncSource, etc.)
2377
2378   # 1st pass, assemble on all nodes in secondary mode
2379   for inst_disk in instance.disks:
2380     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2381       lu.cfg.SetDiskID(node_disk, node)
2382       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2383       if result.failed or not result:
2384         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2385                            " (is_primary=False, pass=1)",
2386                            inst_disk.iv_name, node)
2387         if not ignore_secondaries:
2388           disks_ok = False
2389
2390   # FIXME: race condition on drbd migration to primary
2391
2392   # 2nd pass, do only the primary node
2393   for inst_disk in instance.disks:
2394     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2395       if node != instance.primary_node:
2396         continue
2397       lu.cfg.SetDiskID(node_disk, node)
2398       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2399       if result.failed or not result:
2400         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2401                            " (is_primary=True, pass=2)",
2402                            inst_disk.iv_name, node)
2403         disks_ok = False
2404     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2405
2406   # leave the disks configured for the primary node
2407   # this is a workaround that would be fixed better by
2408   # improving the logical/physical id handling
2409   for disk in instance.disks:
2410     lu.cfg.SetDiskID(disk, instance.primary_node)
2411
2412   return disks_ok, device_info
2413
2414
2415 def _StartInstanceDisks(lu, instance, force):
2416   """Start the disks of an instance.
2417
2418   """
2419   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2420                                            ignore_secondaries=force)
2421   if not disks_ok:
2422     _ShutdownInstanceDisks(lu, instance)
2423     if force is not None and not force:
2424       lu.proc.LogWarning("", hint="If the message above refers to a"
2425                          " secondary node,"
2426                          " you can retry the operation using '--force'.")
2427     raise errors.OpExecError("Disk consistency error")
2428
2429
2430 class LUDeactivateInstanceDisks(NoHooksLU):
2431   """Shutdown an instance's disks.
2432
2433   """
2434   _OP_REQP = ["instance_name"]
2435   REQ_BGL = False
2436
2437   def ExpandNames(self):
2438     self._ExpandAndLockInstance()
2439     self.needed_locks[locking.LEVEL_NODE] = []
2440     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2441
2442   def DeclareLocks(self, level):
2443     if level == locking.LEVEL_NODE:
2444       self._LockInstancesNodes()
2445
2446   def CheckPrereq(self):
2447     """Check prerequisites.
2448
2449     This checks that the instance is in the cluster.
2450
2451     """
2452     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2453     assert self.instance is not None, \
2454       "Cannot retrieve locked instance %s" % self.op.instance_name
2455
2456   def Exec(self, feedback_fn):
2457     """Deactivate the disks
2458
2459     """
2460     instance = self.instance
2461     _SafeShutdownInstanceDisks(self, instance)
2462
2463
2464 def _SafeShutdownInstanceDisks(lu, instance):
2465   """Shutdown block devices of an instance.
2466
2467   This function checks if an instance is running, before calling
2468   _ShutdownInstanceDisks.
2469
2470   """
2471   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2472                                       [instance.hypervisor])
2473   ins_l = ins_l[instance.primary_node]
2474   if ins_l.failed or not isinstance(ins_l.data, list):
2475     raise errors.OpExecError("Can't contact node '%s'" %
2476                              instance.primary_node)
2477
2478   if instance.name in ins_l.data:
2479     raise errors.OpExecError("Instance is running, can't shutdown"
2480                              " block devices.")
2481
2482   _ShutdownInstanceDisks(lu, instance)
2483
2484
2485 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2486   """Shutdown block devices of an instance.
2487
2488   This does the shutdown on all nodes of the instance.
2489
2490   If the ignore_primary is false, errors on the primary node are
2491   ignored.
2492
2493   """
2494   result = True
2495   for disk in instance.disks:
2496     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2497       lu.cfg.SetDiskID(top_disk, node)
2498       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2499       if result.failed or not result.data:
2500         logging.error("Could not shutdown block device %s on node %s",
2501                       disk.iv_name, node)
2502         if not ignore_primary or node != instance.primary_node:
2503           result = False
2504   return result
2505
2506
2507 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2508   """Checks if a node has enough free memory.
2509
2510   This function check if a given node has the needed amount of free
2511   memory. In case the node has less memory or we cannot get the
2512   information from the node, this function raise an OpPrereqError
2513   exception.
2514
2515   @type lu: C{LogicalUnit}
2516   @param lu: a logical unit from which we get configuration data
2517   @type node: C{str}
2518   @param node: the node to check
2519   @type reason: C{str}
2520   @param reason: string to use in the error message
2521   @type requested: C{int}
2522   @param requested: the amount of memory in MiB to check for
2523   @type hypervisor_name: C{str}
2524   @param hypervisor_name: the hypervisor to ask for memory stats
2525   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2526       we cannot check the node
2527
2528   """
2529   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2530   nodeinfo[node].Raise()
2531   free_mem = nodeinfo[node].data.get('memory_free')
2532   if not isinstance(free_mem, int):
2533     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2534                              " was '%s'" % (node, free_mem))
2535   if requested > free_mem:
2536     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2537                              " needed %s MiB, available %s MiB" %
2538                              (node, reason, requested, free_mem))
2539
2540
2541 class LUStartupInstance(LogicalUnit):
2542   """Starts an instance.
2543
2544   """
2545   HPATH = "instance-start"
2546   HTYPE = constants.HTYPE_INSTANCE
2547   _OP_REQP = ["instance_name", "force"]
2548   REQ_BGL = False
2549
2550   def ExpandNames(self):
2551     self._ExpandAndLockInstance()
2552
2553   def BuildHooksEnv(self):
2554     """Build hooks env.
2555
2556     This runs on master, primary and secondary nodes of the instance.
2557
2558     """
2559     env = {
2560       "FORCE": self.op.force,
2561       }
2562     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2563     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2564           list(self.instance.secondary_nodes))
2565     return env, nl, nl
2566
2567   def CheckPrereq(self):
2568     """Check prerequisites.
2569
2570     This checks that the instance is in the cluster.
2571
2572     """
2573     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2574     assert self.instance is not None, \
2575       "Cannot retrieve locked instance %s" % self.op.instance_name
2576
2577     _CheckNodeOnline(self, instance.primary_node)
2578
2579     bep = self.cfg.GetClusterInfo().FillBE(instance)
2580     # check bridges existance
2581     _CheckInstanceBridgesExist(self, instance)
2582
2583     _CheckNodeFreeMemory(self, instance.primary_node,
2584                          "starting instance %s" % instance.name,
2585                          bep[constants.BE_MEMORY], instance.hypervisor)
2586
2587   def Exec(self, feedback_fn):
2588     """Start the instance.
2589
2590     """
2591     instance = self.instance
2592     force = self.op.force
2593     extra_args = getattr(self.op, "extra_args", "")
2594
2595     self.cfg.MarkInstanceUp(instance.name)
2596
2597     node_current = instance.primary_node
2598
2599     _StartInstanceDisks(self, instance, force)
2600
2601     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2602     if result.failed or not result.data:
2603       _ShutdownInstanceDisks(self, instance)
2604       raise errors.OpExecError("Could not start instance")
2605
2606
2607 class LURebootInstance(LogicalUnit):
2608   """Reboot an instance.
2609
2610   """
2611   HPATH = "instance-reboot"
2612   HTYPE = constants.HTYPE_INSTANCE
2613   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2614   REQ_BGL = False
2615
2616   def ExpandNames(self):
2617     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2618                                    constants.INSTANCE_REBOOT_HARD,
2619                                    constants.INSTANCE_REBOOT_FULL]:
2620       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2621                                   (constants.INSTANCE_REBOOT_SOFT,
2622                                    constants.INSTANCE_REBOOT_HARD,
2623                                    constants.INSTANCE_REBOOT_FULL))
2624     self._ExpandAndLockInstance()
2625
2626   def BuildHooksEnv(self):
2627     """Build hooks env.
2628
2629     This runs on master, primary and secondary nodes of the instance.
2630
2631     """
2632     env = {
2633       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2634       }
2635     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2636     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2637           list(self.instance.secondary_nodes))
2638     return env, nl, nl
2639
2640   def CheckPrereq(self):
2641     """Check prerequisites.
2642
2643     This checks that the instance is in the cluster.
2644
2645     """
2646     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2647     assert self.instance is not None, \
2648       "Cannot retrieve locked instance %s" % self.op.instance_name
2649
2650     _CheckNodeOnline(self, instance.primary_node)
2651
2652     # check bridges existance
2653     _CheckInstanceBridgesExist(self, instance)
2654
2655   def Exec(self, feedback_fn):
2656     """Reboot the instance.
2657
2658     """
2659     instance = self.instance
2660     ignore_secondaries = self.op.ignore_secondaries
2661     reboot_type = self.op.reboot_type
2662     extra_args = getattr(self.op, "extra_args", "")
2663
2664     node_current = instance.primary_node
2665
2666     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2667                        constants.INSTANCE_REBOOT_HARD]:
2668       result = self.rpc.call_instance_reboot(node_current, instance,
2669                                              reboot_type, extra_args)
2670       if result.failed or not result.data:
2671         raise errors.OpExecError("Could not reboot instance")
2672     else:
2673       if not self.rpc.call_instance_shutdown(node_current, instance):
2674         raise errors.OpExecError("could not shutdown instance for full reboot")
2675       _ShutdownInstanceDisks(self, instance)
2676       _StartInstanceDisks(self, instance, ignore_secondaries)
2677       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2678       if result.failed or not result.data:
2679         _ShutdownInstanceDisks(self, instance)
2680         raise errors.OpExecError("Could not start instance for full reboot")
2681
2682     self.cfg.MarkInstanceUp(instance.name)
2683
2684
2685 class LUShutdownInstance(LogicalUnit):
2686   """Shutdown an instance.
2687
2688   """
2689   HPATH = "instance-stop"
2690   HTYPE = constants.HTYPE_INSTANCE
2691   _OP_REQP = ["instance_name"]
2692   REQ_BGL = False
2693
2694   def ExpandNames(self):
2695     self._ExpandAndLockInstance()
2696
2697   def BuildHooksEnv(self):
2698     """Build hooks env.
2699
2700     This runs on master, primary and secondary nodes of the instance.
2701
2702     """
2703     env = _BuildInstanceHookEnvByObject(self, self.instance)
2704     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2705           list(self.instance.secondary_nodes))
2706     return env, nl, nl
2707
2708   def CheckPrereq(self):
2709     """Check prerequisites.
2710
2711     This checks that the instance is in the cluster.
2712
2713     """
2714     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2715     assert self.instance is not None, \
2716       "Cannot retrieve locked instance %s" % self.op.instance_name
2717     _CheckNodeOnline(self, self.instance.primary_node)
2718
2719   def Exec(self, feedback_fn):
2720     """Shutdown the instance.
2721
2722     """
2723     instance = self.instance
2724     node_current = instance.primary_node
2725     self.cfg.MarkInstanceDown(instance.name)
2726     result = self.rpc.call_instance_shutdown(node_current, instance)
2727     if result.failed or not result.data:
2728       self.proc.LogWarning("Could not shutdown instance")
2729
2730     _ShutdownInstanceDisks(self, instance)
2731
2732
2733 class LUReinstallInstance(LogicalUnit):
2734   """Reinstall an instance.
2735
2736   """
2737   HPATH = "instance-reinstall"
2738   HTYPE = constants.HTYPE_INSTANCE
2739   _OP_REQP = ["instance_name"]
2740   REQ_BGL = False
2741
2742   def ExpandNames(self):
2743     self._ExpandAndLockInstance()
2744
2745   def BuildHooksEnv(self):
2746     """Build hooks env.
2747
2748     This runs on master, primary and secondary nodes of the instance.
2749
2750     """
2751     env = _BuildInstanceHookEnvByObject(self, self.instance)
2752     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2753           list(self.instance.secondary_nodes))
2754     return env, nl, nl
2755
2756   def CheckPrereq(self):
2757     """Check prerequisites.
2758
2759     This checks that the instance is in the cluster and is not running.
2760
2761     """
2762     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2763     assert instance is not None, \
2764       "Cannot retrieve locked instance %s" % self.op.instance_name
2765     _CheckNodeOnline(self, instance.primary_node)
2766
2767     if instance.disk_template == constants.DT_DISKLESS:
2768       raise errors.OpPrereqError("Instance '%s' has no disks" %
2769                                  self.op.instance_name)
2770     if instance.status != "down":
2771       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2772                                  self.op.instance_name)
2773     remote_info = self.rpc.call_instance_info(instance.primary_node,
2774                                               instance.name,
2775                                               instance.hypervisor)
2776     if remote_info.failed or remote_info.data:
2777       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2778                                  (self.op.instance_name,
2779                                   instance.primary_node))
2780
2781     self.op.os_type = getattr(self.op, "os_type", None)
2782     if self.op.os_type is not None:
2783       # OS verification
2784       pnode = self.cfg.GetNodeInfo(
2785         self.cfg.ExpandNodeName(instance.primary_node))
2786       if pnode is None:
2787         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2788                                    self.op.pnode)
2789       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2790       result.Raise()
2791       if not isinstance(result.data, objects.OS):
2792         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2793                                    " primary node"  % self.op.os_type)
2794
2795     self.instance = instance
2796
2797   def Exec(self, feedback_fn):
2798     """Reinstall the instance.
2799
2800     """
2801     inst = self.instance
2802
2803     if self.op.os_type is not None:
2804       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2805       inst.os = self.op.os_type
2806       self.cfg.Update(inst)
2807
2808     _StartInstanceDisks(self, inst, None)
2809     try:
2810       feedback_fn("Running the instance OS create scripts...")
2811       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2812       result.Raise()
2813       if not result.data:
2814         raise errors.OpExecError("Could not install OS for instance %s"
2815                                  " on node %s" %
2816                                  (inst.name, inst.primary_node))
2817     finally:
2818       _ShutdownInstanceDisks(self, inst)
2819
2820
2821 class LURenameInstance(LogicalUnit):
2822   """Rename an instance.
2823
2824   """
2825   HPATH = "instance-rename"
2826   HTYPE = constants.HTYPE_INSTANCE
2827   _OP_REQP = ["instance_name", "new_name"]
2828
2829   def BuildHooksEnv(self):
2830     """Build hooks env.
2831
2832     This runs on master, primary and secondary nodes of the instance.
2833
2834     """
2835     env = _BuildInstanceHookEnvByObject(self, self.instance)
2836     env["INSTANCE_NEW_NAME"] = self.op.new_name
2837     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2838           list(self.instance.secondary_nodes))
2839     return env, nl, nl
2840
2841   def CheckPrereq(self):
2842     """Check prerequisites.
2843
2844     This checks that the instance is in the cluster and is not running.
2845
2846     """
2847     instance = self.cfg.GetInstanceInfo(
2848       self.cfg.ExpandInstanceName(self.op.instance_name))
2849     if instance is None:
2850       raise errors.OpPrereqError("Instance '%s' not known" %
2851                                  self.op.instance_name)
2852     _CheckNodeOnline(self, instance.primary_node)
2853
2854     if instance.status != "down":
2855       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2856                                  self.op.instance_name)
2857     remote_info = self.rpc.call_instance_info(instance.primary_node,
2858                                               instance.name,
2859                                               instance.hypervisor)
2860     remote_info.Raise()
2861     if remote_info.data:
2862       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2863                                  (self.op.instance_name,
2864                                   instance.primary_node))
2865     self.instance = instance
2866
2867     # new name verification
2868     name_info = utils.HostInfo(self.op.new_name)
2869
2870     self.op.new_name = new_name = name_info.name
2871     instance_list = self.cfg.GetInstanceList()
2872     if new_name in instance_list:
2873       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2874                                  new_name)
2875
2876     if not getattr(self.op, "ignore_ip", False):
2877       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2878         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2879                                    (name_info.ip, new_name))
2880
2881
2882   def Exec(self, feedback_fn):
2883     """Reinstall the instance.
2884
2885     """
2886     inst = self.instance
2887     old_name = inst.name
2888
2889     if inst.disk_template == constants.DT_FILE:
2890       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2891
2892     self.cfg.RenameInstance(inst.name, self.op.new_name)
2893     # Change the instance lock. This is definitely safe while we hold the BGL
2894     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2895     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2896
2897     # re-read the instance from the configuration after rename
2898     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2899
2900     if inst.disk_template == constants.DT_FILE:
2901       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2902       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2903                                                      old_file_storage_dir,
2904                                                      new_file_storage_dir)
2905       result.Raise()
2906       if not result.data:
2907         raise errors.OpExecError("Could not connect to node '%s' to rename"
2908                                  " directory '%s' to '%s' (but the instance"
2909                                  " has been renamed in Ganeti)" % (
2910                                  inst.primary_node, old_file_storage_dir,
2911                                  new_file_storage_dir))
2912
2913       if not result.data[0]:
2914         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2915                                  " (but the instance has been renamed in"
2916                                  " Ganeti)" % (old_file_storage_dir,
2917                                                new_file_storage_dir))
2918
2919     _StartInstanceDisks(self, inst, None)
2920     try:
2921       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2922                                                  old_name)
2923       if result.failed or not result.data:
2924         msg = ("Could not run OS rename script for instance %s on node %s"
2925                " (but the instance has been renamed in Ganeti)" %
2926                (inst.name, inst.primary_node))
2927         self.proc.LogWarning(msg)
2928     finally:
2929       _ShutdownInstanceDisks(self, inst)
2930
2931
2932 class LURemoveInstance(LogicalUnit):
2933   """Remove an instance.
2934
2935   """
2936   HPATH = "instance-remove"
2937   HTYPE = constants.HTYPE_INSTANCE
2938   _OP_REQP = ["instance_name", "ignore_failures"]
2939   REQ_BGL = False
2940
2941   def ExpandNames(self):
2942     self._ExpandAndLockInstance()
2943     self.needed_locks[locking.LEVEL_NODE] = []
2944     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2945
2946   def DeclareLocks(self, level):
2947     if level == locking.LEVEL_NODE:
2948       self._LockInstancesNodes()
2949
2950   def BuildHooksEnv(self):
2951     """Build hooks env.
2952
2953     This runs on master, primary and secondary nodes of the instance.
2954
2955     """
2956     env = _BuildInstanceHookEnvByObject(self, self.instance)
2957     nl = [self.cfg.GetMasterNode()]
2958     return env, nl, nl
2959
2960   def CheckPrereq(self):
2961     """Check prerequisites.
2962
2963     This checks that the instance is in the cluster.
2964
2965     """
2966     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2967     assert self.instance is not None, \
2968       "Cannot retrieve locked instance %s" % self.op.instance_name
2969
2970   def Exec(self, feedback_fn):
2971     """Remove the instance.
2972
2973     """
2974     instance = self.instance
2975     logging.info("Shutting down instance %s on node %s",
2976                  instance.name, instance.primary_node)
2977
2978     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2979     if result.failed or not result.data:
2980       if self.op.ignore_failures:
2981         feedback_fn("Warning: can't shutdown instance")
2982       else:
2983         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2984                                  (instance.name, instance.primary_node))
2985
2986     logging.info("Removing block devices for instance %s", instance.name)
2987
2988     if not _RemoveDisks(self, instance):
2989       if self.op.ignore_failures:
2990         feedback_fn("Warning: can't remove instance's disks")
2991       else:
2992         raise errors.OpExecError("Can't remove instance's disks")
2993
2994     logging.info("Removing instance %s out of cluster config", instance.name)
2995
2996     self.cfg.RemoveInstance(instance.name)
2997     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2998
2999
3000 class LUQueryInstances(NoHooksLU):
3001   """Logical unit for querying instances.
3002
3003   """
3004   _OP_REQP = ["output_fields", "names"]
3005   REQ_BGL = False
3006   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3007                                     "admin_state", "admin_ram",
3008                                     "disk_template", "ip", "mac", "bridge",
3009                                     "sda_size", "sdb_size", "vcpus", "tags",
3010                                     "network_port", "beparams",
3011                                     "(disk).(size)/([0-9]+)",
3012                                     "(disk).(sizes)",
3013                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3014                                     "(nic).(macs|ips|bridges)",
3015                                     "(disk|nic).(count)",
3016                                     "serial_no", "hypervisor", "hvparams",] +
3017                                   ["hv/%s" % name
3018                                    for name in constants.HVS_PARAMETERS] +
3019                                   ["be/%s" % name
3020                                    for name in constants.BES_PARAMETERS])
3021   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3022
3023
3024   def ExpandNames(self):
3025     _CheckOutputFields(static=self._FIELDS_STATIC,
3026                        dynamic=self._FIELDS_DYNAMIC,
3027                        selected=self.op.output_fields)
3028
3029     self.needed_locks = {}
3030     self.share_locks[locking.LEVEL_INSTANCE] = 1
3031     self.share_locks[locking.LEVEL_NODE] = 1
3032
3033     if self.op.names:
3034       self.wanted = _GetWantedInstances(self, self.op.names)
3035     else:
3036       self.wanted = locking.ALL_SET
3037
3038     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3039     if self.do_locking:
3040       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3041       self.needed_locks[locking.LEVEL_NODE] = []
3042       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3043
3044   def DeclareLocks(self, level):
3045     if level == locking.LEVEL_NODE and self.do_locking:
3046       self._LockInstancesNodes()
3047
3048   def CheckPrereq(self):
3049     """Check prerequisites.
3050
3051     """
3052     pass
3053
3054   def Exec(self, feedback_fn):
3055     """Computes the list of nodes and their attributes.
3056
3057     """
3058     all_info = self.cfg.GetAllInstancesInfo()
3059     if self.do_locking:
3060       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3061     elif self.wanted != locking.ALL_SET:
3062       instance_names = self.wanted
3063       missing = set(instance_names).difference(all_info.keys())
3064       if missing:
3065         raise errors.OpExecError(
3066           "Some instances were removed before retrieving their data: %s"
3067           % missing)
3068     else:
3069       instance_names = all_info.keys()
3070
3071     instance_names = utils.NiceSort(instance_names)
3072     instance_list = [all_info[iname] for iname in instance_names]
3073
3074     # begin data gathering
3075
3076     nodes = frozenset([inst.primary_node for inst in instance_list])
3077     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3078
3079     bad_nodes = []
3080     off_nodes = []
3081     if self.do_locking:
3082       live_data = {}
3083       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3084       for name in nodes:
3085         result = node_data[name]
3086         if result.offline:
3087           # offline nodes will be in both lists
3088           off_nodes.append(name)
3089         if result.failed:
3090           bad_nodes.append(name)
3091         else:
3092           if result.data:
3093             live_data.update(result.data)
3094             # else no instance is alive
3095     else:
3096       live_data = dict([(name, {}) for name in instance_names])
3097
3098     # end data gathering
3099
3100     HVPREFIX = "hv/"
3101     BEPREFIX = "be/"
3102     output = []
3103     for instance in instance_list:
3104       iout = []
3105       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3106       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3107       for field in self.op.output_fields:
3108         st_match = self._FIELDS_STATIC.Matches(field)
3109         if field == "name":
3110           val = instance.name
3111         elif field == "os":
3112           val = instance.os
3113         elif field == "pnode":
3114           val = instance.primary_node
3115         elif field == "snodes":
3116           val = list(instance.secondary_nodes)
3117         elif field == "admin_state":
3118           val = (instance.status != "down")
3119         elif field == "oper_state":
3120           if instance.primary_node in bad_nodes:
3121             val = None
3122           else:
3123             val = bool(live_data.get(instance.name))
3124         elif field == "status":
3125           if instance.primary_node in off_nodes:
3126             val = "ERROR_nodeoffline"
3127           elif instance.primary_node in bad_nodes:
3128             val = "ERROR_nodedown"
3129           else:
3130             running = bool(live_data.get(instance.name))
3131             if running:
3132               if instance.status != "down":
3133                 val = "running"
3134               else:
3135                 val = "ERROR_up"
3136             else:
3137               if instance.status != "down":
3138                 val = "ERROR_down"
3139               else:
3140                 val = "ADMIN_down"
3141         elif field == "oper_ram":
3142           if instance.primary_node in bad_nodes:
3143             val = None
3144           elif instance.name in live_data:
3145             val = live_data[instance.name].get("memory", "?")
3146           else:
3147             val = "-"
3148         elif field == "disk_template":
3149           val = instance.disk_template
3150         elif field == "ip":
3151           val = instance.nics[0].ip
3152         elif field == "bridge":
3153           val = instance.nics[0].bridge
3154         elif field == "mac":
3155           val = instance.nics[0].mac
3156         elif field == "sda_size" or field == "sdb_size":
3157           idx = ord(field[2]) - ord('a')
3158           try:
3159             val = instance.FindDisk(idx).size
3160           except errors.OpPrereqError:
3161             val = None
3162         elif field == "tags":
3163           val = list(instance.GetTags())
3164         elif field == "serial_no":
3165           val = instance.serial_no
3166         elif field == "network_port":
3167           val = instance.network_port
3168         elif field == "hypervisor":
3169           val = instance.hypervisor
3170         elif field == "hvparams":
3171           val = i_hv
3172         elif (field.startswith(HVPREFIX) and
3173               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3174           val = i_hv.get(field[len(HVPREFIX):], None)
3175         elif field == "beparams":
3176           val = i_be
3177         elif (field.startswith(BEPREFIX) and
3178               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3179           val = i_be.get(field[len(BEPREFIX):], None)
3180         elif st_match and st_match.groups():
3181           # matches a variable list
3182           st_groups = st_match.groups()
3183           if st_groups and st_groups[0] == "disk":
3184             if st_groups[1] == "count":
3185               val = len(instance.disks)
3186             elif st_groups[1] == "sizes":
3187               val = [disk.size for disk in instance.disks]
3188             elif st_groups[1] == "size":
3189               try:
3190                 val = instance.FindDisk(st_groups[2]).size
3191               except errors.OpPrereqError:
3192                 val = None
3193             else:
3194               assert False, "Unhandled disk parameter"
3195           elif st_groups[0] == "nic":
3196             if st_groups[1] == "count":
3197               val = len(instance.nics)
3198             elif st_groups[1] == "macs":
3199               val = [nic.mac for nic in instance.nics]
3200             elif st_groups[1] == "ips":
3201               val = [nic.ip for nic in instance.nics]
3202             elif st_groups[1] == "bridges":
3203               val = [nic.bridge for nic in instance.nics]
3204             else:
3205               # index-based item
3206               nic_idx = int(st_groups[2])
3207               if nic_idx >= len(instance.nics):
3208                 val = None
3209               else:
3210                 if st_groups[1] == "mac":
3211                   val = instance.nics[nic_idx].mac
3212                 elif st_groups[1] == "ip":
3213                   val = instance.nics[nic_idx].ip
3214                 elif st_groups[1] == "bridge":
3215                   val = instance.nics[nic_idx].bridge
3216                 else:
3217                   assert False, "Unhandled NIC parameter"
3218           else:
3219             assert False, "Unhandled variable parameter"
3220         else:
3221           raise errors.ParameterError(field)
3222         iout.append(val)
3223       output.append(iout)
3224
3225     return output
3226
3227
3228 class LUFailoverInstance(LogicalUnit):
3229   """Failover an instance.
3230
3231   """
3232   HPATH = "instance-failover"
3233   HTYPE = constants.HTYPE_INSTANCE
3234   _OP_REQP = ["instance_name", "ignore_consistency"]
3235   REQ_BGL = False
3236
3237   def ExpandNames(self):
3238     self._ExpandAndLockInstance()
3239     self.needed_locks[locking.LEVEL_NODE] = []
3240     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3241
3242   def DeclareLocks(self, level):
3243     if level == locking.LEVEL_NODE:
3244       self._LockInstancesNodes()
3245
3246   def BuildHooksEnv(self):
3247     """Build hooks env.
3248
3249     This runs on master, primary and secondary nodes of the instance.
3250
3251     """
3252     env = {
3253       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3254       }
3255     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3256     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3257     return env, nl, nl
3258
3259   def CheckPrereq(self):
3260     """Check prerequisites.
3261
3262     This checks that the instance is in the cluster.
3263
3264     """
3265     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3266     assert self.instance is not None, \
3267       "Cannot retrieve locked instance %s" % self.op.instance_name
3268
3269     bep = self.cfg.GetClusterInfo().FillBE(instance)
3270     if instance.disk_template not in constants.DTS_NET_MIRROR:
3271       raise errors.OpPrereqError("Instance's disk layout is not"
3272                                  " network mirrored, cannot failover.")
3273
3274     secondary_nodes = instance.secondary_nodes
3275     if not secondary_nodes:
3276       raise errors.ProgrammerError("no secondary node but using "
3277                                    "a mirrored disk template")
3278
3279     target_node = secondary_nodes[0]
3280     _CheckNodeOnline(self, target_node)
3281     # check memory requirements on the secondary node
3282     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3283                          instance.name, bep[constants.BE_MEMORY],
3284                          instance.hypervisor)
3285
3286     # check bridge existance
3287     brlist = [nic.bridge for nic in instance.nics]
3288     result = self.rpc.call_bridges_exist(target_node, brlist)
3289     result.Raise()
3290     if not result.data:
3291       raise errors.OpPrereqError("One or more target bridges %s does not"
3292                                  " exist on destination node '%s'" %
3293                                  (brlist, target_node))
3294
3295   def Exec(self, feedback_fn):
3296     """Failover an instance.
3297
3298     The failover is done by shutting it down on its present node and
3299     starting it on the secondary.
3300
3301     """
3302     instance = self.instance
3303
3304     source_node = instance.primary_node
3305     target_node = instance.secondary_nodes[0]
3306
3307     feedback_fn("* checking disk consistency between source and target")
3308     for dev in instance.disks:
3309       # for drbd, these are drbd over lvm
3310       if not _CheckDiskConsistency(self, dev, target_node, False):
3311         if instance.status == "up" and not self.op.ignore_consistency:
3312           raise errors.OpExecError("Disk %s is degraded on target node,"
3313                                    " aborting failover." % dev.iv_name)
3314
3315     feedback_fn("* shutting down instance on source node")
3316     logging.info("Shutting down instance %s on node %s",
3317                  instance.name, source_node)
3318
3319     result = self.rpc.call_instance_shutdown(source_node, instance)
3320     if result.failed or not result.data:
3321       if self.op.ignore_consistency:
3322         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3323                              " Proceeding"
3324                              " anyway. Please make sure node %s is down",
3325                              instance.name, source_node, source_node)
3326       else:
3327         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3328                                  (instance.name, source_node))
3329
3330     feedback_fn("* deactivating the instance's disks on source node")
3331     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3332       raise errors.OpExecError("Can't shut down the instance's disks.")
3333
3334     instance.primary_node = target_node
3335     # distribute new instance config to the other nodes
3336     self.cfg.Update(instance)
3337
3338     # Only start the instance if it's marked as up
3339     if instance.status == "up":
3340       feedback_fn("* activating the instance's disks on target node")
3341       logging.info("Starting instance %s on node %s",
3342                    instance.name, target_node)
3343
3344       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3345                                                ignore_secondaries=True)
3346       if not disks_ok:
3347         _ShutdownInstanceDisks(self, instance)
3348         raise errors.OpExecError("Can't activate the instance's disks")
3349
3350       feedback_fn("* starting the instance on the target node")
3351       result = self.rpc.call_instance_start(target_node, instance, None)
3352       if result.failed or not result.data:
3353         _ShutdownInstanceDisks(self, instance)
3354         raise errors.OpExecError("Could not start instance %s on node %s." %
3355                                  (instance.name, target_node))
3356
3357
3358 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3359   """Create a tree of block devices on the primary node.
3360
3361   This always creates all devices.
3362
3363   """
3364   if device.children:
3365     for child in device.children:
3366       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3367         return False
3368
3369   lu.cfg.SetDiskID(device, node)
3370   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3371                                        instance.name, True, info)
3372   if new_id.failed or not new_id.data:
3373     return False
3374   if device.physical_id is None:
3375     device.physical_id = new_id
3376   return True
3377
3378
3379 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3380   """Create a tree of block devices on a secondary node.
3381
3382   If this device type has to be created on secondaries, create it and
3383   all its children.
3384
3385   If not, just recurse to children keeping the same 'force' value.
3386
3387   """
3388   if device.CreateOnSecondary():
3389     force = True
3390   if device.children:
3391     for child in device.children:
3392       if not _CreateBlockDevOnSecondary(lu, node, instance,
3393                                         child, force, info):
3394         return False
3395
3396   if not force:
3397     return True
3398   lu.cfg.SetDiskID(device, node)
3399   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3400                                        instance.name, False, info)
3401   if new_id.failed or not new_id.data:
3402     return False
3403   if device.physical_id is None:
3404     device.physical_id = new_id
3405   return True
3406
3407
3408 def _GenerateUniqueNames(lu, exts):
3409   """Generate a suitable LV name.
3410
3411   This will generate a logical volume name for the given instance.
3412
3413   """
3414   results = []
3415   for val in exts:
3416     new_id = lu.cfg.GenerateUniqueID()
3417     results.append("%s%s" % (new_id, val))
3418   return results
3419
3420
3421 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3422                          p_minor, s_minor):
3423   """Generate a drbd8 device complete with its children.
3424
3425   """
3426   port = lu.cfg.AllocatePort()
3427   vgname = lu.cfg.GetVGName()
3428   shared_secret = lu.cfg.GenerateDRBDSecret()
3429   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3430                           logical_id=(vgname, names[0]))
3431   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3432                           logical_id=(vgname, names[1]))
3433   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3434                           logical_id=(primary, secondary, port,
3435                                       p_minor, s_minor,
3436                                       shared_secret),
3437                           children=[dev_data, dev_meta],
3438                           iv_name=iv_name)
3439   return drbd_dev
3440
3441
3442 def _GenerateDiskTemplate(lu, template_name,
3443                           instance_name, primary_node,
3444                           secondary_nodes, disk_info,
3445                           file_storage_dir, file_driver,
3446                           base_index):
3447   """Generate the entire disk layout for a given template type.
3448
3449   """
3450   #TODO: compute space requirements
3451
3452   vgname = lu.cfg.GetVGName()
3453   disk_count = len(disk_info)
3454   disks = []
3455   if template_name == constants.DT_DISKLESS:
3456     pass
3457   elif template_name == constants.DT_PLAIN:
3458     if len(secondary_nodes) != 0:
3459       raise errors.ProgrammerError("Wrong template configuration")
3460
3461     names = _GenerateUniqueNames(lu, [".disk%d" % i
3462                                       for i in range(disk_count)])
3463     for idx, disk in enumerate(disk_info):
3464       disk_index = idx + base_index
3465       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3466                               logical_id=(vgname, names[idx]),
3467                               iv_name="disk/%d" % disk_index)
3468       disks.append(disk_dev)
3469   elif template_name == constants.DT_DRBD8:
3470     if len(secondary_nodes) != 1:
3471       raise errors.ProgrammerError("Wrong template configuration")
3472     remote_node = secondary_nodes[0]
3473     minors = lu.cfg.AllocateDRBDMinor(
3474       [primary_node, remote_node] * len(disk_info), instance_name)
3475
3476     names = _GenerateUniqueNames(lu,
3477                                  [".disk%d_%s" % (i, s)
3478                                   for i in range(disk_count)
3479                                   for s in ("data", "meta")
3480                                   ])
3481     for idx, disk in enumerate(disk_info):
3482       disk_index = idx + base_index
3483       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3484                                       disk["size"], names[idx*2:idx*2+2],
3485                                       "disk/%d" % disk_index,
3486                                       minors[idx*2], minors[idx*2+1])
3487       disks.append(disk_dev)
3488   elif template_name == constants.DT_FILE:
3489     if len(secondary_nodes) != 0:
3490       raise errors.ProgrammerError("Wrong template configuration")
3491
3492     for idx, disk in enumerate(disk_info):
3493       disk_index = idx + base_index
3494       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3495                               iv_name="disk/%d" % disk_index,
3496                               logical_id=(file_driver,
3497                                           "%s/disk%d" % (file_storage_dir,
3498                                                          idx)))
3499       disks.append(disk_dev)
3500   else:
3501     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3502   return disks
3503
3504
3505 def _GetInstanceInfoText(instance):
3506   """Compute that text that should be added to the disk's metadata.
3507
3508   """
3509   return "originstname+%s" % instance.name
3510
3511
3512 def _CreateDisks(lu, instance):
3513   """Create all disks for an instance.
3514
3515   This abstracts away some work from AddInstance.
3516
3517   @type lu: L{LogicalUnit}
3518   @param lu: the logical unit on whose behalf we execute
3519   @type instance: L{objects.Instance}
3520   @param instance: the instance whose disks we should create
3521   @rtype: boolean
3522   @return: the success of the creation
3523
3524   """
3525   info = _GetInstanceInfoText(instance)
3526
3527   if instance.disk_template == constants.DT_FILE:
3528     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3529     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3530                                                  file_storage_dir)
3531
3532     if result.failed or not result.data:
3533       logging.error("Could not connect to node '%s'", instance.primary_node)
3534       return False
3535
3536     if not result.data[0]:
3537       logging.error("Failed to create directory '%s'", file_storage_dir)
3538       return False
3539
3540   # Note: this needs to be kept in sync with adding of disks in
3541   # LUSetInstanceParams
3542   for device in instance.disks:
3543     logging.info("Creating volume %s for instance %s",
3544                  device.iv_name, instance.name)
3545     #HARDCODE
3546     for secondary_node in instance.secondary_nodes:
3547       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3548                                         device, False, info):
3549         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3550                       device.iv_name, device, secondary_node)
3551         return False
3552     #HARDCODE
3553     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3554                                     instance, device, info):
3555       logging.error("Failed to create volume %s on primary!", device.iv_name)
3556       return False
3557
3558   return True
3559
3560
3561 def _RemoveDisks(lu, instance):
3562   """Remove all disks for an instance.
3563
3564   This abstracts away some work from `AddInstance()` and
3565   `RemoveInstance()`. Note that in case some of the devices couldn't
3566   be removed, the removal will continue with the other ones (compare
3567   with `_CreateDisks()`).
3568
3569   @type lu: L{LogicalUnit}
3570   @param lu: the logical unit on whose behalf we execute
3571   @type instance: L{objects.Instance}
3572   @param instance: the instance whose disks we should remove
3573   @rtype: boolean
3574   @return: the success of the removal
3575
3576   """
3577   logging.info("Removing block devices for instance %s", instance.name)
3578
3579   result = True
3580   for device in instance.disks:
3581     for node, disk in device.ComputeNodeTree(instance.primary_node):
3582       lu.cfg.SetDiskID(disk, node)
3583       result = lu.rpc.call_blockdev_remove(node, disk)
3584       if result.failed or not result.data:
3585         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3586                            " continuing anyway", device.iv_name, node)
3587         result = False
3588
3589   if instance.disk_template == constants.DT_FILE:
3590     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3591     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3592                                                  file_storage_dir)
3593     if result.failed or not result.data:
3594       logging.error("Could not remove directory '%s'", file_storage_dir)
3595       result = False
3596
3597   return result
3598
3599
3600 def _ComputeDiskSize(disk_template, disks):
3601   """Compute disk size requirements in the volume group
3602
3603   """
3604   # Required free disk space as a function of disk and swap space
3605   req_size_dict = {
3606     constants.DT_DISKLESS: None,
3607     constants.DT_PLAIN: sum(d["size"] for d in disks),
3608     # 128 MB are added for drbd metadata for each disk
3609     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3610     constants.DT_FILE: None,
3611   }
3612
3613   if disk_template not in req_size_dict:
3614     raise errors.ProgrammerError("Disk template '%s' size requirement"
3615                                  " is unknown" %  disk_template)
3616
3617   return req_size_dict[disk_template]
3618
3619
3620 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3621   """Hypervisor parameter validation.
3622
3623   This function abstract the hypervisor parameter validation to be
3624   used in both instance create and instance modify.
3625
3626   @type lu: L{LogicalUnit}
3627   @param lu: the logical unit for which we check
3628   @type nodenames: list
3629   @param nodenames: the list of nodes on which we should check
3630   @type hvname: string
3631   @param hvname: the name of the hypervisor we should use
3632   @type hvparams: dict
3633   @param hvparams: the parameters which we need to check
3634   @raise errors.OpPrereqError: if the parameters are not valid
3635
3636   """
3637   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3638                                                   hvname,
3639                                                   hvparams)
3640   for node in nodenames:
3641     info = hvinfo[node]
3642     info.Raise()
3643     if not info.data or not isinstance(info.data, (tuple, list)):
3644       raise errors.OpPrereqError("Cannot get current information"
3645                                  " from node '%s' (%s)" % (node, info.data))
3646     if not info.data[0]:
3647       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3648                                  " %s" % info.data[1])
3649
3650
3651 class LUCreateInstance(LogicalUnit):
3652   """Create an instance.
3653
3654   """
3655   HPATH = "instance-add"
3656   HTYPE = constants.HTYPE_INSTANCE
3657   _OP_REQP = ["instance_name", "disks", "disk_template",
3658               "mode", "start",
3659               "wait_for_sync", "ip_check", "nics",
3660               "hvparams", "beparams"]
3661   REQ_BGL = False
3662
3663   def _ExpandNode(self, node):
3664     """Expands and checks one node name.
3665
3666     """
3667     node_full = self.cfg.ExpandNodeName(node)
3668     if node_full is None:
3669       raise errors.OpPrereqError("Unknown node %s" % node)
3670     return node_full
3671
3672   def ExpandNames(self):
3673     """ExpandNames for CreateInstance.
3674
3675     Figure out the right locks for instance creation.
3676
3677     """
3678     self.needed_locks = {}
3679
3680     # set optional parameters to none if they don't exist
3681     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3682       if not hasattr(self.op, attr):
3683         setattr(self.op, attr, None)
3684
3685     # cheap checks, mostly valid constants given
3686
3687     # verify creation mode
3688     if self.op.mode not in (constants.INSTANCE_CREATE,
3689                             constants.INSTANCE_IMPORT):
3690       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3691                                  self.op.mode)
3692
3693     # disk template and mirror node verification
3694     if self.op.disk_template not in constants.DISK_TEMPLATES:
3695       raise errors.OpPrereqError("Invalid disk template name")
3696
3697     if self.op.hypervisor is None:
3698       self.op.hypervisor = self.cfg.GetHypervisorType()
3699
3700     cluster = self.cfg.GetClusterInfo()
3701     enabled_hvs = cluster.enabled_hypervisors
3702     if self.op.hypervisor not in enabled_hvs:
3703       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3704                                  " cluster (%s)" % (self.op.hypervisor,
3705                                   ",".join(enabled_hvs)))
3706
3707     # check hypervisor parameter syntax (locally)
3708
3709     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3710                                   self.op.hvparams)
3711     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3712     hv_type.CheckParameterSyntax(filled_hvp)
3713
3714     # fill and remember the beparams dict
3715     utils.CheckBEParams(self.op.beparams)
3716     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3717                                     self.op.beparams)
3718
3719     #### instance parameters check
3720
3721     # instance name verification
3722     hostname1 = utils.HostInfo(self.op.instance_name)
3723     self.op.instance_name = instance_name = hostname1.name
3724
3725     # this is just a preventive check, but someone might still add this
3726     # instance in the meantime, and creation will fail at lock-add time
3727     if instance_name in self.cfg.GetInstanceList():
3728       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3729                                  instance_name)
3730
3731     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3732
3733     # NIC buildup
3734     self.nics = []
3735     for nic in self.op.nics:
3736       # ip validity checks
3737       ip = nic.get("ip", None)
3738       if ip is None or ip.lower() == "none":
3739         nic_ip = None
3740       elif ip.lower() == constants.VALUE_AUTO:
3741         nic_ip = hostname1.ip
3742       else:
3743         if not utils.IsValidIP(ip):
3744           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3745                                      " like a valid IP" % ip)
3746         nic_ip = ip
3747
3748       # MAC address verification
3749       mac = nic.get("mac", constants.VALUE_AUTO)
3750       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3751         if not utils.IsValidMac(mac.lower()):
3752           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3753                                      mac)
3754       # bridge verification
3755       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3756       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3757
3758     # disk checks/pre-build
3759     self.disks = []
3760     for disk in self.op.disks:
3761       mode = disk.get("mode", constants.DISK_RDWR)
3762       if mode not in constants.DISK_ACCESS_SET:
3763         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3764                                    mode)
3765       size = disk.get("size", None)
3766       if size is None:
3767         raise errors.OpPrereqError("Missing disk size")
3768       try:
3769         size = int(size)
3770       except ValueError:
3771         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3772       self.disks.append({"size": size, "mode": mode})
3773
3774     # used in CheckPrereq for ip ping check
3775     self.check_ip = hostname1.ip
3776
3777     # file storage checks
3778     if (self.op.file_driver and
3779         not self.op.file_driver in constants.FILE_DRIVER):
3780       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3781                                  self.op.file_driver)
3782
3783     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3784       raise errors.OpPrereqError("File storage directory path not absolute")
3785
3786     ### Node/iallocator related checks
3787     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3788       raise errors.OpPrereqError("One and only one of iallocator and primary"
3789                                  " node must be given")
3790
3791     if self.op.iallocator:
3792       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3793     else:
3794       self.op.pnode = self._ExpandNode(self.op.pnode)
3795       nodelist = [self.op.pnode]
3796       if self.op.snode is not None:
3797         self.op.snode = self._ExpandNode(self.op.snode)
3798         nodelist.append(self.op.snode)
3799       self.needed_locks[locking.LEVEL_NODE] = nodelist
3800
3801     # in case of import lock the source node too
3802     if self.op.mode == constants.INSTANCE_IMPORT:
3803       src_node = getattr(self.op, "src_node", None)
3804       src_path = getattr(self.op, "src_path", None)
3805
3806       if src_path is None:
3807         self.op.src_path = src_path = self.op.instance_name
3808
3809       if src_node is None:
3810         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3811         self.op.src_node = None
3812         if os.path.isabs(src_path):
3813           raise errors.OpPrereqError("Importing an instance from an absolute"
3814                                      " path requires a source node option.")
3815       else:
3816         self.op.src_node = src_node = self._ExpandNode(src_node)
3817         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3818           self.needed_locks[locking.LEVEL_NODE].append(src_node)
3819         if not os.path.isabs(src_path):
3820           self.op.src_path = src_path = \
3821             os.path.join(constants.EXPORT_DIR, src_path)
3822
3823     else: # INSTANCE_CREATE
3824       if getattr(self.op, "os_type", None) is None:
3825         raise errors.OpPrereqError("No guest OS specified")
3826
3827   def _RunAllocator(self):
3828     """Run the allocator based on input opcode.
3829
3830     """
3831     nics = [n.ToDict() for n in self.nics]
3832     ial = IAllocator(self,
3833                      mode=constants.IALLOCATOR_MODE_ALLOC,
3834                      name=self.op.instance_name,
3835                      disk_template=self.op.disk_template,
3836                      tags=[],
3837                      os=self.op.os_type,
3838                      vcpus=self.be_full[constants.BE_VCPUS],
3839                      mem_size=self.be_full[constants.BE_MEMORY],
3840                      disks=self.disks,
3841                      nics=nics,
3842                      hypervisor=self.op.hypervisor,
3843                      )
3844
3845     ial.Run(self.op.iallocator)
3846
3847     if not ial.success:
3848       raise errors.OpPrereqError("Can't compute nodes using"
3849                                  " iallocator '%s': %s" % (self.op.iallocator,
3850                                                            ial.info))
3851     if len(ial.nodes) != ial.required_nodes:
3852       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3853                                  " of nodes (%s), required %s" %
3854                                  (self.op.iallocator, len(ial.nodes),
3855                                   ial.required_nodes))
3856     self.op.pnode = ial.nodes[0]
3857     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3858                  self.op.instance_name, self.op.iallocator,
3859                  ", ".join(ial.nodes))
3860     if ial.required_nodes == 2:
3861       self.op.snode = ial.nodes[1]
3862
3863   def BuildHooksEnv(self):
3864     """Build hooks env.
3865
3866     This runs on master, primary and secondary nodes of the instance.
3867
3868     """
3869     env = {
3870       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3871       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3872       "INSTANCE_ADD_MODE": self.op.mode,
3873       }
3874     if self.op.mode == constants.INSTANCE_IMPORT:
3875       env["INSTANCE_SRC_NODE"] = self.op.src_node
3876       env["INSTANCE_SRC_PATH"] = self.op.src_path
3877       env["INSTANCE_SRC_IMAGES"] = self.src_images
3878
3879     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3880       primary_node=self.op.pnode,
3881       secondary_nodes=self.secondaries,
3882       status=self.instance_status,
3883       os_type=self.op.os_type,
3884       memory=self.be_full[constants.BE_MEMORY],
3885       vcpus=self.be_full[constants.BE_VCPUS],
3886       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3887     ))
3888
3889     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3890           self.secondaries)
3891     return env, nl, nl
3892
3893
3894   def CheckPrereq(self):
3895     """Check prerequisites.
3896
3897     """
3898     if (not self.cfg.GetVGName() and
3899         self.op.disk_template not in constants.DTS_NOT_LVM):
3900       raise errors.OpPrereqError("Cluster does not support lvm-based"
3901                                  " instances")
3902
3903
3904     if self.op.mode == constants.INSTANCE_IMPORT:
3905       src_node = self.op.src_node
3906       src_path = self.op.src_path
3907
3908       if src_node is None:
3909         exp_list = self.rpc.call_export_list(
3910           self.acquired_locks[locking.LEVEL_NODE])
3911         found = False
3912         for node in exp_list:
3913           if not exp_list[node].failed and src_path in exp_list[node].data:
3914             found = True
3915             self.op.src_node = src_node = node
3916             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3917                                                        src_path)
3918             break
3919         if not found:
3920           raise errors.OpPrereqError("No export found for relative path %s" %
3921                                       src_path)
3922
3923       _CheckNodeOnline(self, src_node)
3924       result = self.rpc.call_export_info(src_node, src_path)
3925       result.Raise()
3926       if not result.data:
3927         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3928
3929       export_info = result.data
3930       if not export_info.has_section(constants.INISECT_EXP):
3931         raise errors.ProgrammerError("Corrupted export config")
3932
3933       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3934       if (int(ei_version) != constants.EXPORT_VERSION):
3935         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3936                                    (ei_version, constants.EXPORT_VERSION))
3937
3938       # Check that the new instance doesn't have less disks than the export
3939       instance_disks = len(self.disks)
3940       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3941       if instance_disks < export_disks:
3942         raise errors.OpPrereqError("Not enough disks to import."
3943                                    " (instance: %d, export: %d)" %
3944                                    (instance_disks, export_disks))
3945
3946       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3947       disk_images = []
3948       for idx in range(export_disks):
3949         option = 'disk%d_dump' % idx
3950         if export_info.has_option(constants.INISECT_INS, option):
3951           # FIXME: are the old os-es, disk sizes, etc. useful?
3952           export_name = export_info.get(constants.INISECT_INS, option)
3953           image = os.path.join(src_path, export_name)
3954           disk_images.append(image)
3955         else:
3956           disk_images.append(False)
3957
3958       self.src_images = disk_images
3959
3960       old_name = export_info.get(constants.INISECT_INS, 'name')
3961       # FIXME: int() here could throw a ValueError on broken exports
3962       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3963       if self.op.instance_name == old_name:
3964         for idx, nic in enumerate(self.nics):
3965           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3966             nic_mac_ini = 'nic%d_mac' % idx
3967             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3968
3969     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3970     if self.op.start and not self.op.ip_check:
3971       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3972                                  " adding an instance in start mode")
3973
3974     if self.op.ip_check:
3975       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3976         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3977                                    (self.check_ip, self.op.instance_name))
3978
3979     #### allocator run
3980
3981     if self.op.iallocator is not None:
3982       self._RunAllocator()
3983
3984     #### node related checks
3985
3986     # check primary node
3987     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3988     assert self.pnode is not None, \
3989       "Cannot retrieve locked node %s" % self.op.pnode
3990     if pnode.offline:
3991       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
3992                                  pnode.name)
3993
3994     self.secondaries = []
3995
3996     # mirror node verification
3997     if self.op.disk_template in constants.DTS_NET_MIRROR:
3998       if self.op.snode is None:
3999         raise errors.OpPrereqError("The networked disk templates need"
4000                                    " a mirror node")
4001       if self.op.snode == pnode.name:
4002         raise errors.OpPrereqError("The secondary node cannot be"
4003                                    " the primary node.")
4004       self.secondaries.append(self.op.snode)
4005       _CheckNodeOnline(self, self.op.snode)
4006
4007     nodenames = [pnode.name] + self.secondaries
4008
4009     req_size = _ComputeDiskSize(self.op.disk_template,
4010                                 self.disks)
4011
4012     # Check lv size requirements
4013     if req_size is not None:
4014       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4015                                          self.op.hypervisor)
4016       for node in nodenames:
4017         info = nodeinfo[node]
4018         info.Raise()
4019         info = info.data
4020         if not info:
4021           raise errors.OpPrereqError("Cannot get current information"
4022                                      " from node '%s'" % node)
4023         vg_free = info.get('vg_free', None)
4024         if not isinstance(vg_free, int):
4025           raise errors.OpPrereqError("Can't compute free disk space on"
4026                                      " node %s" % node)
4027         if req_size > info['vg_free']:
4028           raise errors.OpPrereqError("Not enough disk space on target node %s."
4029                                      " %d MB available, %d MB required" %
4030                                      (node, info['vg_free'], req_size))
4031
4032     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4033
4034     # os verification
4035     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4036     result.Raise()
4037     if not isinstance(result.data, objects.OS):
4038       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4039                                  " primary node"  % self.op.os_type)
4040
4041     # bridge check on primary node
4042     bridges = [n.bridge for n in self.nics]
4043     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4044     result.Raise()
4045     if not result.data:
4046       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4047                                  " exist on destination node '%s'" %
4048                                  (",".join(bridges), pnode.name))
4049
4050     # memory check on primary node
4051     if self.op.start:
4052       _CheckNodeFreeMemory(self, self.pnode.name,
4053                            "creating instance %s" % self.op.instance_name,
4054                            self.be_full[constants.BE_MEMORY],
4055                            self.op.hypervisor)
4056
4057     if self.op.start:
4058       self.instance_status = 'up'
4059     else:
4060       self.instance_status = 'down'
4061
4062   def Exec(self, feedback_fn):
4063     """Create and add the instance to the cluster.
4064
4065     """
4066     instance = self.op.instance_name
4067     pnode_name = self.pnode.name
4068
4069     for nic in self.nics:
4070       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4071         nic.mac = self.cfg.GenerateMAC()
4072
4073     ht_kind = self.op.hypervisor
4074     if ht_kind in constants.HTS_REQ_PORT:
4075       network_port = self.cfg.AllocatePort()
4076     else:
4077       network_port = None
4078
4079     ##if self.op.vnc_bind_address is None:
4080     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4081
4082     # this is needed because os.path.join does not accept None arguments
4083     if self.op.file_storage_dir is None:
4084       string_file_storage_dir = ""
4085     else:
4086       string_file_storage_dir = self.op.file_storage_dir
4087
4088     # build the full file storage dir path
4089     file_storage_dir = os.path.normpath(os.path.join(
4090                                         self.cfg.GetFileStorageDir(),
4091                                         string_file_storage_dir, instance))
4092
4093
4094     disks = _GenerateDiskTemplate(self,
4095                                   self.op.disk_template,
4096                                   instance, pnode_name,
4097                                   self.secondaries,
4098                                   self.disks,
4099                                   file_storage_dir,
4100                                   self.op.file_driver,
4101                                   0)
4102
4103     iobj = objects.Instance(name=instance, os=self.op.os_type,
4104                             primary_node=pnode_name,
4105                             nics=self.nics, disks=disks,
4106                             disk_template=self.op.disk_template,
4107                             status=self.instance_status,
4108                             network_port=network_port,
4109                             beparams=self.op.beparams,
4110                             hvparams=self.op.hvparams,
4111                             hypervisor=self.op.hypervisor,
4112                             )
4113
4114     feedback_fn("* creating instance disks...")
4115     if not _CreateDisks(self, iobj):
4116       _RemoveDisks(self, iobj)
4117       self.cfg.ReleaseDRBDMinors(instance)
4118       raise errors.OpExecError("Device creation failed, reverting...")
4119
4120     feedback_fn("adding instance %s to cluster config" % instance)
4121
4122     self.cfg.AddInstance(iobj)
4123     # Declare that we don't want to remove the instance lock anymore, as we've
4124     # added the instance to the config
4125     del self.remove_locks[locking.LEVEL_INSTANCE]
4126     # Remove the temp. assignements for the instance's drbds
4127     self.cfg.ReleaseDRBDMinors(instance)
4128     # Unlock all the nodes
4129     if self.op.mode == constants.INSTANCE_IMPORT:
4130       nodes_keep = [self.op.src_node]
4131       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4132                        if node != self.op.src_node]
4133       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4134       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4135     else:
4136       self.context.glm.release(locking.LEVEL_NODE)
4137       del self.acquired_locks[locking.LEVEL_NODE]
4138
4139     if self.op.wait_for_sync:
4140       disk_abort = not _WaitForSync(self, iobj)
4141     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4142       # make sure the disks are not degraded (still sync-ing is ok)
4143       time.sleep(15)
4144       feedback_fn("* checking mirrors status")
4145       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4146     else:
4147       disk_abort = False
4148
4149     if disk_abort:
4150       _RemoveDisks(self, iobj)
4151       self.cfg.RemoveInstance(iobj.name)
4152       # Make sure the instance lock gets removed
4153       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4154       raise errors.OpExecError("There are some degraded disks for"
4155                                " this instance")
4156
4157     feedback_fn("creating os for instance %s on node %s" %
4158                 (instance, pnode_name))
4159
4160     if iobj.disk_template != constants.DT_DISKLESS:
4161       if self.op.mode == constants.INSTANCE_CREATE:
4162         feedback_fn("* running the instance OS create scripts...")
4163         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4164         result.Raise()
4165         if not result.data:
4166           raise errors.OpExecError("Could not add os for instance %s"
4167                                    " on node %s" %
4168                                    (instance, pnode_name))
4169
4170       elif self.op.mode == constants.INSTANCE_IMPORT:
4171         feedback_fn("* running the instance OS import scripts...")
4172         src_node = self.op.src_node
4173         src_images = self.src_images
4174         cluster_name = self.cfg.GetClusterName()
4175         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4176                                                          src_node, src_images,
4177                                                          cluster_name)
4178         import_result.Raise()
4179         for idx, result in enumerate(import_result.data):
4180           if not result:
4181             self.LogWarning("Could not import the image %s for instance"
4182                             " %s, disk %d, on node %s" %
4183                             (src_images[idx], instance, idx, pnode_name))
4184       else:
4185         # also checked in the prereq part
4186         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4187                                      % self.op.mode)
4188
4189     if self.op.start:
4190       logging.info("Starting instance %s on node %s", instance, pnode_name)
4191       feedback_fn("* starting instance...")
4192       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4193       result.Raise()
4194       if not result.data:
4195         raise errors.OpExecError("Could not start instance")
4196
4197
4198 class LUConnectConsole(NoHooksLU):
4199   """Connect to an instance's console.
4200
4201   This is somewhat special in that it returns the command line that
4202   you need to run on the master node in order to connect to the
4203   console.
4204
4205   """
4206   _OP_REQP = ["instance_name"]
4207   REQ_BGL = False
4208
4209   def ExpandNames(self):
4210     self._ExpandAndLockInstance()
4211
4212   def CheckPrereq(self):
4213     """Check prerequisites.
4214
4215     This checks that the instance is in the cluster.
4216
4217     """
4218     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4219     assert self.instance is not None, \
4220       "Cannot retrieve locked instance %s" % self.op.instance_name
4221     _CheckNodeOnline(self, self.instance.primary_node)
4222
4223   def Exec(self, feedback_fn):
4224     """Connect to the console of an instance
4225
4226     """
4227     instance = self.instance
4228     node = instance.primary_node
4229
4230     node_insts = self.rpc.call_instance_list([node],
4231                                              [instance.hypervisor])[node]
4232     node_insts.Raise()
4233
4234     if instance.name not in node_insts.data:
4235       raise errors.OpExecError("Instance %s is not running." % instance.name)
4236
4237     logging.debug("Connecting to console of %s on %s", instance.name, node)
4238
4239     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4240     console_cmd = hyper.GetShellCommandForConsole(instance)
4241
4242     # build ssh cmdline
4243     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4244
4245
4246 class LUReplaceDisks(LogicalUnit):
4247   """Replace the disks of an instance.
4248
4249   """
4250   HPATH = "mirrors-replace"
4251   HTYPE = constants.HTYPE_INSTANCE
4252   _OP_REQP = ["instance_name", "mode", "disks"]
4253   REQ_BGL = False
4254
4255   def ExpandNames(self):
4256     self._ExpandAndLockInstance()
4257
4258     if not hasattr(self.op, "remote_node"):
4259       self.op.remote_node = None
4260
4261     ia_name = getattr(self.op, "iallocator", None)
4262     if ia_name is not None:
4263       if self.op.remote_node is not None:
4264         raise errors.OpPrereqError("Give either the iallocator or the new"
4265                                    " secondary, not both")
4266       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4267     elif self.op.remote_node is not None:
4268       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4269       if remote_node is None:
4270         raise errors.OpPrereqError("Node '%s' not known" %
4271                                    self.op.remote_node)
4272       self.op.remote_node = remote_node
4273       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4274       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4275     else:
4276       self.needed_locks[locking.LEVEL_NODE] = []
4277       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4278
4279   def DeclareLocks(self, level):
4280     # If we're not already locking all nodes in the set we have to declare the
4281     # instance's primary/secondary nodes.
4282     if (level == locking.LEVEL_NODE and
4283         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4284       self._LockInstancesNodes()
4285
4286   def _RunAllocator(self):
4287     """Compute a new secondary node using an IAllocator.
4288
4289     """
4290     ial = IAllocator(self,
4291                      mode=constants.IALLOCATOR_MODE_RELOC,
4292                      name=self.op.instance_name,
4293                      relocate_from=[self.sec_node])
4294
4295     ial.Run(self.op.iallocator)
4296
4297     if not ial.success:
4298       raise errors.OpPrereqError("Can't compute nodes using"
4299                                  " iallocator '%s': %s" % (self.op.iallocator,
4300                                                            ial.info))
4301     if len(ial.nodes) != ial.required_nodes:
4302       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4303                                  " of nodes (%s), required %s" %
4304                                  (len(ial.nodes), ial.required_nodes))
4305     self.op.remote_node = ial.nodes[0]
4306     self.LogInfo("Selected new secondary for the instance: %s",
4307                  self.op.remote_node)
4308
4309   def BuildHooksEnv(self):
4310     """Build hooks env.
4311
4312     This runs on the master, the primary and all the secondaries.
4313
4314     """
4315     env = {
4316       "MODE": self.op.mode,
4317       "NEW_SECONDARY": self.op.remote_node,
4318       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4319       }
4320     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4321     nl = [
4322       self.cfg.GetMasterNode(),
4323       self.instance.primary_node,
4324       ]
4325     if self.op.remote_node is not None:
4326       nl.append(self.op.remote_node)
4327     return env, nl, nl
4328
4329   def CheckPrereq(self):
4330     """Check prerequisites.
4331
4332     This checks that the instance is in the cluster.
4333
4334     """
4335     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4336     assert instance is not None, \
4337       "Cannot retrieve locked instance %s" % self.op.instance_name
4338     self.instance = instance
4339
4340     if instance.disk_template not in constants.DTS_NET_MIRROR:
4341       raise errors.OpPrereqError("Instance's disk layout is not"
4342                                  " network mirrored.")
4343
4344     if len(instance.secondary_nodes) != 1:
4345       raise errors.OpPrereqError("The instance has a strange layout,"
4346                                  " expected one secondary but found %d" %
4347                                  len(instance.secondary_nodes))
4348
4349     self.sec_node = instance.secondary_nodes[0]
4350
4351     ia_name = getattr(self.op, "iallocator", None)
4352     if ia_name is not None:
4353       self._RunAllocator()
4354
4355     remote_node = self.op.remote_node
4356     if remote_node is not None:
4357       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4358       assert self.remote_node_info is not None, \
4359         "Cannot retrieve locked node %s" % remote_node
4360     else:
4361       self.remote_node_info = None
4362     if remote_node == instance.primary_node:
4363       raise errors.OpPrereqError("The specified node is the primary node of"
4364                                  " the instance.")
4365     elif remote_node == self.sec_node:
4366       if self.op.mode == constants.REPLACE_DISK_SEC:
4367         # this is for DRBD8, where we can't execute the same mode of
4368         # replacement as for drbd7 (no different port allocated)
4369         raise errors.OpPrereqError("Same secondary given, cannot execute"
4370                                    " replacement")
4371     if instance.disk_template == constants.DT_DRBD8:
4372       if (self.op.mode == constants.REPLACE_DISK_ALL and
4373           remote_node is not None):
4374         # switch to replace secondary mode
4375         self.op.mode = constants.REPLACE_DISK_SEC
4376
4377       if self.op.mode == constants.REPLACE_DISK_ALL:
4378         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4379                                    " secondary disk replacement, not"
4380                                    " both at once")
4381       elif self.op.mode == constants.REPLACE_DISK_PRI:
4382         if remote_node is not None:
4383           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4384                                      " the secondary while doing a primary"
4385                                      " node disk replacement")
4386         self.tgt_node = instance.primary_node
4387         self.oth_node = instance.secondary_nodes[0]
4388         _CheckNodeOnline(self, self.tgt_node)
4389         _CheckNodeOnline(self, self.oth_node)
4390       elif self.op.mode == constants.REPLACE_DISK_SEC:
4391         self.new_node = remote_node # this can be None, in which case
4392                                     # we don't change the secondary
4393         self.tgt_node = instance.secondary_nodes[0]
4394         self.oth_node = instance.primary_node
4395         _CheckNodeOnline(self, self.oth_node)
4396         if self.new_node is not None:
4397           _CheckNodeOnline(self, self.new_node)
4398         else:
4399           _CheckNodeOnline(self, self.tgt_node)
4400       else:
4401         raise errors.ProgrammerError("Unhandled disk replace mode")
4402
4403     if not self.op.disks:
4404       self.op.disks = range(len(instance.disks))
4405
4406     for disk_idx in self.op.disks:
4407       instance.FindDisk(disk_idx)
4408
4409   def _ExecD8DiskOnly(self, feedback_fn):
4410     """Replace a disk on the primary or secondary for dbrd8.
4411
4412     The algorithm for replace is quite complicated:
4413
4414       1. for each disk to be replaced:
4415
4416         1. create new LVs on the target node with unique names
4417         1. detach old LVs from the drbd device
4418         1. rename old LVs to name_replaced.<time_t>
4419         1. rename new LVs to old LVs
4420         1. attach the new LVs (with the old names now) to the drbd device
4421
4422       1. wait for sync across all devices
4423
4424       1. for each modified disk:
4425
4426         1. remove old LVs (which have the name name_replaces.<time_t>)
4427
4428     Failures are not very well handled.
4429
4430     """
4431     steps_total = 6
4432     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4433     instance = self.instance
4434     iv_names = {}
4435     vgname = self.cfg.GetVGName()
4436     # start of work
4437     cfg = self.cfg
4438     tgt_node = self.tgt_node
4439     oth_node = self.oth_node
4440
4441     # Step: check device activation
4442     self.proc.LogStep(1, steps_total, "check device existence")
4443     info("checking volume groups")
4444     my_vg = cfg.GetVGName()
4445     results = self.rpc.call_vg_list([oth_node, tgt_node])
4446     if not results:
4447       raise errors.OpExecError("Can't list volume groups on the nodes")
4448     for node in oth_node, tgt_node:
4449       res = results[node]
4450       if res.failed or not res.data or my_vg not in res.data:
4451         raise errors.OpExecError("Volume group '%s' not found on %s" %
4452                                  (my_vg, node))
4453     for idx, dev in enumerate(instance.disks):
4454       if idx not in self.op.disks:
4455         continue
4456       for node in tgt_node, oth_node:
4457         info("checking disk/%d on %s" % (idx, node))
4458         cfg.SetDiskID(dev, node)
4459         if not self.rpc.call_blockdev_find(node, dev):
4460           raise errors.OpExecError("Can't find disk/%d on node %s" %
4461                                    (idx, node))
4462
4463     # Step: check other node consistency
4464     self.proc.LogStep(2, steps_total, "check peer consistency")
4465     for idx, dev in enumerate(instance.disks):
4466       if idx not in self.op.disks:
4467         continue
4468       info("checking disk/%d consistency on %s" % (idx, oth_node))
4469       if not _CheckDiskConsistency(self, dev, oth_node,
4470                                    oth_node==instance.primary_node):
4471         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4472                                  " to replace disks on this node (%s)" %
4473                                  (oth_node, tgt_node))
4474
4475     # Step: create new storage
4476     self.proc.LogStep(3, steps_total, "allocate new storage")
4477     for idx, dev in enumerate(instance.disks):
4478       if idx not in self.op.disks:
4479         continue
4480       size = dev.size
4481       cfg.SetDiskID(dev, tgt_node)
4482       lv_names = [".disk%d_%s" % (idx, suf)
4483                   for suf in ["data", "meta"]]
4484       names = _GenerateUniqueNames(self, lv_names)
4485       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4486                              logical_id=(vgname, names[0]))
4487       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4488                              logical_id=(vgname, names[1]))
4489       new_lvs = [lv_data, lv_meta]
4490       old_lvs = dev.children
4491       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4492       info("creating new local storage on %s for %s" %
4493            (tgt_node, dev.iv_name))
4494       # since we *always* want to create this LV, we use the
4495       # _Create...OnPrimary (which forces the creation), even if we
4496       # are talking about the secondary node
4497       for new_lv in new_lvs:
4498         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4499                                         _GetInstanceInfoText(instance)):
4500           raise errors.OpExecError("Failed to create new LV named '%s' on"
4501                                    " node '%s'" %
4502                                    (new_lv.logical_id[1], tgt_node))
4503
4504     # Step: for each lv, detach+rename*2+attach
4505     self.proc.LogStep(4, steps_total, "change drbd configuration")
4506     for dev, old_lvs, new_lvs in iv_names.itervalues():
4507       info("detaching %s drbd from local storage" % dev.iv_name)
4508       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4509       result.Raise()
4510       if not result.data:
4511         raise errors.OpExecError("Can't detach drbd from local storage on node"
4512                                  " %s for device %s" % (tgt_node, dev.iv_name))
4513       #dev.children = []
4514       #cfg.Update(instance)
4515
4516       # ok, we created the new LVs, so now we know we have the needed
4517       # storage; as such, we proceed on the target node to rename
4518       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4519       # using the assumption that logical_id == physical_id (which in
4520       # turn is the unique_id on that node)
4521
4522       # FIXME(iustin): use a better name for the replaced LVs
4523       temp_suffix = int(time.time())
4524       ren_fn = lambda d, suff: (d.physical_id[0],
4525                                 d.physical_id[1] + "_replaced-%s" % suff)
4526       # build the rename list based on what LVs exist on the node
4527       rlist = []
4528       for to_ren in old_lvs:
4529         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4530         if not find_res.failed and find_res.data is not None: # device exists
4531           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4532
4533       info("renaming the old LVs on the target node")
4534       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4535       result.Raise()
4536       if not result.data:
4537         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4538       # now we rename the new LVs to the old LVs
4539       info("renaming the new LVs on the target node")
4540       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4541       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4542       result.Raise()
4543       if not result.data:
4544         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4545
4546       for old, new in zip(old_lvs, new_lvs):
4547         new.logical_id = old.logical_id
4548         cfg.SetDiskID(new, tgt_node)
4549
4550       for disk in old_lvs:
4551         disk.logical_id = ren_fn(disk, temp_suffix)
4552         cfg.SetDiskID(disk, tgt_node)
4553
4554       # now that the new lvs have the old name, we can add them to the device
4555       info("adding new mirror component on %s" % tgt_node)
4556       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4557       if result.failed or not result.data:
4558         for new_lv in new_lvs:
4559           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4560           if result.failed or not result.data:
4561             warning("Can't rollback device %s", hint="manually cleanup unused"
4562                     " logical volumes")
4563         raise errors.OpExecError("Can't add local storage to drbd")
4564
4565       dev.children = new_lvs
4566       cfg.Update(instance)
4567
4568     # Step: wait for sync
4569
4570     # this can fail as the old devices are degraded and _WaitForSync
4571     # does a combined result over all disks, so we don't check its
4572     # return value
4573     self.proc.LogStep(5, steps_total, "sync devices")
4574     _WaitForSync(self, instance, unlock=True)
4575
4576     # so check manually all the devices
4577     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4578       cfg.SetDiskID(dev, instance.primary_node)
4579       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4580       if result.failed or result.data[5]:
4581         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4582
4583     # Step: remove old storage
4584     self.proc.LogStep(6, steps_total, "removing old storage")
4585     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4586       info("remove logical volumes for %s" % name)
4587       for lv in old_lvs:
4588         cfg.SetDiskID(lv, tgt_node)
4589         result = self.rpc.call_blockdev_remove(tgt_node, lv)
4590         if result.failed or not result.data:
4591           warning("Can't remove old LV", hint="manually remove unused LVs")
4592           continue
4593
4594   def _ExecD8Secondary(self, feedback_fn):
4595     """Replace the secondary node for drbd8.
4596
4597     The algorithm for replace is quite complicated:
4598       - for all disks of the instance:
4599         - create new LVs on the new node with same names
4600         - shutdown the drbd device on the old secondary
4601         - disconnect the drbd network on the primary
4602         - create the drbd device on the new secondary
4603         - network attach the drbd on the primary, using an artifice:
4604           the drbd code for Attach() will connect to the network if it
4605           finds a device which is connected to the good local disks but
4606           not network enabled
4607       - wait for sync across all devices
4608       - remove all disks from the old secondary
4609
4610     Failures are not very well handled.
4611
4612     """
4613     steps_total = 6
4614     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4615     instance = self.instance
4616     iv_names = {}
4617     # start of work
4618     cfg = self.cfg
4619     old_node = self.tgt_node
4620     new_node = self.new_node
4621     pri_node = instance.primary_node
4622
4623     # Step: check device activation
4624     self.proc.LogStep(1, steps_total, "check device existence")
4625     info("checking volume groups")
4626     my_vg = cfg.GetVGName()
4627     results = self.rpc.call_vg_list([pri_node, new_node])
4628     for node in pri_node, new_node:
4629       res = results[node]
4630       if res.failed or not res.data or my_vg not in res.data:
4631         raise errors.OpExecError("Volume group '%s' not found on %s" %
4632                                  (my_vg, node))
4633     for idx, dev in enumerate(instance.disks):
4634       if idx not in self.op.disks:
4635         continue
4636       info("checking disk/%d on %s" % (idx, pri_node))
4637       cfg.SetDiskID(dev, pri_node)
4638       result = self.rpc.call_blockdev_find(pri_node, dev)
4639       result.Raise()
4640       if not result.data:
4641         raise errors.OpExecError("Can't find disk/%d on node %s" %
4642                                  (idx, pri_node))
4643
4644     # Step: check other node consistency
4645     self.proc.LogStep(2, steps_total, "check peer consistency")
4646     for idx, dev in enumerate(instance.disks):
4647       if idx not in self.op.disks:
4648         continue
4649       info("checking disk/%d consistency on %s" % (idx, pri_node))
4650       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4651         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4652                                  " unsafe to replace the secondary" %
4653                                  pri_node)
4654
4655     # Step: create new storage
4656     self.proc.LogStep(3, steps_total, "allocate new storage")
4657     for idx, dev in enumerate(instance.disks):
4658       info("adding new local storage on %s for disk/%d" %
4659            (new_node, idx))
4660       # since we *always* want to create this LV, we use the
4661       # _Create...OnPrimary (which forces the creation), even if we
4662       # are talking about the secondary node
4663       for new_lv in dev.children:
4664         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4665                                         _GetInstanceInfoText(instance)):
4666           raise errors.OpExecError("Failed to create new LV named '%s' on"
4667                                    " node '%s'" %
4668                                    (new_lv.logical_id[1], new_node))
4669
4670     # Step 4: dbrd minors and drbd setups changes
4671     # after this, we must manually remove the drbd minors on both the
4672     # error and the success paths
4673     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4674                                    instance.name)
4675     logging.debug("Allocated minors %s" % (minors,))
4676     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4677     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4678       size = dev.size
4679       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4680       # create new devices on new_node
4681       if pri_node == dev.logical_id[0]:
4682         new_logical_id = (pri_node, new_node,
4683                           dev.logical_id[2], dev.logical_id[3], new_minor,
4684                           dev.logical_id[5])
4685       else:
4686         new_logical_id = (new_node, pri_node,
4687                           dev.logical_id[2], new_minor, dev.logical_id[4],
4688                           dev.logical_id[5])
4689       iv_names[idx] = (dev, dev.children, new_logical_id)
4690       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4691                     new_logical_id)
4692       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4693                               logical_id=new_logical_id,
4694                               children=dev.children)
4695       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4696                                         new_drbd, False,
4697                                         _GetInstanceInfoText(instance)):
4698         self.cfg.ReleaseDRBDMinors(instance.name)
4699         raise errors.OpExecError("Failed to create new DRBD on"
4700                                  " node '%s'" % new_node)
4701
4702     for idx, dev in enumerate(instance.disks):
4703       # we have new devices, shutdown the drbd on the old secondary
4704       info("shutting down drbd for disk/%d on old node" % idx)
4705       cfg.SetDiskID(dev, old_node)
4706       result = self.rpc.call_blockdev_shutdown(old_node, dev)
4707       if result.failed or not result.data:
4708         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4709                 hint="Please cleanup this device manually as soon as possible")
4710
4711     info("detaching primary drbds from the network (=> standalone)")
4712     done = 0
4713     for idx, dev in enumerate(instance.disks):
4714       cfg.SetDiskID(dev, pri_node)
4715       # set the network part of the physical (unique in bdev terms) id
4716       # to None, meaning detach from network
4717       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4718       # and 'find' the device, which will 'fix' it to match the
4719       # standalone state
4720       result = self.rpc.call_blockdev_find(pri_node, dev)
4721       if not result.failed and result.data:
4722         done += 1
4723       else:
4724         warning("Failed to detach drbd disk/%d from network, unusual case" %
4725                 idx)
4726
4727     if not done:
4728       # no detaches succeeded (very unlikely)
4729       self.cfg.ReleaseDRBDMinors(instance.name)
4730       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4731
4732     # if we managed to detach at least one, we update all the disks of
4733     # the instance to point to the new secondary
4734     info("updating instance configuration")
4735     for dev, _, new_logical_id in iv_names.itervalues():
4736       dev.logical_id = new_logical_id
4737       cfg.SetDiskID(dev, pri_node)
4738     cfg.Update(instance)
4739     # we can remove now the temp minors as now the new values are
4740     # written to the config file (and therefore stable)
4741     self.cfg.ReleaseDRBDMinors(instance.name)
4742
4743     # and now perform the drbd attach
4744     info("attaching primary drbds to new secondary (standalone => connected)")
4745     for idx, dev in enumerate(instance.disks):
4746       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4747       # since the attach is smart, it's enough to 'find' the device,
4748       # it will automatically activate the network, if the physical_id
4749       # is correct
4750       cfg.SetDiskID(dev, pri_node)
4751       logging.debug("Disk to attach: %s", dev)
4752       result = self.rpc.call_blockdev_find(pri_node, dev)
4753       if result.failed or not result.data:
4754         warning("can't attach drbd disk/%d to new secondary!" % idx,
4755                 "please do a gnt-instance info to see the status of disks")
4756
4757     # this can fail as the old devices are degraded and _WaitForSync
4758     # does a combined result over all disks, so we don't check its
4759     # return value
4760     self.proc.LogStep(5, steps_total, "sync devices")
4761     _WaitForSync(self, instance, unlock=True)
4762
4763     # so check manually all the devices
4764     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4765       cfg.SetDiskID(dev, pri_node)
4766       result = self.rpc.call_blockdev_find(pri_node, dev)
4767       result.Raise()
4768       if result.data[5]:
4769         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4770
4771     self.proc.LogStep(6, steps_total, "removing old storage")
4772     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4773       info("remove logical volumes for disk/%d" % idx)
4774       for lv in old_lvs:
4775         cfg.SetDiskID(lv, old_node)
4776         result = self.rpc.call_blockdev_remove(old_node, lv)
4777         if result.failed or not result.data:
4778           warning("Can't remove LV on old secondary",
4779                   hint="Cleanup stale volumes by hand")
4780
4781   def Exec(self, feedback_fn):
4782     """Execute disk replacement.
4783
4784     This dispatches the disk replacement to the appropriate handler.
4785
4786     """
4787     instance = self.instance
4788
4789     # Activate the instance disks if we're replacing them on a down instance
4790     if instance.status == "down":
4791       _StartInstanceDisks(self, instance, True)
4792
4793     if instance.disk_template == constants.DT_DRBD8:
4794       if self.op.remote_node is None:
4795         fn = self._ExecD8DiskOnly
4796       else:
4797         fn = self._ExecD8Secondary
4798     else:
4799       raise errors.ProgrammerError("Unhandled disk replacement case")
4800
4801     ret = fn(feedback_fn)
4802
4803     # Deactivate the instance disks if we're replacing them on a down instance
4804     if instance.status == "down":
4805       _SafeShutdownInstanceDisks(self, instance)
4806
4807     return ret
4808
4809
4810 class LUGrowDisk(LogicalUnit):
4811   """Grow a disk of an instance.
4812
4813   """
4814   HPATH = "disk-grow"
4815   HTYPE = constants.HTYPE_INSTANCE
4816   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4817   REQ_BGL = False
4818
4819   def ExpandNames(self):
4820     self._ExpandAndLockInstance()
4821     self.needed_locks[locking.LEVEL_NODE] = []
4822     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4823
4824   def DeclareLocks(self, level):
4825     if level == locking.LEVEL_NODE:
4826       self._LockInstancesNodes()
4827
4828   def BuildHooksEnv(self):
4829     """Build hooks env.
4830
4831     This runs on the master, the primary and all the secondaries.
4832
4833     """
4834     env = {
4835       "DISK": self.op.disk,
4836       "AMOUNT": self.op.amount,
4837       }
4838     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4839     nl = [
4840       self.cfg.GetMasterNode(),
4841       self.instance.primary_node,
4842       ]
4843     return env, nl, nl
4844
4845   def CheckPrereq(self):
4846     """Check prerequisites.
4847
4848     This checks that the instance is in the cluster.
4849
4850     """
4851     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4852     assert instance is not None, \
4853       "Cannot retrieve locked instance %s" % self.op.instance_name
4854     _CheckNodeOnline(self, instance.primary_node)
4855     for node in instance.secondary_nodes:
4856       _CheckNodeOnline(self, node)
4857
4858
4859     self.instance = instance
4860
4861     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4862       raise errors.OpPrereqError("Instance's disk layout does not support"
4863                                  " growing.")
4864
4865     self.disk = instance.FindDisk(self.op.disk)
4866
4867     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4868     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4869                                        instance.hypervisor)
4870     for node in nodenames:
4871       info = nodeinfo[node]
4872       if info.failed or not info.data:
4873         raise errors.OpPrereqError("Cannot get current information"
4874                                    " from node '%s'" % node)
4875       vg_free = info.data.get('vg_free', None)
4876       if not isinstance(vg_free, int):
4877         raise errors.OpPrereqError("Can't compute free disk space on"
4878                                    " node %s" % node)
4879       if self.op.amount > vg_free:
4880         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4881                                    " %d MiB available, %d MiB required" %
4882                                    (node, vg_free, self.op.amount))
4883
4884   def Exec(self, feedback_fn):
4885     """Execute disk grow.
4886
4887     """
4888     instance = self.instance
4889     disk = self.disk
4890     for node in (instance.secondary_nodes + (instance.primary_node,)):
4891       self.cfg.SetDiskID(disk, node)
4892       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4893       result.Raise()
4894       if (not result.data or not isinstance(result.data, (list, tuple)) or
4895           len(result.data) != 2):
4896         raise errors.OpExecError("Grow request failed to node %s" % node)
4897       elif not result.data[0]:
4898         raise errors.OpExecError("Grow request failed to node %s: %s" %
4899                                  (node, result.data[1]))
4900     disk.RecordGrow(self.op.amount)
4901     self.cfg.Update(instance)
4902     if self.op.wait_for_sync:
4903       disk_abort = not _WaitForSync(self, instance)
4904       if disk_abort:
4905         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4906                              " status.\nPlease check the instance.")
4907
4908
4909 class LUQueryInstanceData(NoHooksLU):
4910   """Query runtime instance data.
4911
4912   """
4913   _OP_REQP = ["instances", "static"]
4914   REQ_BGL = False
4915
4916   def ExpandNames(self):
4917     self.needed_locks = {}
4918     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4919
4920     if not isinstance(self.op.instances, list):
4921       raise errors.OpPrereqError("Invalid argument type 'instances'")
4922
4923     if self.op.instances:
4924       self.wanted_names = []
4925       for name in self.op.instances:
4926         full_name = self.cfg.ExpandInstanceName(name)
4927         if full_name is None:
4928           raise errors.OpPrereqError("Instance '%s' not known" %
4929                                      self.op.instance_name)
4930         self.wanted_names.append(full_name)
4931       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4932     else:
4933       self.wanted_names = None
4934       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4935
4936     self.needed_locks[locking.LEVEL_NODE] = []
4937     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4938
4939   def DeclareLocks(self, level):
4940     if level == locking.LEVEL_NODE:
4941       self._LockInstancesNodes()
4942
4943   def CheckPrereq(self):
4944     """Check prerequisites.
4945
4946     This only checks the optional instance list against the existing names.
4947
4948     """
4949     if self.wanted_names is None:
4950       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4951
4952     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4953                              in self.wanted_names]
4954     return
4955
4956   def _ComputeDiskStatus(self, instance, snode, dev):
4957     """Compute block device status.
4958
4959     """
4960     static = self.op.static
4961     if not static:
4962       self.cfg.SetDiskID(dev, instance.primary_node)
4963       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4964       dev_pstatus.Raise()
4965       dev_pstatus = dev_pstatus.data
4966     else:
4967       dev_pstatus = None
4968
4969     if dev.dev_type in constants.LDS_DRBD:
4970       # we change the snode then (otherwise we use the one passed in)
4971       if dev.logical_id[0] == instance.primary_node:
4972         snode = dev.logical_id[1]
4973       else:
4974         snode = dev.logical_id[0]
4975
4976     if snode and not static:
4977       self.cfg.SetDiskID(dev, snode)
4978       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4979       dev_sstatus.Raise()
4980       dev_sstatus = dev_sstatus.data
4981     else:
4982       dev_sstatus = None
4983
4984     if dev.children:
4985       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4986                       for child in dev.children]
4987     else:
4988       dev_children = []
4989
4990     data = {
4991       "iv_name": dev.iv_name,
4992       "dev_type": dev.dev_type,
4993       "logical_id": dev.logical_id,
4994       "physical_id": dev.physical_id,
4995       "pstatus": dev_pstatus,
4996       "sstatus": dev_sstatus,
4997       "children": dev_children,
4998       "mode": dev.mode,
4999       }
5000
5001     return data
5002
5003   def Exec(self, feedback_fn):
5004     """Gather and return data"""
5005     result = {}
5006
5007     cluster = self.cfg.GetClusterInfo()
5008
5009     for instance in self.wanted_instances:
5010       if not self.op.static:
5011         remote_info = self.rpc.call_instance_info(instance.primary_node,
5012                                                   instance.name,
5013                                                   instance.hypervisor)
5014         remote_info.Raise()
5015         remote_info = remote_info.data
5016         if remote_info and "state" in remote_info:
5017           remote_state = "up"
5018         else:
5019           remote_state = "down"
5020       else:
5021         remote_state = None
5022       if instance.status == "down":
5023         config_state = "down"
5024       else:
5025         config_state = "up"
5026
5027       disks = [self._ComputeDiskStatus(instance, None, device)
5028                for device in instance.disks]
5029
5030       idict = {
5031         "name": instance.name,
5032         "config_state": config_state,
5033         "run_state": remote_state,
5034         "pnode": instance.primary_node,
5035         "snodes": instance.secondary_nodes,
5036         "os": instance.os,
5037         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5038         "disks": disks,
5039         "hypervisor": instance.hypervisor,
5040         "network_port": instance.network_port,
5041         "hv_instance": instance.hvparams,
5042         "hv_actual": cluster.FillHV(instance),
5043         "be_instance": instance.beparams,
5044         "be_actual": cluster.FillBE(instance),
5045         }
5046
5047       result[instance.name] = idict
5048
5049     return result
5050
5051
5052 class LUSetInstanceParams(LogicalUnit):
5053   """Modifies an instances's parameters.
5054
5055   """
5056   HPATH = "instance-modify"
5057   HTYPE = constants.HTYPE_INSTANCE
5058   _OP_REQP = ["instance_name"]
5059   REQ_BGL = False
5060
5061   def CheckArguments(self):
5062     if not hasattr(self.op, 'nics'):
5063       self.op.nics = []
5064     if not hasattr(self.op, 'disks'):
5065       self.op.disks = []
5066     if not hasattr(self.op, 'beparams'):
5067       self.op.beparams = {}
5068     if not hasattr(self.op, 'hvparams'):
5069       self.op.hvparams = {}
5070     self.op.force = getattr(self.op, "force", False)
5071     if not (self.op.nics or self.op.disks or
5072             self.op.hvparams or self.op.beparams):
5073       raise errors.OpPrereqError("No changes submitted")
5074
5075     utils.CheckBEParams(self.op.beparams)
5076
5077     # Disk validation
5078     disk_addremove = 0
5079     for disk_op, disk_dict in self.op.disks:
5080       if disk_op == constants.DDM_REMOVE:
5081         disk_addremove += 1
5082         continue
5083       elif disk_op == constants.DDM_ADD:
5084         disk_addremove += 1
5085       else:
5086         if not isinstance(disk_op, int):
5087           raise errors.OpPrereqError("Invalid disk index")
5088       if disk_op == constants.DDM_ADD:
5089         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5090         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5091           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5092         size = disk_dict.get('size', None)
5093         if size is None:
5094           raise errors.OpPrereqError("Required disk parameter size missing")
5095         try:
5096           size = int(size)
5097         except ValueError, err:
5098           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5099                                      str(err))
5100         disk_dict['size'] = size
5101       else:
5102         # modification of disk
5103         if 'size' in disk_dict:
5104           raise errors.OpPrereqError("Disk size change not possible, use"
5105                                      " grow-disk")
5106
5107     if disk_addremove > 1:
5108       raise errors.OpPrereqError("Only one disk add or remove operation"
5109                                  " supported at a time")
5110
5111     # NIC validation
5112     nic_addremove = 0
5113     for nic_op, nic_dict in self.op.nics:
5114       if nic_op == constants.DDM_REMOVE:
5115         nic_addremove += 1
5116         continue
5117       elif nic_op == constants.DDM_ADD:
5118         nic_addremove += 1
5119       else:
5120         if not isinstance(nic_op, int):
5121           raise errors.OpPrereqError("Invalid nic index")
5122
5123       # nic_dict should be a dict
5124       nic_ip = nic_dict.get('ip', None)
5125       if nic_ip is not None:
5126         if nic_ip.lower() == "none":
5127           nic_dict['ip'] = None
5128         else:
5129           if not utils.IsValidIP(nic_ip):
5130             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5131       # we can only check None bridges and assign the default one
5132       nic_bridge = nic_dict.get('bridge', None)
5133       if nic_bridge is None:
5134         nic_dict['bridge'] = self.cfg.GetDefBridge()
5135       # but we can validate MACs
5136       nic_mac = nic_dict.get('mac', None)
5137       if nic_mac is not None:
5138         if self.cfg.IsMacInUse(nic_mac):
5139           raise errors.OpPrereqError("MAC address %s already in use"
5140                                      " in cluster" % nic_mac)
5141         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5142           if not utils.IsValidMac(nic_mac):
5143             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5144     if nic_addremove > 1:
5145       raise errors.OpPrereqError("Only one NIC add or remove operation"
5146                                  " supported at a time")
5147
5148   def ExpandNames(self):
5149     self._ExpandAndLockInstance()
5150     self.needed_locks[locking.LEVEL_NODE] = []
5151     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5152
5153   def DeclareLocks(self, level):
5154     if level == locking.LEVEL_NODE:
5155       self._LockInstancesNodes()
5156
5157   def BuildHooksEnv(self):
5158     """Build hooks env.
5159
5160     This runs on the master, primary and secondaries.
5161
5162     """
5163     args = dict()
5164     if constants.BE_MEMORY in self.be_new:
5165       args['memory'] = self.be_new[constants.BE_MEMORY]
5166     if constants.BE_VCPUS in self.be_new:
5167       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5168     # FIXME: readd disk/nic changes
5169     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5170     nl = [self.cfg.GetMasterNode(),
5171           self.instance.primary_node] + list(self.instance.secondary_nodes)
5172     return env, nl, nl
5173
5174   def CheckPrereq(self):
5175     """Check prerequisites.
5176
5177     This only checks the instance list against the existing names.
5178
5179     """
5180     force = self.force = self.op.force
5181
5182     # checking the new params on the primary/secondary nodes
5183
5184     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5185     assert self.instance is not None, \
5186       "Cannot retrieve locked instance %s" % self.op.instance_name
5187     pnode = self.instance.primary_node
5188     nodelist = [pnode]
5189     nodelist.extend(instance.secondary_nodes)
5190
5191     # hvparams processing
5192     if self.op.hvparams:
5193       i_hvdict = copy.deepcopy(instance.hvparams)
5194       for key, val in self.op.hvparams.iteritems():
5195         if val == constants.VALUE_DEFAULT:
5196           try:
5197             del i_hvdict[key]
5198           except KeyError:
5199             pass
5200         elif val == constants.VALUE_NONE:
5201           i_hvdict[key] = None
5202         else:
5203           i_hvdict[key] = val
5204       cluster = self.cfg.GetClusterInfo()
5205       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5206                                 i_hvdict)
5207       # local check
5208       hypervisor.GetHypervisor(
5209         instance.hypervisor).CheckParameterSyntax(hv_new)
5210       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5211       self.hv_new = hv_new # the new actual values
5212       self.hv_inst = i_hvdict # the new dict (without defaults)
5213     else:
5214       self.hv_new = self.hv_inst = {}
5215
5216     # beparams processing
5217     if self.op.beparams:
5218       i_bedict = copy.deepcopy(instance.beparams)
5219       for key, val in self.op.beparams.iteritems():
5220         if val == constants.VALUE_DEFAULT:
5221           try:
5222             del i_bedict[key]
5223           except KeyError:
5224             pass
5225         else:
5226           i_bedict[key] = val
5227       cluster = self.cfg.GetClusterInfo()
5228       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5229                                 i_bedict)
5230       self.be_new = be_new # the new actual values
5231       self.be_inst = i_bedict # the new dict (without defaults)
5232     else:
5233       self.be_new = self.be_inst = {}
5234
5235     self.warn = []
5236
5237     if constants.BE_MEMORY in self.op.beparams and not self.force:
5238       mem_check_list = [pnode]
5239       if be_new[constants.BE_AUTO_BALANCE]:
5240         # either we changed auto_balance to yes or it was from before
5241         mem_check_list.extend(instance.secondary_nodes)
5242       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5243                                                   instance.hypervisor)
5244       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5245                                          instance.hypervisor)
5246       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5247         # Assume the primary node is unreachable and go ahead
5248         self.warn.append("Can't get info from primary node %s" % pnode)
5249       else:
5250         if not instance_info.failed and instance_info.data:
5251           current_mem = instance_info.data['memory']
5252         else:
5253           # Assume instance not running
5254           # (there is a slight race condition here, but it's not very probable,
5255           # and we have no other way to check)
5256           current_mem = 0
5257         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5258                     nodeinfo[pnode].data['memory_free'])
5259         if miss_mem > 0:
5260           raise errors.OpPrereqError("This change will prevent the instance"
5261                                      " from starting, due to %d MB of memory"
5262                                      " missing on its primary node" % miss_mem)
5263
5264       if be_new[constants.BE_AUTO_BALANCE]:
5265         for node, nres in instance.secondary_nodes.iteritems():
5266           if nres.failed or not isinstance(nres.data, dict):
5267             self.warn.append("Can't get info from secondary node %s" % node)
5268           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5269             self.warn.append("Not enough memory to failover instance to"
5270                              " secondary node %s" % node)
5271
5272     # NIC processing
5273     for nic_op, nic_dict in self.op.nics:
5274       if nic_op == constants.DDM_REMOVE:
5275         if not instance.nics:
5276           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5277         continue
5278       if nic_op != constants.DDM_ADD:
5279         # an existing nic
5280         if nic_op < 0 or nic_op >= len(instance.nics):
5281           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5282                                      " are 0 to %d" %
5283                                      (nic_op, len(instance.nics)))
5284       nic_bridge = nic_dict.get('bridge', None)
5285       if nic_bridge is not None:
5286         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5287           msg = ("Bridge '%s' doesn't exist on one of"
5288                  " the instance nodes" % nic_bridge)
5289           if self.force:
5290             self.warn.append(msg)
5291           else:
5292             raise errors.OpPrereqError(msg)
5293
5294     # DISK processing
5295     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5296       raise errors.OpPrereqError("Disk operations not supported for"
5297                                  " diskless instances")
5298     for disk_op, disk_dict in self.op.disks:
5299       if disk_op == constants.DDM_REMOVE:
5300         if len(instance.disks) == 1:
5301           raise errors.OpPrereqError("Cannot remove the last disk of"
5302                                      " an instance")
5303         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5304         ins_l = ins_l[pnode]
5305         if not type(ins_l) is list:
5306           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5307         if instance.name in ins_l:
5308           raise errors.OpPrereqError("Instance is running, can't remove"
5309                                      " disks.")
5310
5311       if (disk_op == constants.DDM_ADD and
5312           len(instance.nics) >= constants.MAX_DISKS):
5313         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5314                                    " add more" % constants.MAX_DISKS)
5315       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5316         # an existing disk
5317         if disk_op < 0 or disk_op >= len(instance.disks):
5318           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5319                                      " are 0 to %d" %
5320                                      (disk_op, len(instance.disks)))
5321
5322     return
5323
5324   def Exec(self, feedback_fn):
5325     """Modifies an instance.
5326
5327     All parameters take effect only at the next restart of the instance.
5328
5329     """
5330     # Process here the warnings from CheckPrereq, as we don't have a
5331     # feedback_fn there.
5332     for warn in self.warn:
5333       feedback_fn("WARNING: %s" % warn)
5334
5335     result = []
5336     instance = self.instance
5337     # disk changes
5338     for disk_op, disk_dict in self.op.disks:
5339       if disk_op == constants.DDM_REMOVE:
5340         # remove the last disk
5341         device = instance.disks.pop()
5342         device_idx = len(instance.disks)
5343         for node, disk in device.ComputeNodeTree(instance.primary_node):
5344           self.cfg.SetDiskID(disk, node)
5345           result = self.rpc.call_blockdev_remove(node, disk)
5346           if result.failed or not result.data:
5347             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5348                                  " continuing anyway", device_idx, node)
5349         result.append(("disk/%d" % device_idx, "remove"))
5350       elif disk_op == constants.DDM_ADD:
5351         # add a new disk
5352         if instance.disk_template == constants.DT_FILE:
5353           file_driver, file_path = instance.disks[0].logical_id
5354           file_path = os.path.dirname(file_path)
5355         else:
5356           file_driver = file_path = None
5357         disk_idx_base = len(instance.disks)
5358         new_disk = _GenerateDiskTemplate(self,
5359                                          instance.disk_template,
5360                                          instance, instance.primary_node,
5361                                          instance.secondary_nodes,
5362                                          [disk_dict],
5363                                          file_path,
5364                                          file_driver,
5365                                          disk_idx_base)[0]
5366         new_disk.mode = disk_dict['mode']
5367         instance.disks.append(new_disk)
5368         info = _GetInstanceInfoText(instance)
5369
5370         logging.info("Creating volume %s for instance %s",
5371                      new_disk.iv_name, instance.name)
5372         # Note: this needs to be kept in sync with _CreateDisks
5373         #HARDCODE
5374         for secondary_node in instance.secondary_nodes:
5375           if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5376                                             new_disk, False, info):
5377             self.LogWarning("Failed to create volume %s (%s) on"
5378                             " secondary node %s!",
5379                             new_disk.iv_name, new_disk, secondary_node)
5380         #HARDCODE
5381         if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5382                                         instance, new_disk, info):
5383           self.LogWarning("Failed to create volume %s on primary!",
5384                           new_disk.iv_name)
5385         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5386                        (new_disk.size, new_disk.mode)))
5387       else:
5388         # change a given disk
5389         instance.disks[disk_op].mode = disk_dict['mode']
5390         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5391     # NIC changes
5392     for nic_op, nic_dict in self.op.nics:
5393       if nic_op == constants.DDM_REMOVE:
5394         # remove the last nic
5395         del instance.nics[-1]
5396         result.append(("nic.%d" % len(instance.nics), "remove"))
5397       elif nic_op == constants.DDM_ADD:
5398         # add a new nic
5399         if 'mac' not in nic_dict:
5400           mac = constants.VALUE_GENERATE
5401         else:
5402           mac = nic_dict['mac']
5403         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5404           mac = self.cfg.GenerateMAC()
5405         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5406                               bridge=nic_dict.get('bridge', None))
5407         instance.nics.append(new_nic)
5408         result.append(("nic.%d" % (len(instance.nics) - 1),
5409                        "add:mac=%s,ip=%s,bridge=%s" %
5410                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5411       else:
5412         # change a given nic
5413         for key in 'mac', 'ip', 'bridge':
5414           if key in nic_dict:
5415             setattr(instance.nics[nic_op], key, nic_dict[key])
5416             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5417
5418     # hvparams changes
5419     if self.op.hvparams:
5420       instance.hvparams = self.hv_new
5421       for key, val in self.op.hvparams.iteritems():
5422         result.append(("hv/%s" % key, val))
5423
5424     # beparams changes
5425     if self.op.beparams:
5426       instance.beparams = self.be_inst
5427       for key, val in self.op.beparams.iteritems():
5428         result.append(("be/%s" % key, val))
5429
5430     self.cfg.Update(instance)
5431
5432     return result
5433
5434
5435 class LUQueryExports(NoHooksLU):
5436   """Query the exports list
5437
5438   """
5439   _OP_REQP = ['nodes']
5440   REQ_BGL = False
5441
5442   def ExpandNames(self):
5443     self.needed_locks = {}
5444     self.share_locks[locking.LEVEL_NODE] = 1
5445     if not self.op.nodes:
5446       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5447     else:
5448       self.needed_locks[locking.LEVEL_NODE] = \
5449         _GetWantedNodes(self, self.op.nodes)
5450
5451   def CheckPrereq(self):
5452     """Check prerequisites.
5453
5454     """
5455     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5456
5457   def Exec(self, feedback_fn):
5458     """Compute the list of all the exported system images.
5459
5460     @rtype: dict
5461     @return: a dictionary with the structure node->(export-list)
5462         where export-list is a list of the instances exported on
5463         that node.
5464
5465     """
5466     rpcresult = self.rpc.call_export_list(self.nodes)
5467     result = {}
5468     for node in rpcresult:
5469       if rpcresult[node].failed:
5470         result[node] = False
5471       else:
5472         result[node] = rpcresult[node].data
5473
5474     return result
5475
5476
5477 class LUExportInstance(LogicalUnit):
5478   """Export an instance to an image in the cluster.
5479
5480   """
5481   HPATH = "instance-export"
5482   HTYPE = constants.HTYPE_INSTANCE
5483   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5484   REQ_BGL = False
5485
5486   def ExpandNames(self):
5487     self._ExpandAndLockInstance()
5488     # FIXME: lock only instance primary and destination node
5489     #
5490     # Sad but true, for now we have do lock all nodes, as we don't know where
5491     # the previous export might be, and and in this LU we search for it and
5492     # remove it from its current node. In the future we could fix this by:
5493     #  - making a tasklet to search (share-lock all), then create the new one,
5494     #    then one to remove, after
5495     #  - removing the removal operation altoghether
5496     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5497
5498   def DeclareLocks(self, level):
5499     """Last minute lock declaration."""
5500     # All nodes are locked anyway, so nothing to do here.
5501
5502   def BuildHooksEnv(self):
5503     """Build hooks env.
5504
5505     This will run on the master, primary node and target node.
5506
5507     """
5508     env = {
5509       "EXPORT_NODE": self.op.target_node,
5510       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5511       }
5512     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5513     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5514           self.op.target_node]
5515     return env, nl, nl
5516
5517   def CheckPrereq(self):
5518     """Check prerequisites.
5519
5520     This checks that the instance and node names are valid.
5521
5522     """
5523     instance_name = self.op.instance_name
5524     self.instance = self.cfg.GetInstanceInfo(instance_name)
5525     assert self.instance is not None, \
5526           "Cannot retrieve locked instance %s" % self.op.instance_name
5527     _CheckNodeOnline(self, self.instance.primary_node)
5528
5529     self.dst_node = self.cfg.GetNodeInfo(
5530       self.cfg.ExpandNodeName(self.op.target_node))
5531
5532     if self.dst_node is None:
5533       # This is wrong node name, not a non-locked node
5534       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5535     _CheckNodeOnline(self, self.op.target_node)
5536
5537     # instance disk type verification
5538     for disk in self.instance.disks:
5539       if disk.dev_type == constants.LD_FILE:
5540         raise errors.OpPrereqError("Export not supported for instances with"
5541                                    " file-based disks")
5542
5543   def Exec(self, feedback_fn):
5544     """Export an instance to an image in the cluster.
5545
5546     """
5547     instance = self.instance
5548     dst_node = self.dst_node
5549     src_node = instance.primary_node
5550     if self.op.shutdown:
5551       # shutdown the instance, but not the disks
5552       result = self.rpc.call_instance_shutdown(src_node, instance)
5553       result.Raise()
5554       if not result.data:
5555         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5556                                  (instance.name, src_node))
5557
5558     vgname = self.cfg.GetVGName()
5559
5560     snap_disks = []
5561
5562     try:
5563       for disk in instance.disks:
5564         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5565         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5566         if new_dev_name.failed or not new_dev_name.data:
5567           self.LogWarning("Could not snapshot block device %s on node %s",
5568                           disk.logical_id[1], src_node)
5569           snap_disks.append(False)
5570         else:
5571           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5572                                  logical_id=(vgname, new_dev_name.data),
5573                                  physical_id=(vgname, new_dev_name.data),
5574                                  iv_name=disk.iv_name)
5575           snap_disks.append(new_dev)
5576
5577     finally:
5578       if self.op.shutdown and instance.status == "up":
5579         result = self.rpc.call_instance_start(src_node, instance, None)
5580         if result.failed or not result.data:
5581           _ShutdownInstanceDisks(self, instance)
5582           raise errors.OpExecError("Could not start instance")
5583
5584     # TODO: check for size
5585
5586     cluster_name = self.cfg.GetClusterName()
5587     for idx, dev in enumerate(snap_disks):
5588       if dev:
5589         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5590                                                instance, cluster_name, idx)
5591         if result.failed or not result.data:
5592           self.LogWarning("Could not export block device %s from node %s to"
5593                           " node %s", dev.logical_id[1], src_node,
5594                           dst_node.name)
5595         result = self.rpc.call_blockdev_remove(src_node, dev)
5596         if result.failed or not result.data:
5597           self.LogWarning("Could not remove snapshot block device %s from node"
5598                           " %s", dev.logical_id[1], src_node)
5599
5600     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5601     if result.failed or not result.data:
5602       self.LogWarning("Could not finalize export for instance %s on node %s",
5603                       instance.name, dst_node.name)
5604
5605     nodelist = self.cfg.GetNodeList()
5606     nodelist.remove(dst_node.name)
5607
5608     # on one-node clusters nodelist will be empty after the removal
5609     # if we proceed the backup would be removed because OpQueryExports
5610     # substitutes an empty list with the full cluster node list.
5611     if nodelist:
5612       exportlist = self.rpc.call_export_list(nodelist)
5613       for node in exportlist:
5614         if exportlist[node].failed:
5615           continue
5616         if instance.name in exportlist[node].data:
5617           if not self.rpc.call_export_remove(node, instance.name):
5618             self.LogWarning("Could not remove older export for instance %s"
5619                             " on node %s", instance.name, node)
5620
5621
5622 class LURemoveExport(NoHooksLU):
5623   """Remove exports related to the named instance.
5624
5625   """
5626   _OP_REQP = ["instance_name"]
5627   REQ_BGL = False
5628
5629   def ExpandNames(self):
5630     self.needed_locks = {}
5631     # We need all nodes to be locked in order for RemoveExport to work, but we
5632     # don't need to lock the instance itself, as nothing will happen to it (and
5633     # we can remove exports also for a removed instance)
5634     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5635
5636   def CheckPrereq(self):
5637     """Check prerequisites.
5638     """
5639     pass
5640
5641   def Exec(self, feedback_fn):
5642     """Remove any export.
5643
5644     """
5645     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5646     # If the instance was not found we'll try with the name that was passed in.
5647     # This will only work if it was an FQDN, though.
5648     fqdn_warn = False
5649     if not instance_name:
5650       fqdn_warn = True
5651       instance_name = self.op.instance_name
5652
5653     exportlist = self.rpc.call_export_list(self.acquired_locks[
5654       locking.LEVEL_NODE])
5655     found = False
5656     for node in exportlist:
5657       if exportlist[node].failed:
5658         self.LogWarning("Failed to query node %s, continuing" % node)
5659         continue
5660       if instance_name in exportlist[node].data:
5661         found = True
5662         result = self.rpc.call_export_remove(node, instance_name)
5663         if result.failed or not result.data:
5664           logging.error("Could not remove export for instance %s"
5665                         " on node %s", instance_name, node)
5666
5667     if fqdn_warn and not found:
5668       feedback_fn("Export not found. If trying to remove an export belonging"
5669                   " to a deleted instance please use its Fully Qualified"
5670                   " Domain Name.")
5671
5672
5673 class TagsLU(NoHooksLU):
5674   """Generic tags LU.
5675
5676   This is an abstract class which is the parent of all the other tags LUs.
5677
5678   """
5679
5680   def ExpandNames(self):
5681     self.needed_locks = {}
5682     if self.op.kind == constants.TAG_NODE:
5683       name = self.cfg.ExpandNodeName(self.op.name)
5684       if name is None:
5685         raise errors.OpPrereqError("Invalid node name (%s)" %
5686                                    (self.op.name,))
5687       self.op.name = name
5688       self.needed_locks[locking.LEVEL_NODE] = name
5689     elif self.op.kind == constants.TAG_INSTANCE:
5690       name = self.cfg.ExpandInstanceName(self.op.name)
5691       if name is None:
5692         raise errors.OpPrereqError("Invalid instance name (%s)" %
5693                                    (self.op.name,))
5694       self.op.name = name
5695       self.needed_locks[locking.LEVEL_INSTANCE] = name
5696
5697   def CheckPrereq(self):
5698     """Check prerequisites.
5699
5700     """
5701     if self.op.kind == constants.TAG_CLUSTER:
5702       self.target = self.cfg.GetClusterInfo()
5703     elif self.op.kind == constants.TAG_NODE:
5704       self.target = self.cfg.GetNodeInfo(self.op.name)
5705     elif self.op.kind == constants.TAG_INSTANCE:
5706       self.target = self.cfg.GetInstanceInfo(self.op.name)
5707     else:
5708       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5709                                  str(self.op.kind))
5710
5711
5712 class LUGetTags(TagsLU):
5713   """Returns the tags of a given object.
5714
5715   """
5716   _OP_REQP = ["kind", "name"]
5717   REQ_BGL = False
5718
5719   def Exec(self, feedback_fn):
5720     """Returns the tag list.
5721
5722     """
5723     return list(self.target.GetTags())
5724
5725
5726 class LUSearchTags(NoHooksLU):
5727   """Searches the tags for a given pattern.
5728
5729   """
5730   _OP_REQP = ["pattern"]
5731   REQ_BGL = False
5732
5733   def ExpandNames(self):
5734     self.needed_locks = {}
5735
5736   def CheckPrereq(self):
5737     """Check prerequisites.
5738
5739     This checks the pattern passed for validity by compiling it.
5740
5741     """
5742     try:
5743       self.re = re.compile(self.op.pattern)
5744     except re.error, err:
5745       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5746                                  (self.op.pattern, err))
5747
5748   def Exec(self, feedback_fn):
5749     """Returns the tag list.
5750
5751     """
5752     cfg = self.cfg
5753     tgts = [("/cluster", cfg.GetClusterInfo())]
5754     ilist = cfg.GetAllInstancesInfo().values()
5755     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5756     nlist = cfg.GetAllNodesInfo().values()
5757     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5758     results = []
5759     for path, target in tgts:
5760       for tag in target.GetTags():
5761         if self.re.search(tag):
5762           results.append((path, tag))
5763     return results
5764
5765
5766 class LUAddTags(TagsLU):
5767   """Sets a tag on a given object.
5768
5769   """
5770   _OP_REQP = ["kind", "name", "tags"]
5771   REQ_BGL = False
5772
5773   def CheckPrereq(self):
5774     """Check prerequisites.
5775
5776     This checks the type and length of the tag name and value.
5777
5778     """
5779     TagsLU.CheckPrereq(self)
5780     for tag in self.op.tags:
5781       objects.TaggableObject.ValidateTag(tag)
5782
5783   def Exec(self, feedback_fn):
5784     """Sets the tag.
5785
5786     """
5787     try:
5788       for tag in self.op.tags:
5789         self.target.AddTag(tag)
5790     except errors.TagError, err:
5791       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5792     try:
5793       self.cfg.Update(self.target)
5794     except errors.ConfigurationError:
5795       raise errors.OpRetryError("There has been a modification to the"
5796                                 " config file and the operation has been"
5797                                 " aborted. Please retry.")
5798
5799
5800 class LUDelTags(TagsLU):
5801   """Delete a list of tags from a given object.
5802
5803   """
5804   _OP_REQP = ["kind", "name", "tags"]
5805   REQ_BGL = False
5806
5807   def CheckPrereq(self):
5808     """Check prerequisites.
5809
5810     This checks that we have the given tag.
5811
5812     """
5813     TagsLU.CheckPrereq(self)
5814     for tag in self.op.tags:
5815       objects.TaggableObject.ValidateTag(tag)
5816     del_tags = frozenset(self.op.tags)
5817     cur_tags = self.target.GetTags()
5818     if not del_tags <= cur_tags:
5819       diff_tags = del_tags - cur_tags
5820       diff_names = ["'%s'" % tag for tag in diff_tags]
5821       diff_names.sort()
5822       raise errors.OpPrereqError("Tag(s) %s not found" %
5823                                  (",".join(diff_names)))
5824
5825   def Exec(self, feedback_fn):
5826     """Remove the tag from the object.
5827
5828     """
5829     for tag in self.op.tags:
5830       self.target.RemoveTag(tag)
5831     try:
5832       self.cfg.Update(self.target)
5833     except errors.ConfigurationError:
5834       raise errors.OpRetryError("There has been a modification to the"
5835                                 " config file and the operation has been"
5836                                 " aborted. Please retry.")
5837
5838
5839 class LUTestDelay(NoHooksLU):
5840   """Sleep for a specified amount of time.
5841
5842   This LU sleeps on the master and/or nodes for a specified amount of
5843   time.
5844
5845   """
5846   _OP_REQP = ["duration", "on_master", "on_nodes"]
5847   REQ_BGL = False
5848
5849   def ExpandNames(self):
5850     """Expand names and set required locks.
5851
5852     This expands the node list, if any.
5853
5854     """
5855     self.needed_locks = {}
5856     if self.op.on_nodes:
5857       # _GetWantedNodes can be used here, but is not always appropriate to use
5858       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5859       # more information.
5860       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5861       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5862
5863   def CheckPrereq(self):
5864     """Check prerequisites.
5865
5866     """
5867
5868   def Exec(self, feedback_fn):
5869     """Do the actual sleep.
5870
5871     """
5872     if self.op.on_master:
5873       if not utils.TestDelay(self.op.duration):
5874         raise errors.OpExecError("Error during master delay test")
5875     if self.op.on_nodes:
5876       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5877       if not result:
5878         raise errors.OpExecError("Complete failure from rpc call")
5879       for node, node_result in result.items():
5880         node_result.Raise()
5881         if not node_result.data:
5882           raise errors.OpExecError("Failure during rpc call to node %s,"
5883                                    " result: %s" % (node, node_result.data))
5884
5885
5886 class IAllocator(object):
5887   """IAllocator framework.
5888
5889   An IAllocator instance has three sets of attributes:
5890     - cfg that is needed to query the cluster
5891     - input data (all members of the _KEYS class attribute are required)
5892     - four buffer attributes (in|out_data|text), that represent the
5893       input (to the external script) in text and data structure format,
5894       and the output from it, again in two formats
5895     - the result variables from the script (success, info, nodes) for
5896       easy usage
5897
5898   """
5899   _ALLO_KEYS = [
5900     "mem_size", "disks", "disk_template",
5901     "os", "tags", "nics", "vcpus", "hypervisor",
5902     ]
5903   _RELO_KEYS = [
5904     "relocate_from",
5905     ]
5906
5907   def __init__(self, lu, mode, name, **kwargs):
5908     self.lu = lu
5909     # init buffer variables
5910     self.in_text = self.out_text = self.in_data = self.out_data = None
5911     # init all input fields so that pylint is happy
5912     self.mode = mode
5913     self.name = name
5914     self.mem_size = self.disks = self.disk_template = None
5915     self.os = self.tags = self.nics = self.vcpus = None
5916     self.hypervisor = None
5917     self.relocate_from = None
5918     # computed fields
5919     self.required_nodes = None
5920     # init result fields
5921     self.success = self.info = self.nodes = None
5922     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5923       keyset = self._ALLO_KEYS
5924     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5925       keyset = self._RELO_KEYS
5926     else:
5927       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5928                                    " IAllocator" % self.mode)
5929     for key in kwargs:
5930       if key not in keyset:
5931         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5932                                      " IAllocator" % key)
5933       setattr(self, key, kwargs[key])
5934     for key in keyset:
5935       if key not in kwargs:
5936         raise errors.ProgrammerError("Missing input parameter '%s' to"
5937                                      " IAllocator" % key)
5938     self._BuildInputData()
5939
5940   def _ComputeClusterData(self):
5941     """Compute the generic allocator input data.
5942
5943     This is the data that is independent of the actual operation.
5944
5945     """
5946     cfg = self.lu.cfg
5947     cluster_info = cfg.GetClusterInfo()
5948     # cluster data
5949     data = {
5950       "version": 1,
5951       "cluster_name": cfg.GetClusterName(),
5952       "cluster_tags": list(cluster_info.GetTags()),
5953       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5954       # we don't have job IDs
5955       }
5956     iinfo = cfg.GetAllInstancesInfo().values()
5957     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5958
5959     # node data
5960     node_results = {}
5961     node_list = cfg.GetNodeList()
5962
5963     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5964       hypervisor_name = self.hypervisor
5965     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5966       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
5967
5968     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5969                                            hypervisor_name)
5970     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5971                        cluster_info.enabled_hypervisors)
5972     for nname in node_list:
5973       ninfo = cfg.GetNodeInfo(nname)
5974       node_data[nname].Raise()
5975       if not isinstance(node_data[nname].data, dict):
5976         raise errors.OpExecError("Can't get data for node %s" % nname)
5977       remote_info = node_data[nname].data
5978       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5979                    'vg_size', 'vg_free', 'cpu_total']:
5980         if attr not in remote_info:
5981           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5982                                    (nname, attr))
5983         try:
5984           remote_info[attr] = int(remote_info[attr])
5985         except ValueError, err:
5986           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5987                                    " %s" % (nname, attr, str(err)))
5988       # compute memory used by primary instances
5989       i_p_mem = i_p_up_mem = 0
5990       for iinfo, beinfo in i_list:
5991         if iinfo.primary_node == nname:
5992           i_p_mem += beinfo[constants.BE_MEMORY]
5993           if iinfo.name not in node_iinfo[nname]:
5994             i_used_mem = 0
5995           else:
5996             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5997           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5998           remote_info['memory_free'] -= max(0, i_mem_diff)
5999
6000           if iinfo.status == "up":
6001             i_p_up_mem += beinfo[constants.BE_MEMORY]
6002
6003       # compute memory used by instances
6004       pnr = {
6005         "tags": list(ninfo.GetTags()),
6006         "total_memory": remote_info['memory_total'],
6007         "reserved_memory": remote_info['memory_dom0'],
6008         "free_memory": remote_info['memory_free'],
6009         "i_pri_memory": i_p_mem,
6010         "i_pri_up_memory": i_p_up_mem,
6011         "total_disk": remote_info['vg_size'],
6012         "free_disk": remote_info['vg_free'],
6013         "primary_ip": ninfo.primary_ip,
6014         "secondary_ip": ninfo.secondary_ip,
6015         "total_cpus": remote_info['cpu_total'],
6016         "offline": ninfo.offline,
6017         }
6018       node_results[nname] = pnr
6019     data["nodes"] = node_results
6020
6021     # instance data
6022     instance_data = {}
6023     for iinfo, beinfo in i_list:
6024       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6025                   for n in iinfo.nics]
6026       pir = {
6027         "tags": list(iinfo.GetTags()),
6028         "should_run": iinfo.status == "up",
6029         "vcpus": beinfo[constants.BE_VCPUS],
6030         "memory": beinfo[constants.BE_MEMORY],
6031         "os": iinfo.os,
6032         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6033         "nics": nic_data,
6034         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
6035         "disk_template": iinfo.disk_template,
6036         "hypervisor": iinfo.hypervisor,
6037         }
6038       instance_data[iinfo.name] = pir
6039
6040     data["instances"] = instance_data
6041
6042     self.in_data = data
6043
6044   def _AddNewInstance(self):
6045     """Add new instance data to allocator structure.
6046
6047     This in combination with _AllocatorGetClusterData will create the
6048     correct structure needed as input for the allocator.
6049
6050     The checks for the completeness of the opcode must have already been
6051     done.
6052
6053     """
6054     data = self.in_data
6055     if len(self.disks) != 2:
6056       raise errors.OpExecError("Only two-disk configurations supported")
6057
6058     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6059
6060     if self.disk_template in constants.DTS_NET_MIRROR:
6061       self.required_nodes = 2
6062     else:
6063       self.required_nodes = 1
6064     request = {
6065       "type": "allocate",
6066       "name": self.name,
6067       "disk_template": self.disk_template,
6068       "tags": self.tags,
6069       "os": self.os,
6070       "vcpus": self.vcpus,
6071       "memory": self.mem_size,
6072       "disks": self.disks,
6073       "disk_space_total": disk_space,
6074       "nics": self.nics,
6075       "required_nodes": self.required_nodes,
6076       }
6077     data["request"] = request
6078
6079   def _AddRelocateInstance(self):
6080     """Add relocate instance data to allocator structure.
6081
6082     This in combination with _IAllocatorGetClusterData will create the
6083     correct structure needed as input for the allocator.
6084
6085     The checks for the completeness of the opcode must have already been
6086     done.
6087
6088     """
6089     instance = self.lu.cfg.GetInstanceInfo(self.name)
6090     if instance is None:
6091       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6092                                    " IAllocator" % self.name)
6093
6094     if instance.disk_template not in constants.DTS_NET_MIRROR:
6095       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6096
6097     if len(instance.secondary_nodes) != 1:
6098       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6099
6100     self.required_nodes = 1
6101     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6102     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6103
6104     request = {
6105       "type": "relocate",
6106       "name": self.name,
6107       "disk_space_total": disk_space,
6108       "required_nodes": self.required_nodes,
6109       "relocate_from": self.relocate_from,
6110       }
6111     self.in_data["request"] = request
6112
6113   def _BuildInputData(self):
6114     """Build input data structures.
6115
6116     """
6117     self._ComputeClusterData()
6118
6119     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6120       self._AddNewInstance()
6121     else:
6122       self._AddRelocateInstance()
6123
6124     self.in_text = serializer.Dump(self.in_data)
6125
6126   def Run(self, name, validate=True, call_fn=None):
6127     """Run an instance allocator and return the results.
6128
6129     """
6130     if call_fn is None:
6131       call_fn = self.lu.rpc.call_iallocator_runner
6132     data = self.in_text
6133
6134     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6135     result.Raise()
6136
6137     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6138       raise errors.OpExecError("Invalid result from master iallocator runner")
6139
6140     rcode, stdout, stderr, fail = result.data
6141
6142     if rcode == constants.IARUN_NOTFOUND:
6143       raise errors.OpExecError("Can't find allocator '%s'" % name)
6144     elif rcode == constants.IARUN_FAILURE:
6145       raise errors.OpExecError("Instance allocator call failed: %s,"
6146                                " output: %s" % (fail, stdout+stderr))
6147     self.out_text = stdout
6148     if validate:
6149       self._ValidateResult()
6150
6151   def _ValidateResult(self):
6152     """Process the allocator results.
6153
6154     This will process and if successful save the result in
6155     self.out_data and the other parameters.
6156
6157     """
6158     try:
6159       rdict = serializer.Load(self.out_text)
6160     except Exception, err:
6161       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6162
6163     if not isinstance(rdict, dict):
6164       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6165
6166     for key in "success", "info", "nodes":
6167       if key not in rdict:
6168         raise errors.OpExecError("Can't parse iallocator results:"
6169                                  " missing key '%s'" % key)
6170       setattr(self, key, rdict[key])
6171
6172     if not isinstance(rdict["nodes"], list):
6173       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6174                                " is not a list")
6175     self.out_data = rdict
6176
6177
6178 class LUTestAllocator(NoHooksLU):
6179   """Run allocator tests.
6180
6181   This LU runs the allocator tests
6182
6183   """
6184   _OP_REQP = ["direction", "mode", "name"]
6185
6186   def CheckPrereq(self):
6187     """Check prerequisites.
6188
6189     This checks the opcode parameters depending on the director and mode test.
6190
6191     """
6192     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6193       for attr in ["name", "mem_size", "disks", "disk_template",
6194                    "os", "tags", "nics", "vcpus"]:
6195         if not hasattr(self.op, attr):
6196           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6197                                      attr)
6198       iname = self.cfg.ExpandInstanceName(self.op.name)
6199       if iname is not None:
6200         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6201                                    iname)
6202       if not isinstance(self.op.nics, list):
6203         raise errors.OpPrereqError("Invalid parameter 'nics'")
6204       for row in self.op.nics:
6205         if (not isinstance(row, dict) or
6206             "mac" not in row or
6207             "ip" not in row or
6208             "bridge" not in row):
6209           raise errors.OpPrereqError("Invalid contents of the"
6210                                      " 'nics' parameter")
6211       if not isinstance(self.op.disks, list):
6212         raise errors.OpPrereqError("Invalid parameter 'disks'")
6213       if len(self.op.disks) != 2:
6214         raise errors.OpPrereqError("Only two-disk configurations supported")
6215       for row in self.op.disks:
6216         if (not isinstance(row, dict) or
6217             "size" not in row or
6218             not isinstance(row["size"], int) or
6219             "mode" not in row or
6220             row["mode"] not in ['r', 'w']):
6221           raise errors.OpPrereqError("Invalid contents of the"
6222                                      " 'disks' parameter")
6223       if self.op.hypervisor is None:
6224         self.op.hypervisor = self.cfg.GetHypervisorType()
6225     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6226       if not hasattr(self.op, "name"):
6227         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6228       fname = self.cfg.ExpandInstanceName(self.op.name)
6229       if fname is None:
6230         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6231                                    self.op.name)
6232       self.op.name = fname
6233       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6234     else:
6235       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6236                                  self.op.mode)
6237
6238     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6239       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6240         raise errors.OpPrereqError("Missing allocator name")
6241     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6242       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6243                                  self.op.direction)
6244
6245   def Exec(self, feedback_fn):
6246     """Run the allocator test.
6247
6248     """
6249     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6250       ial = IAllocator(self,
6251                        mode=self.op.mode,
6252                        name=self.op.name,
6253                        mem_size=self.op.mem_size,
6254                        disks=self.op.disks,
6255                        disk_template=self.op.disk_template,
6256                        os=self.op.os,
6257                        tags=self.op.tags,
6258                        nics=self.op.nics,
6259                        vcpus=self.op.vcpus,
6260                        hypervisor=self.op.hypervisor,
6261                        )
6262     else:
6263       ial = IAllocator(self,
6264                        mode=self.op.mode,
6265                        name=self.op.name,
6266                        relocate_from=list(self.relocate_from),
6267                        )
6268
6269     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6270       result = ial.in_text
6271     else:
6272       ial.Run(self.op.allocator, validate=False)
6273       result = ial.out_text
6274     return result