Fix gnt-cluster modify -H and offline nodes
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35 import random
36
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
47
48
49 class LogicalUnit(object):
50   """Logical Unit base class.
51
52   Subclasses must follow these rules:
53     - implement ExpandNames
54     - implement CheckPrereq
55     - implement Exec
56     - implement BuildHooksEnv
57     - redefine HPATH and HTYPE
58     - optionally redefine their run requirements:
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99     self.CheckArguments()
100
101   def __GetSSH(self):
102     """Returns the SshRunner object
103
104     """
105     if not self.__ssh:
106       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107     return self.__ssh
108
109   ssh = property(fget=__GetSSH)
110
111   def CheckArguments(self):
112     """Check syntactic validity for the opcode arguments.
113
114     This method is for doing a simple syntactic check and ensure
115     validity of opcode parameters, without any cluster-related
116     checks. While the same can be accomplished in ExpandNames and/or
117     CheckPrereq, doing these separate is better because:
118
119       - ExpandNames is left as as purely a lock-related function
120       - CheckPrereq is run after we have aquired locks (and possible
121         waited for them)
122
123     The function is allowed to change the self.op attribute so that
124     later methods can no longer worry about missing parameters.
125
126     """
127     pass
128
129   def ExpandNames(self):
130     """Expand names for this LU.
131
132     This method is called before starting to execute the opcode, and it should
133     update all the parameters of the opcode to their canonical form (e.g. a
134     short node name must be fully expanded after this method has successfully
135     completed). This way locking, hooks, logging, ecc. can work correctly.
136
137     LUs which implement this method must also populate the self.needed_locks
138     member, as a dict with lock levels as keys, and a list of needed lock names
139     as values. Rules:
140
141       - use an empty dict if you don't need any lock
142       - if you don't need any lock at a particular level omit that level
143       - don't put anything for the BGL level
144       - if you want all locks at a level use locking.ALL_SET as a value
145
146     If you need to share locks (rather than acquire them exclusively) at one
147     level you can modify self.share_locks, setting a true value (usually 1) for
148     that level. By default locks are not shared.
149
150     Examples::
151
152       # Acquire all nodes and one instance
153       self.needed_locks = {
154         locking.LEVEL_NODE: locking.ALL_SET,
155         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156       }
157       # Acquire just two nodes
158       self.needed_locks = {
159         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160       }
161       # Acquire no locks
162       self.needed_locks = {} # No, you can't leave it to the default value None
163
164     """
165     # The implementation of this method is mandatory only if the new LU is
166     # concurrent, so that old LUs don't need to be changed all at the same
167     # time.
168     if self.REQ_BGL:
169       self.needed_locks = {} # Exclusive LUs don't need locks.
170     else:
171       raise NotImplementedError
172
173   def DeclareLocks(self, level):
174     """Declare LU locking needs for a level
175
176     While most LUs can just declare their locking needs at ExpandNames time,
177     sometimes there's the need to calculate some locks after having acquired
178     the ones before. This function is called just before acquiring locks at a
179     particular level, but after acquiring the ones at lower levels, and permits
180     such calculations. It can be used to modify self.needed_locks, and by
181     default it does nothing.
182
183     This function is only called if you have something already set in
184     self.needed_locks for the level.
185
186     @param level: Locking level which is going to be locked
187     @type level: member of ganeti.locking.LEVELS
188
189     """
190
191   def CheckPrereq(self):
192     """Check prerequisites for this LU.
193
194     This method should check that the prerequisites for the execution
195     of this LU are fulfilled. It can do internode communication, but
196     it should be idempotent - no cluster or system changes are
197     allowed.
198
199     The method should raise errors.OpPrereqError in case something is
200     not fulfilled. Its return value is ignored.
201
202     This method should also update all the parameters of the opcode to
203     their canonical form if it hasn't been done by ExpandNames before.
204
205     """
206     raise NotImplementedError
207
208   def Exec(self, feedback_fn):
209     """Execute the LU.
210
211     This method should implement the actual work. It should raise
212     errors.OpExecError for failures that are somewhat dealt with in
213     code, or expected.
214
215     """
216     raise NotImplementedError
217
218   def BuildHooksEnv(self):
219     """Build hooks environment for this LU.
220
221     This method should return a three-node tuple consisting of: a dict
222     containing the environment that will be used for running the
223     specific hook for this LU, a list of node names on which the hook
224     should run before the execution, and a list of node names on which
225     the hook should run after the execution.
226
227     The keys of the dict must not have 'GANETI_' prefixed as this will
228     be handled in the hooks runner. Also note additional keys will be
229     added by the hooks runner. If the LU doesn't define any
230     environment, an empty dict (and not None) should be returned.
231
232     No nodes should be returned as an empty list (and not None).
233
234     Note that if the HPATH for a LU class is None, this function will
235     not be called.
236
237     """
238     raise NotImplementedError
239
240   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241     """Notify the LU about the results of its hooks.
242
243     This method is called every time a hooks phase is executed, and notifies
244     the Logical Unit about the hooks' result. The LU can then use it to alter
245     its result based on the hooks.  By default the method does nothing and the
246     previous result is passed back unchanged but any LU can define it if it
247     wants to use the local cluster hook-scripts somehow.
248
249     @param phase: one of L{constants.HOOKS_PHASE_POST} or
250         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251     @param hook_results: the results of the multi-node hooks rpc call
252     @param feedback_fn: function used send feedback back to the caller
253     @param lu_result: the previous Exec result this LU had, or None
254         in the PRE phase
255     @return: the new Exec result, based on the previous result
256         and hook results
257
258     """
259     return lu_result
260
261   def _ExpandAndLockInstance(self):
262     """Helper function to expand and lock an instance.
263
264     Many LUs that work on an instance take its name in self.op.instance_name
265     and need to expand it and then declare the expanded name for locking. This
266     function does it, and then updates self.op.instance_name to the expanded
267     name. It also initializes needed_locks as a dict, if this hasn't been done
268     before.
269
270     """
271     if self.needed_locks is None:
272       self.needed_locks = {}
273     else:
274       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275         "_ExpandAndLockInstance called with instance-level locks set"
276     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277     if expanded_name is None:
278       raise errors.OpPrereqError("Instance '%s' not known" %
279                                   self.op.instance_name)
280     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281     self.op.instance_name = expanded_name
282
283   def _LockInstancesNodes(self, primary_only=False):
284     """Helper function to declare instances' nodes for locking.
285
286     This function should be called after locking one or more instances to lock
287     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288     with all primary or secondary nodes for instances already locked and
289     present in self.needed_locks[locking.LEVEL_INSTANCE].
290
291     It should be called from DeclareLocks, and for safety only works if
292     self.recalculate_locks[locking.LEVEL_NODE] is set.
293
294     In the future it may grow parameters to just lock some instance's nodes, or
295     to just lock primaries or secondary nodes, if needed.
296
297     If should be called in DeclareLocks in a way similar to::
298
299       if level == locking.LEVEL_NODE:
300         self._LockInstancesNodes()
301
302     @type primary_only: boolean
303     @param primary_only: only lock primary nodes of locked instances
304
305     """
306     assert locking.LEVEL_NODE in self.recalculate_locks, \
307       "_LockInstancesNodes helper function called with no nodes to recalculate"
308
309     # TODO: check if we're really been called with the instance locks held
310
311     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312     # future we might want to have different behaviors depending on the value
313     # of self.recalculate_locks[locking.LEVEL_NODE]
314     wanted_nodes = []
315     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316       instance = self.context.cfg.GetInstanceInfo(instance_name)
317       wanted_nodes.append(instance.primary_node)
318       if not primary_only:
319         wanted_nodes.extend(instance.secondary_nodes)
320
321     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325
326     del self.recalculate_locks[locking.LEVEL_NODE]
327
328
329 class NoHooksLU(LogicalUnit):
330   """Simple LU which runs no hooks.
331
332   This LU is intended as a parent for other LogicalUnits which will
333   run no hooks, in order to reduce duplicate code.
334
335   """
336   HPATH = None
337   HTYPE = None
338
339
340 def _GetWantedNodes(lu, nodes):
341   """Returns list of checked and expanded node names.
342
343   @type lu: L{LogicalUnit}
344   @param lu: the logical unit on whose behalf we execute
345   @type nodes: list
346   @param nodes: list of node names or None for all nodes
347   @rtype: list
348   @return: the list of nodes, sorted
349   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
350
351   """
352   if not isinstance(nodes, list):
353     raise errors.OpPrereqError("Invalid argument type 'nodes'")
354
355   if not nodes:
356     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357       " non-empty list of nodes whose name is to be expanded.")
358
359   wanted = []
360   for name in nodes:
361     node = lu.cfg.ExpandNodeName(name)
362     if node is None:
363       raise errors.OpPrereqError("No such node name '%s'" % name)
364     wanted.append(node)
365
366   return utils.NiceSort(wanted)
367
368
369 def _GetWantedInstances(lu, instances):
370   """Returns list of checked and expanded instance names.
371
372   @type lu: L{LogicalUnit}
373   @param lu: the logical unit on whose behalf we execute
374   @type instances: list
375   @param instances: list of instance names or None for all instances
376   @rtype: list
377   @return: the list of instances, sorted
378   @raise errors.OpPrereqError: if the instances parameter is wrong type
379   @raise errors.OpPrereqError: if any of the passed instances is not found
380
381   """
382   if not isinstance(instances, list):
383     raise errors.OpPrereqError("Invalid argument type 'instances'")
384
385   if instances:
386     wanted = []
387
388     for name in instances:
389       instance = lu.cfg.ExpandInstanceName(name)
390       if instance is None:
391         raise errors.OpPrereqError("No such instance name '%s'" % name)
392       wanted.append(instance)
393
394   else:
395     wanted = lu.cfg.GetInstanceList()
396   return utils.NiceSort(wanted)
397
398
399 def _CheckOutputFields(static, dynamic, selected):
400   """Checks whether all selected fields are valid.
401
402   @type static: L{utils.FieldSet}
403   @param static: static fields set
404   @type dynamic: L{utils.FieldSet}
405   @param dynamic: dynamic fields set
406
407   """
408   f = utils.FieldSet()
409   f.Extend(static)
410   f.Extend(dynamic)
411
412   delta = f.NonMatching(selected)
413   if delta:
414     raise errors.OpPrereqError("Unknown output fields selected: %s"
415                                % ",".join(delta))
416
417
418 def _CheckBooleanOpField(op, name):
419   """Validates boolean opcode parameters.
420
421   This will ensure that an opcode parameter is either a boolean value,
422   or None (but that it always exists).
423
424   """
425   val = getattr(op, name, None)
426   if not (val is None or isinstance(val, bool)):
427     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428                                (name, str(val)))
429   setattr(op, name, val)
430
431
432 def _CheckNodeOnline(lu, node):
433   """Ensure that a given node is online.
434
435   @param lu: the LU on behalf of which we make the check
436   @param node: the node to check
437   @raise errors.OpPrereqError: if the nodes is offline
438
439   """
440   if lu.cfg.GetNodeInfo(node).offline:
441     raise errors.OpPrereqError("Can't use offline node %s" % node)
442
443
444 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
445                           memory, vcpus, nics):
446   """Builds instance related env variables for hooks
447
448   This builds the hook environment from individual variables.
449
450   @type name: string
451   @param name: the name of the instance
452   @type primary_node: string
453   @param primary_node: the name of the instance's primary node
454   @type secondary_nodes: list
455   @param secondary_nodes: list of secondary nodes as strings
456   @type os_type: string
457   @param os_type: the name of the instance's OS
458   @type status: boolean
459   @param status: the should_run status of the instance
460   @type memory: string
461   @param memory: the memory size of the instance
462   @type vcpus: string
463   @param vcpus: the count of VCPUs the instance has
464   @type nics: list
465   @param nics: list of tuples (ip, bridge, mac) representing
466       the NICs the instance  has
467   @rtype: dict
468   @return: the hook environment for this instance
469
470   """
471   if status:
472     str_status = "up"
473   else:
474     str_status = "down"
475   env = {
476     "OP_TARGET": name,
477     "INSTANCE_NAME": name,
478     "INSTANCE_PRIMARY": primary_node,
479     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
480     "INSTANCE_OS_TYPE": os_type,
481     "INSTANCE_STATUS": str_status,
482     "INSTANCE_MEMORY": memory,
483     "INSTANCE_VCPUS": vcpus,
484   }
485
486   if nics:
487     nic_count = len(nics)
488     for idx, (ip, bridge, mac) in enumerate(nics):
489       if ip is None:
490         ip = ""
491       env["INSTANCE_NIC%d_IP" % idx] = ip
492       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
493       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
494   else:
495     nic_count = 0
496
497   env["INSTANCE_NIC_COUNT"] = nic_count
498
499   return env
500
501
502 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
503   """Builds instance related env variables for hooks from an object.
504
505   @type lu: L{LogicalUnit}
506   @param lu: the logical unit on whose behalf we execute
507   @type instance: L{objects.Instance}
508   @param instance: the instance for which we should build the
509       environment
510   @type override: dict
511   @param override: dictionary with key/values that will override
512       our values
513   @rtype: dict
514   @return: the hook environment dictionary
515
516   """
517   bep = lu.cfg.GetClusterInfo().FillBE(instance)
518   args = {
519     'name': instance.name,
520     'primary_node': instance.primary_node,
521     'secondary_nodes': instance.secondary_nodes,
522     'os_type': instance.os,
523     'status': instance.admin_up,
524     'memory': bep[constants.BE_MEMORY],
525     'vcpus': bep[constants.BE_VCPUS],
526     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
527   }
528   if override:
529     args.update(override)
530   return _BuildInstanceHookEnv(**args)
531
532
533 def _AdjustCandidatePool(lu):
534   """Adjust the candidate pool after node operations.
535
536   """
537   mod_list = lu.cfg.MaintainCandidatePool()
538   if mod_list:
539     lu.LogInfo("Promoted nodes to master candidate role: %s",
540                ", ".join(node.name for node in mod_list))
541     for name in mod_list:
542       lu.context.ReaddNode(name)
543   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
544   if mc_now > mc_max:
545     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
546                (mc_now, mc_max))
547
548
549 def _CheckInstanceBridgesExist(lu, instance):
550   """Check that the brigdes needed by an instance exist.
551
552   """
553   # check bridges existance
554   brlist = [nic.bridge for nic in instance.nics]
555   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
556   result.Raise()
557   if not result.data:
558     raise errors.OpPrereqError("One or more target bridges %s does not"
559                                " exist on destination node '%s'" %
560                                (brlist, instance.primary_node))
561
562
563 class LUDestroyCluster(NoHooksLU):
564   """Logical unit for destroying the cluster.
565
566   """
567   _OP_REQP = []
568
569   def CheckPrereq(self):
570     """Check prerequisites.
571
572     This checks whether the cluster is empty.
573
574     Any errors are signalled by raising errors.OpPrereqError.
575
576     """
577     master = self.cfg.GetMasterNode()
578
579     nodelist = self.cfg.GetNodeList()
580     if len(nodelist) != 1 or nodelist[0] != master:
581       raise errors.OpPrereqError("There are still %d node(s) in"
582                                  " this cluster." % (len(nodelist) - 1))
583     instancelist = self.cfg.GetInstanceList()
584     if instancelist:
585       raise errors.OpPrereqError("There are still %d instance(s) in"
586                                  " this cluster." % len(instancelist))
587
588   def Exec(self, feedback_fn):
589     """Destroys the cluster.
590
591     """
592     master = self.cfg.GetMasterNode()
593     result = self.rpc.call_node_stop_master(master, False)
594     result.Raise()
595     if not result.data:
596       raise errors.OpExecError("Could not disable the master role")
597     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
598     utils.CreateBackup(priv_key)
599     utils.CreateBackup(pub_key)
600     return master
601
602
603 class LUVerifyCluster(LogicalUnit):
604   """Verifies the cluster status.
605
606   """
607   HPATH = "cluster-verify"
608   HTYPE = constants.HTYPE_CLUSTER
609   _OP_REQP = ["skip_checks"]
610   REQ_BGL = False
611
612   def ExpandNames(self):
613     self.needed_locks = {
614       locking.LEVEL_NODE: locking.ALL_SET,
615       locking.LEVEL_INSTANCE: locking.ALL_SET,
616     }
617     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
618
619   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
620                   node_result, feedback_fn, master_files,
621                   drbd_map):
622     """Run multiple tests against a node.
623
624     Test list:
625
626       - compares ganeti version
627       - checks vg existance and size > 20G
628       - checks config file checksum
629       - checks ssh to other nodes
630
631     @type nodeinfo: L{objects.Node}
632     @param nodeinfo: the node to check
633     @param file_list: required list of files
634     @param local_cksum: dictionary of local files and their checksums
635     @param node_result: the results from the node
636     @param feedback_fn: function used to accumulate results
637     @param master_files: list of files that only masters should have
638     @param drbd_map: the useddrbd minors for this node, in
639         form of minor: (instance, must_exist) which correspond to instances
640         and their running status
641
642     """
643     node = nodeinfo.name
644
645     # main result, node_result should be a non-empty dict
646     if not node_result or not isinstance(node_result, dict):
647       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
648       return True
649
650     # compares ganeti version
651     local_version = constants.PROTOCOL_VERSION
652     remote_version = node_result.get('version', None)
653     if not remote_version:
654       feedback_fn("  - ERROR: connection to %s failed" % (node))
655       return True
656
657     if local_version != remote_version:
658       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
659                       (local_version, node, remote_version))
660       return True
661
662     # checks vg existance and size > 20G
663
664     bad = False
665     vglist = node_result.get(constants.NV_VGLIST, None)
666     if not vglist:
667       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
668                       (node,))
669       bad = True
670     else:
671       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
672                                             constants.MIN_VG_SIZE)
673       if vgstatus:
674         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
675         bad = True
676
677     # checks config file checksum
678
679     remote_cksum = node_result.get(constants.NV_FILELIST, None)
680     if not isinstance(remote_cksum, dict):
681       bad = True
682       feedback_fn("  - ERROR: node hasn't returned file checksum data")
683     else:
684       for file_name in file_list:
685         node_is_mc = nodeinfo.master_candidate
686         must_have_file = file_name not in master_files
687         if file_name not in remote_cksum:
688           if node_is_mc or must_have_file:
689             bad = True
690             feedback_fn("  - ERROR: file '%s' missing" % file_name)
691         elif remote_cksum[file_name] != local_cksum[file_name]:
692           if node_is_mc or must_have_file:
693             bad = True
694             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
695           else:
696             # not candidate and this is not a must-have file
697             bad = True
698             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
699                         " '%s'" % file_name)
700         else:
701           # all good, except non-master/non-must have combination
702           if not node_is_mc and not must_have_file:
703             feedback_fn("  - ERROR: file '%s' should not exist on non master"
704                         " candidates" % file_name)
705
706     # checks ssh to any
707
708     if constants.NV_NODELIST not in node_result:
709       bad = True
710       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
711     else:
712       if node_result[constants.NV_NODELIST]:
713         bad = True
714         for node in node_result[constants.NV_NODELIST]:
715           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
716                           (node, node_result[constants.NV_NODELIST][node]))
717
718     if constants.NV_NODENETTEST not in node_result:
719       bad = True
720       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
721     else:
722       if node_result[constants.NV_NODENETTEST]:
723         bad = True
724         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
725         for node in nlist:
726           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
727                           (node, node_result[constants.NV_NODENETTEST][node]))
728
729     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
730     if isinstance(hyp_result, dict):
731       for hv_name, hv_result in hyp_result.iteritems():
732         if hv_result is not None:
733           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
734                       (hv_name, hv_result))
735
736     # check used drbd list
737     used_minors = node_result.get(constants.NV_DRBDLIST, [])
738     for minor, (iname, must_exist) in drbd_map.items():
739       if minor not in used_minors and must_exist:
740         feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
741                     (minor, iname))
742         bad = True
743     for minor in used_minors:
744       if minor not in drbd_map:
745         feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
746         bad = True
747
748     return bad
749
750   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
751                       node_instance, feedback_fn, n_offline):
752     """Verify an instance.
753
754     This function checks to see if the required block devices are
755     available on the instance's node.
756
757     """
758     bad = False
759
760     node_current = instanceconfig.primary_node
761
762     node_vol_should = {}
763     instanceconfig.MapLVsByNode(node_vol_should)
764
765     for node in node_vol_should:
766       if node in n_offline:
767         # ignore missing volumes on offline nodes
768         continue
769       for volume in node_vol_should[node]:
770         if node not in node_vol_is or volume not in node_vol_is[node]:
771           feedback_fn("  - ERROR: volume %s missing on node %s" %
772                           (volume, node))
773           bad = True
774
775     if instanceconfig.admin_up:
776       if ((node_current not in node_instance or
777           not instance in node_instance[node_current]) and
778           node_current not in n_offline):
779         feedback_fn("  - ERROR: instance %s not running on node %s" %
780                         (instance, node_current))
781         bad = True
782
783     for node in node_instance:
784       if (not node == node_current):
785         if instance in node_instance[node]:
786           feedback_fn("  - ERROR: instance %s should not run on node %s" %
787                           (instance, node))
788           bad = True
789
790     return bad
791
792   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
793     """Verify if there are any unknown volumes in the cluster.
794
795     The .os, .swap and backup volumes are ignored. All other volumes are
796     reported as unknown.
797
798     """
799     bad = False
800
801     for node in node_vol_is:
802       for volume in node_vol_is[node]:
803         if node not in node_vol_should or volume not in node_vol_should[node]:
804           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
805                       (volume, node))
806           bad = True
807     return bad
808
809   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
810     """Verify the list of running instances.
811
812     This checks what instances are running but unknown to the cluster.
813
814     """
815     bad = False
816     for node in node_instance:
817       for runninginstance in node_instance[node]:
818         if runninginstance not in instancelist:
819           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
820                           (runninginstance, node))
821           bad = True
822     return bad
823
824   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
825     """Verify N+1 Memory Resilience.
826
827     Check that if one single node dies we can still start all the instances it
828     was primary for.
829
830     """
831     bad = False
832
833     for node, nodeinfo in node_info.iteritems():
834       # This code checks that every node which is now listed as secondary has
835       # enough memory to host all instances it is supposed to should a single
836       # other node in the cluster fail.
837       # FIXME: not ready for failover to an arbitrary node
838       # FIXME: does not support file-backed instances
839       # WARNING: we currently take into account down instances as well as up
840       # ones, considering that even if they're down someone might want to start
841       # them even in the event of a node failure.
842       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
843         needed_mem = 0
844         for instance in instances:
845           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
846           if bep[constants.BE_AUTO_BALANCE]:
847             needed_mem += bep[constants.BE_MEMORY]
848         if nodeinfo['mfree'] < needed_mem:
849           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
850                       " failovers should node %s fail" % (node, prinode))
851           bad = True
852     return bad
853
854   def CheckPrereq(self):
855     """Check prerequisites.
856
857     Transform the list of checks we're going to skip into a set and check that
858     all its members are valid.
859
860     """
861     self.skip_set = frozenset(self.op.skip_checks)
862     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
863       raise errors.OpPrereqError("Invalid checks to be skipped specified")
864
865   def BuildHooksEnv(self):
866     """Build hooks env.
867
868     Cluster-Verify hooks just rone in the post phase and their failure makes
869     the output be logged in the verify output and the verification to fail.
870
871     """
872     all_nodes = self.cfg.GetNodeList()
873     # TODO: populate the environment with useful information for verify hooks
874     env = {}
875     return env, [], all_nodes
876
877   def Exec(self, feedback_fn):
878     """Verify integrity of cluster, performing various test on nodes.
879
880     """
881     bad = False
882     feedback_fn("* Verifying global settings")
883     for msg in self.cfg.VerifyConfig():
884       feedback_fn("  - ERROR: %s" % msg)
885
886     vg_name = self.cfg.GetVGName()
887     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
888     nodelist = utils.NiceSort(self.cfg.GetNodeList())
889     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
890     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
891     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
892                         for iname in instancelist)
893     i_non_redundant = [] # Non redundant instances
894     i_non_a_balanced = [] # Non auto-balanced instances
895     n_offline = [] # List of offline nodes
896     node_volume = {}
897     node_instance = {}
898     node_info = {}
899     instance_cfg = {}
900
901     # FIXME: verify OS list
902     # do local checksums
903     master_files = [constants.CLUSTER_CONF_FILE]
904
905     file_names = ssconf.SimpleStore().GetFileList()
906     file_names.append(constants.SSL_CERT_FILE)
907     file_names.append(constants.RAPI_CERT_FILE)
908     file_names.extend(master_files)
909
910     local_checksums = utils.FingerprintFiles(file_names)
911
912     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
913     node_verify_param = {
914       constants.NV_FILELIST: file_names,
915       constants.NV_NODELIST: [node.name for node in nodeinfo
916                               if not node.offline],
917       constants.NV_HYPERVISOR: hypervisors,
918       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
919                                   node.secondary_ip) for node in nodeinfo
920                                  if not node.offline],
921       constants.NV_LVLIST: vg_name,
922       constants.NV_INSTANCELIST: hypervisors,
923       constants.NV_VGLIST: None,
924       constants.NV_VERSION: None,
925       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
926       constants.NV_DRBDLIST: None,
927       }
928     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
929                                            self.cfg.GetClusterName())
930
931     cluster = self.cfg.GetClusterInfo()
932     master_node = self.cfg.GetMasterNode()
933     all_drbd_map = self.cfg.ComputeDRBDMap()
934
935     for node_i in nodeinfo:
936       node = node_i.name
937       nresult = all_nvinfo[node].data
938
939       if node_i.offline:
940         feedback_fn("* Skipping offline node %s" % (node,))
941         n_offline.append(node)
942         continue
943
944       if node == master_node:
945         ntype = "master"
946       elif node_i.master_candidate:
947         ntype = "master candidate"
948       else:
949         ntype = "regular"
950       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
951
952       if all_nvinfo[node].failed or not isinstance(nresult, dict):
953         feedback_fn("  - ERROR: connection to %s failed" % (node,))
954         bad = True
955         continue
956
957       node_drbd = {}
958       for minor, instance in all_drbd_map[node].items():
959         instance = instanceinfo[instance]
960         node_drbd[minor] = (instance.name, instance.admin_up)
961       result = self._VerifyNode(node_i, file_names, local_checksums,
962                                 nresult, feedback_fn, master_files,
963                                 node_drbd)
964       bad = bad or result
965
966       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
967       if isinstance(lvdata, basestring):
968         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
969                     (node, lvdata.encode('string_escape')))
970         bad = True
971         node_volume[node] = {}
972       elif not isinstance(lvdata, dict):
973         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
974         bad = True
975         continue
976       else:
977         node_volume[node] = lvdata
978
979       # node_instance
980       idata = nresult.get(constants.NV_INSTANCELIST, None)
981       if not isinstance(idata, list):
982         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
983                     (node,))
984         bad = True
985         continue
986
987       node_instance[node] = idata
988
989       # node_info
990       nodeinfo = nresult.get(constants.NV_HVINFO, None)
991       if not isinstance(nodeinfo, dict):
992         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
993         bad = True
994         continue
995
996       try:
997         node_info[node] = {
998           "mfree": int(nodeinfo['memory_free']),
999           "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1000           "pinst": [],
1001           "sinst": [],
1002           # dictionary holding all instances this node is secondary for,
1003           # grouped by their primary node. Each key is a cluster node, and each
1004           # value is a list of instances which have the key as primary and the
1005           # current node as secondary.  this is handy to calculate N+1 memory
1006           # availability if you can only failover from a primary to its
1007           # secondary.
1008           "sinst-by-pnode": {},
1009         }
1010       except ValueError:
1011         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1012         bad = True
1013         continue
1014
1015     node_vol_should = {}
1016
1017     for instance in instancelist:
1018       feedback_fn("* Verifying instance %s" % instance)
1019       inst_config = instanceinfo[instance]
1020       result =  self._VerifyInstance(instance, inst_config, node_volume,
1021                                      node_instance, feedback_fn, n_offline)
1022       bad = bad or result
1023       inst_nodes_offline = []
1024
1025       inst_config.MapLVsByNode(node_vol_should)
1026
1027       instance_cfg[instance] = inst_config
1028
1029       pnode = inst_config.primary_node
1030       if pnode in node_info:
1031         node_info[pnode]['pinst'].append(instance)
1032       elif pnode not in n_offline:
1033         feedback_fn("  - ERROR: instance %s, connection to primary node"
1034                     " %s failed" % (instance, pnode))
1035         bad = True
1036
1037       if pnode in n_offline:
1038         inst_nodes_offline.append(pnode)
1039
1040       # If the instance is non-redundant we cannot survive losing its primary
1041       # node, so we are not N+1 compliant. On the other hand we have no disk
1042       # templates with more than one secondary so that situation is not well
1043       # supported either.
1044       # FIXME: does not support file-backed instances
1045       if len(inst_config.secondary_nodes) == 0:
1046         i_non_redundant.append(instance)
1047       elif len(inst_config.secondary_nodes) > 1:
1048         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1049                     % instance)
1050
1051       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1052         i_non_a_balanced.append(instance)
1053
1054       for snode in inst_config.secondary_nodes:
1055         if snode in node_info:
1056           node_info[snode]['sinst'].append(instance)
1057           if pnode not in node_info[snode]['sinst-by-pnode']:
1058             node_info[snode]['sinst-by-pnode'][pnode] = []
1059           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1060         elif snode not in n_offline:
1061           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1062                       " %s failed" % (instance, snode))
1063           bad = True
1064         if snode in n_offline:
1065           inst_nodes_offline.append(snode)
1066
1067       if inst_nodes_offline:
1068         # warn that the instance lives on offline nodes, and set bad=True
1069         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1070                     ", ".join(inst_nodes_offline))
1071         bad = True
1072
1073     feedback_fn("* Verifying orphan volumes")
1074     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1075                                        feedback_fn)
1076     bad = bad or result
1077
1078     feedback_fn("* Verifying remaining instances")
1079     result = self._VerifyOrphanInstances(instancelist, node_instance,
1080                                          feedback_fn)
1081     bad = bad or result
1082
1083     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1084       feedback_fn("* Verifying N+1 Memory redundancy")
1085       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1086       bad = bad or result
1087
1088     feedback_fn("* Other Notes")
1089     if i_non_redundant:
1090       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1091                   % len(i_non_redundant))
1092
1093     if i_non_a_balanced:
1094       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1095                   % len(i_non_a_balanced))
1096
1097     if n_offline:
1098       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1099
1100     return not bad
1101
1102   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1103     """Analize the post-hooks' result
1104
1105     This method analyses the hook result, handles it, and sends some
1106     nicely-formatted feedback back to the user.
1107
1108     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1109         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1110     @param hooks_results: the results of the multi-node hooks rpc call
1111     @param feedback_fn: function used send feedback back to the caller
1112     @param lu_result: previous Exec result
1113     @return: the new Exec result, based on the previous result
1114         and hook results
1115
1116     """
1117     # We only really run POST phase hooks, and are only interested in
1118     # their results
1119     if phase == constants.HOOKS_PHASE_POST:
1120       # Used to change hooks' output to proper indentation
1121       indent_re = re.compile('^', re.M)
1122       feedback_fn("* Hooks Results")
1123       if not hooks_results:
1124         feedback_fn("  - ERROR: general communication failure")
1125         lu_result = 1
1126       else:
1127         for node_name in hooks_results:
1128           show_node_header = True
1129           res = hooks_results[node_name]
1130           if res.failed or res.data is False or not isinstance(res.data, list):
1131             if res.offline:
1132               # no need to warn or set fail return value
1133               continue
1134             feedback_fn("    Communication failure in hooks execution")
1135             lu_result = 1
1136             continue
1137           for script, hkr, output in res.data:
1138             if hkr == constants.HKR_FAIL:
1139               # The node header is only shown once, if there are
1140               # failing hooks on that node
1141               if show_node_header:
1142                 feedback_fn("  Node %s:" % node_name)
1143                 show_node_header = False
1144               feedback_fn("    ERROR: Script %s failed, output:" % script)
1145               output = indent_re.sub('      ', output)
1146               feedback_fn("%s" % output)
1147               lu_result = 1
1148
1149       return lu_result
1150
1151
1152 class LUVerifyDisks(NoHooksLU):
1153   """Verifies the cluster disks status.
1154
1155   """
1156   _OP_REQP = []
1157   REQ_BGL = False
1158
1159   def ExpandNames(self):
1160     self.needed_locks = {
1161       locking.LEVEL_NODE: locking.ALL_SET,
1162       locking.LEVEL_INSTANCE: locking.ALL_SET,
1163     }
1164     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1165
1166   def CheckPrereq(self):
1167     """Check prerequisites.
1168
1169     This has no prerequisites.
1170
1171     """
1172     pass
1173
1174   def Exec(self, feedback_fn):
1175     """Verify integrity of cluster disks.
1176
1177     """
1178     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1179
1180     vg_name = self.cfg.GetVGName()
1181     nodes = utils.NiceSort(self.cfg.GetNodeList())
1182     instances = [self.cfg.GetInstanceInfo(name)
1183                  for name in self.cfg.GetInstanceList()]
1184
1185     nv_dict = {}
1186     for inst in instances:
1187       inst_lvs = {}
1188       if (not inst.admin_up or
1189           inst.disk_template not in constants.DTS_NET_MIRROR):
1190         continue
1191       inst.MapLVsByNode(inst_lvs)
1192       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1193       for node, vol_list in inst_lvs.iteritems():
1194         for vol in vol_list:
1195           nv_dict[(node, vol)] = inst
1196
1197     if not nv_dict:
1198       return result
1199
1200     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1201
1202     to_act = set()
1203     for node in nodes:
1204       # node_volume
1205       lvs = node_lvs[node]
1206       if lvs.failed:
1207         if not lvs.offline:
1208           self.LogWarning("Connection to node %s failed: %s" %
1209                           (node, lvs.data))
1210         continue
1211       lvs = lvs.data
1212       if isinstance(lvs, basestring):
1213         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1214         res_nlvm[node] = lvs
1215       elif not isinstance(lvs, dict):
1216         logging.warning("Connection to node %s failed or invalid data"
1217                         " returned", node)
1218         res_nodes.append(node)
1219         continue
1220
1221       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1222         inst = nv_dict.pop((node, lv_name), None)
1223         if (not lv_online and inst is not None
1224             and inst.name not in res_instances):
1225           res_instances.append(inst.name)
1226
1227     # any leftover items in nv_dict are missing LVs, let's arrange the
1228     # data better
1229     for key, inst in nv_dict.iteritems():
1230       if inst.name not in res_missing:
1231         res_missing[inst.name] = []
1232       res_missing[inst.name].append(key)
1233
1234     return result
1235
1236
1237 class LURenameCluster(LogicalUnit):
1238   """Rename the cluster.
1239
1240   """
1241   HPATH = "cluster-rename"
1242   HTYPE = constants.HTYPE_CLUSTER
1243   _OP_REQP = ["name"]
1244
1245   def BuildHooksEnv(self):
1246     """Build hooks env.
1247
1248     """
1249     env = {
1250       "OP_TARGET": self.cfg.GetClusterName(),
1251       "NEW_NAME": self.op.name,
1252       }
1253     mn = self.cfg.GetMasterNode()
1254     return env, [mn], [mn]
1255
1256   def CheckPrereq(self):
1257     """Verify that the passed name is a valid one.
1258
1259     """
1260     hostname = utils.HostInfo(self.op.name)
1261
1262     new_name = hostname.name
1263     self.ip = new_ip = hostname.ip
1264     old_name = self.cfg.GetClusterName()
1265     old_ip = self.cfg.GetMasterIP()
1266     if new_name == old_name and new_ip == old_ip:
1267       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1268                                  " cluster has changed")
1269     if new_ip != old_ip:
1270       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1271         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1272                                    " reachable on the network. Aborting." %
1273                                    new_ip)
1274
1275     self.op.name = new_name
1276
1277   def Exec(self, feedback_fn):
1278     """Rename the cluster.
1279
1280     """
1281     clustername = self.op.name
1282     ip = self.ip
1283
1284     # shutdown the master IP
1285     master = self.cfg.GetMasterNode()
1286     result = self.rpc.call_node_stop_master(master, False)
1287     if result.failed or not result.data:
1288       raise errors.OpExecError("Could not disable the master role")
1289
1290     try:
1291       cluster = self.cfg.GetClusterInfo()
1292       cluster.cluster_name = clustername
1293       cluster.master_ip = ip
1294       self.cfg.Update(cluster)
1295
1296       # update the known hosts file
1297       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1298       node_list = self.cfg.GetNodeList()
1299       try:
1300         node_list.remove(master)
1301       except ValueError:
1302         pass
1303       result = self.rpc.call_upload_file(node_list,
1304                                          constants.SSH_KNOWN_HOSTS_FILE)
1305       for to_node, to_result in result.iteritems():
1306         if to_result.failed or not to_result.data:
1307           logging.error("Copy of file %s to node %s failed",
1308                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1309
1310     finally:
1311       result = self.rpc.call_node_start_master(master, False)
1312       if result.failed or not result.data:
1313         self.LogWarning("Could not re-enable the master role on"
1314                         " the master, please restart manually.")
1315
1316
1317 def _RecursiveCheckIfLVMBased(disk):
1318   """Check if the given disk or its children are lvm-based.
1319
1320   @type disk: L{objects.Disk}
1321   @param disk: the disk to check
1322   @rtype: booleean
1323   @return: boolean indicating whether a LD_LV dev_type was found or not
1324
1325   """
1326   if disk.children:
1327     for chdisk in disk.children:
1328       if _RecursiveCheckIfLVMBased(chdisk):
1329         return True
1330   return disk.dev_type == constants.LD_LV
1331
1332
1333 class LUSetClusterParams(LogicalUnit):
1334   """Change the parameters of the cluster.
1335
1336   """
1337   HPATH = "cluster-modify"
1338   HTYPE = constants.HTYPE_CLUSTER
1339   _OP_REQP = []
1340   REQ_BGL = False
1341
1342   def CheckParameters(self):
1343     """Check parameters
1344
1345     """
1346     if not hasattr(self.op, "candidate_pool_size"):
1347       self.op.candidate_pool_size = None
1348     if self.op.candidate_pool_size is not None:
1349       try:
1350         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1351       except ValueError, err:
1352         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1353                                    str(err))
1354       if self.op.candidate_pool_size < 1:
1355         raise errors.OpPrereqError("At least one master candidate needed")
1356
1357   def ExpandNames(self):
1358     # FIXME: in the future maybe other cluster params won't require checking on
1359     # all nodes to be modified.
1360     self.needed_locks = {
1361       locking.LEVEL_NODE: locking.ALL_SET,
1362     }
1363     self.share_locks[locking.LEVEL_NODE] = 1
1364
1365   def BuildHooksEnv(self):
1366     """Build hooks env.
1367
1368     """
1369     env = {
1370       "OP_TARGET": self.cfg.GetClusterName(),
1371       "NEW_VG_NAME": self.op.vg_name,
1372       }
1373     mn = self.cfg.GetMasterNode()
1374     return env, [mn], [mn]
1375
1376   def CheckPrereq(self):
1377     """Check prerequisites.
1378
1379     This checks whether the given params don't conflict and
1380     if the given volume group is valid.
1381
1382     """
1383     # FIXME: This only works because there is only one parameter that can be
1384     # changed or removed.
1385     if self.op.vg_name is not None and not self.op.vg_name:
1386       instances = self.cfg.GetAllInstancesInfo().values()
1387       for inst in instances:
1388         for disk in inst.disks:
1389           if _RecursiveCheckIfLVMBased(disk):
1390             raise errors.OpPrereqError("Cannot disable lvm storage while"
1391                                        " lvm-based instances exist")
1392
1393     node_list = self.acquired_locks[locking.LEVEL_NODE]
1394
1395     # if vg_name not None, checks given volume group on all nodes
1396     if self.op.vg_name:
1397       vglist = self.rpc.call_vg_list(node_list)
1398       for node in node_list:
1399         if vglist[node].failed:
1400           # ignoring down node
1401           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1402           continue
1403         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1404                                               self.op.vg_name,
1405                                               constants.MIN_VG_SIZE)
1406         if vgstatus:
1407           raise errors.OpPrereqError("Error on node '%s': %s" %
1408                                      (node, vgstatus))
1409
1410     self.cluster = cluster = self.cfg.GetClusterInfo()
1411     # validate beparams changes
1412     if self.op.beparams:
1413       utils.CheckBEParams(self.op.beparams)
1414       self.new_beparams = cluster.FillDict(
1415         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1416
1417     # hypervisor list/parameters
1418     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1419     if self.op.hvparams:
1420       if not isinstance(self.op.hvparams, dict):
1421         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1422       for hv_name, hv_dict in self.op.hvparams.items():
1423         if hv_name not in self.new_hvparams:
1424           self.new_hvparams[hv_name] = hv_dict
1425         else:
1426           self.new_hvparams[hv_name].update(hv_dict)
1427
1428     if self.op.enabled_hypervisors is not None:
1429       self.hv_list = self.op.enabled_hypervisors
1430     else:
1431       self.hv_list = cluster.enabled_hypervisors
1432
1433     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1434       # either the enabled list has changed, or the parameters have, validate
1435       for hv_name, hv_params in self.new_hvparams.items():
1436         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1437             (self.op.enabled_hypervisors and
1438              hv_name in self.op.enabled_hypervisors)):
1439           # either this is a new hypervisor, or its parameters have changed
1440           hv_class = hypervisor.GetHypervisor(hv_name)
1441           hv_class.CheckParameterSyntax(hv_params)
1442           _CheckHVParams(self, node_list, hv_name, hv_params)
1443
1444   def Exec(self, feedback_fn):
1445     """Change the parameters of the cluster.
1446
1447     """
1448     if self.op.vg_name is not None:
1449       if self.op.vg_name != self.cfg.GetVGName():
1450         self.cfg.SetVGName(self.op.vg_name)
1451       else:
1452         feedback_fn("Cluster LVM configuration already in desired"
1453                     " state, not changing")
1454     if self.op.hvparams:
1455       self.cluster.hvparams = self.new_hvparams
1456     if self.op.enabled_hypervisors is not None:
1457       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1458     if self.op.beparams:
1459       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1460     if self.op.candidate_pool_size is not None:
1461       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1462
1463     self.cfg.Update(self.cluster)
1464
1465     # we want to update nodes after the cluster so that if any errors
1466     # happen, we have recorded and saved the cluster info
1467     if self.op.candidate_pool_size is not None:
1468       _AdjustCandidatePool(self)
1469
1470
1471 class LURedistributeConfig(NoHooksLU):
1472   """Force the redistribution of cluster configuration.
1473
1474   This is a very simple LU.
1475
1476   """
1477   _OP_REQP = []
1478   REQ_BGL = False
1479
1480   def ExpandNames(self):
1481     self.needed_locks = {
1482       locking.LEVEL_NODE: locking.ALL_SET,
1483     }
1484     self.share_locks[locking.LEVEL_NODE] = 1
1485
1486   def CheckPrereq(self):
1487     """Check prerequisites.
1488
1489     """
1490
1491   def Exec(self, feedback_fn):
1492     """Redistribute the configuration.
1493
1494     """
1495     self.cfg.Update(self.cfg.GetClusterInfo())
1496
1497
1498 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1499   """Sleep and poll for an instance's disk to sync.
1500
1501   """
1502   if not instance.disks:
1503     return True
1504
1505   if not oneshot:
1506     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1507
1508   node = instance.primary_node
1509
1510   for dev in instance.disks:
1511     lu.cfg.SetDiskID(dev, node)
1512
1513   retries = 0
1514   while True:
1515     max_time = 0
1516     done = True
1517     cumul_degraded = False
1518     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1519     if rstats.failed or not rstats.data:
1520       lu.LogWarning("Can't get any data from node %s", node)
1521       retries += 1
1522       if retries >= 10:
1523         raise errors.RemoteError("Can't contact node %s for mirror data,"
1524                                  " aborting." % node)
1525       time.sleep(6)
1526       continue
1527     rstats = rstats.data
1528     retries = 0
1529     for i, mstat in enumerate(rstats):
1530       if mstat is None:
1531         lu.LogWarning("Can't compute data for node %s/%s",
1532                            node, instance.disks[i].iv_name)
1533         continue
1534       # we ignore the ldisk parameter
1535       perc_done, est_time, is_degraded, _ = mstat
1536       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1537       if perc_done is not None:
1538         done = False
1539         if est_time is not None:
1540           rem_time = "%d estimated seconds remaining" % est_time
1541           max_time = est_time
1542         else:
1543           rem_time = "no time estimate"
1544         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1545                         (instance.disks[i].iv_name, perc_done, rem_time))
1546     if done or oneshot:
1547       break
1548
1549     time.sleep(min(60, max_time))
1550
1551   if done:
1552     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1553   return not cumul_degraded
1554
1555
1556 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1557   """Check that mirrors are not degraded.
1558
1559   The ldisk parameter, if True, will change the test from the
1560   is_degraded attribute (which represents overall non-ok status for
1561   the device(s)) to the ldisk (representing the local storage status).
1562
1563   """
1564   lu.cfg.SetDiskID(dev, node)
1565   if ldisk:
1566     idx = 6
1567   else:
1568     idx = 5
1569
1570   result = True
1571   if on_primary or dev.AssembleOnSecondary():
1572     rstats = lu.rpc.call_blockdev_find(node, dev)
1573     if rstats.failed or not rstats.data:
1574       logging.warning("Node %s: disk degraded, not found or node down", node)
1575       result = False
1576     else:
1577       result = result and (not rstats.data[idx])
1578   if dev.children:
1579     for child in dev.children:
1580       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1581
1582   return result
1583
1584
1585 class LUDiagnoseOS(NoHooksLU):
1586   """Logical unit for OS diagnose/query.
1587
1588   """
1589   _OP_REQP = ["output_fields", "names"]
1590   REQ_BGL = False
1591   _FIELDS_STATIC = utils.FieldSet()
1592   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1593
1594   def ExpandNames(self):
1595     if self.op.names:
1596       raise errors.OpPrereqError("Selective OS query not supported")
1597
1598     _CheckOutputFields(static=self._FIELDS_STATIC,
1599                        dynamic=self._FIELDS_DYNAMIC,
1600                        selected=self.op.output_fields)
1601
1602     # Lock all nodes, in shared mode
1603     self.needed_locks = {}
1604     self.share_locks[locking.LEVEL_NODE] = 1
1605     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1606
1607   def CheckPrereq(self):
1608     """Check prerequisites.
1609
1610     """
1611
1612   @staticmethod
1613   def _DiagnoseByOS(node_list, rlist):
1614     """Remaps a per-node return list into an a per-os per-node dictionary
1615
1616     @param node_list: a list with the names of all nodes
1617     @param rlist: a map with node names as keys and OS objects as values
1618
1619     @rtype: dict
1620     @returns: a dictionary with osnames as keys and as value another map, with
1621         nodes as keys and list of OS objects as values, eg::
1622
1623           {"debian-etch": {"node1": [<object>,...],
1624                            "node2": [<object>,]}
1625           }
1626
1627     """
1628     all_os = {}
1629     for node_name, nr in rlist.iteritems():
1630       if nr.failed or not nr.data:
1631         continue
1632       for os_obj in nr.data:
1633         if os_obj.name not in all_os:
1634           # build a list of nodes for this os containing empty lists
1635           # for each node in node_list
1636           all_os[os_obj.name] = {}
1637           for nname in node_list:
1638             all_os[os_obj.name][nname] = []
1639         all_os[os_obj.name][node_name].append(os_obj)
1640     return all_os
1641
1642   def Exec(self, feedback_fn):
1643     """Compute the list of OSes.
1644
1645     """
1646     node_list = self.acquired_locks[locking.LEVEL_NODE]
1647     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1648                    if node in node_list]
1649     node_data = self.rpc.call_os_diagnose(valid_nodes)
1650     if node_data == False:
1651       raise errors.OpExecError("Can't gather the list of OSes")
1652     pol = self._DiagnoseByOS(valid_nodes, node_data)
1653     output = []
1654     for os_name, os_data in pol.iteritems():
1655       row = []
1656       for field in self.op.output_fields:
1657         if field == "name":
1658           val = os_name
1659         elif field == "valid":
1660           val = utils.all([osl and osl[0] for osl in os_data.values()])
1661         elif field == "node_status":
1662           val = {}
1663           for node_name, nos_list in os_data.iteritems():
1664             val[node_name] = [(v.status, v.path) for v in nos_list]
1665         else:
1666           raise errors.ParameterError(field)
1667         row.append(val)
1668       output.append(row)
1669
1670     return output
1671
1672
1673 class LURemoveNode(LogicalUnit):
1674   """Logical unit for removing a node.
1675
1676   """
1677   HPATH = "node-remove"
1678   HTYPE = constants.HTYPE_NODE
1679   _OP_REQP = ["node_name"]
1680
1681   def BuildHooksEnv(self):
1682     """Build hooks env.
1683
1684     This doesn't run on the target node in the pre phase as a failed
1685     node would then be impossible to remove.
1686
1687     """
1688     env = {
1689       "OP_TARGET": self.op.node_name,
1690       "NODE_NAME": self.op.node_name,
1691       }
1692     all_nodes = self.cfg.GetNodeList()
1693     all_nodes.remove(self.op.node_name)
1694     return env, all_nodes, all_nodes
1695
1696   def CheckPrereq(self):
1697     """Check prerequisites.
1698
1699     This checks:
1700      - the node exists in the configuration
1701      - it does not have primary or secondary instances
1702      - it's not the master
1703
1704     Any errors are signalled by raising errors.OpPrereqError.
1705
1706     """
1707     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1708     if node is None:
1709       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1710
1711     instance_list = self.cfg.GetInstanceList()
1712
1713     masternode = self.cfg.GetMasterNode()
1714     if node.name == masternode:
1715       raise errors.OpPrereqError("Node is the master node,"
1716                                  " you need to failover first.")
1717
1718     for instance_name in instance_list:
1719       instance = self.cfg.GetInstanceInfo(instance_name)
1720       if node.name in instance.all_nodes:
1721         raise errors.OpPrereqError("Instance %s is still running on the node,"
1722                                    " please remove first." % instance_name)
1723     self.op.node_name = node.name
1724     self.node = node
1725
1726   def Exec(self, feedback_fn):
1727     """Removes the node from the cluster.
1728
1729     """
1730     node = self.node
1731     logging.info("Stopping the node daemon and removing configs from node %s",
1732                  node.name)
1733
1734     self.context.RemoveNode(node.name)
1735
1736     self.rpc.call_node_leave_cluster(node.name)
1737
1738     # Promote nodes to master candidate as needed
1739     _AdjustCandidatePool(self)
1740
1741
1742 class LUQueryNodes(NoHooksLU):
1743   """Logical unit for querying nodes.
1744
1745   """
1746   _OP_REQP = ["output_fields", "names"]
1747   REQ_BGL = False
1748   _FIELDS_DYNAMIC = utils.FieldSet(
1749     "dtotal", "dfree",
1750     "mtotal", "mnode", "mfree",
1751     "bootid",
1752     "ctotal",
1753     )
1754
1755   _FIELDS_STATIC = utils.FieldSet(
1756     "name", "pinst_cnt", "sinst_cnt",
1757     "pinst_list", "sinst_list",
1758     "pip", "sip", "tags",
1759     "serial_no",
1760     "master_candidate",
1761     "master",
1762     "offline",
1763     )
1764
1765   def ExpandNames(self):
1766     _CheckOutputFields(static=self._FIELDS_STATIC,
1767                        dynamic=self._FIELDS_DYNAMIC,
1768                        selected=self.op.output_fields)
1769
1770     self.needed_locks = {}
1771     self.share_locks[locking.LEVEL_NODE] = 1
1772
1773     if self.op.names:
1774       self.wanted = _GetWantedNodes(self, self.op.names)
1775     else:
1776       self.wanted = locking.ALL_SET
1777
1778     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1779     if self.do_locking:
1780       # if we don't request only static fields, we need to lock the nodes
1781       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1782
1783
1784   def CheckPrereq(self):
1785     """Check prerequisites.
1786
1787     """
1788     # The validation of the node list is done in the _GetWantedNodes,
1789     # if non empty, and if empty, there's no validation to do
1790     pass
1791
1792   def Exec(self, feedback_fn):
1793     """Computes the list of nodes and their attributes.
1794
1795     """
1796     all_info = self.cfg.GetAllNodesInfo()
1797     if self.do_locking:
1798       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1799     elif self.wanted != locking.ALL_SET:
1800       nodenames = self.wanted
1801       missing = set(nodenames).difference(all_info.keys())
1802       if missing:
1803         raise errors.OpExecError(
1804           "Some nodes were removed before retrieving their data: %s" % missing)
1805     else:
1806       nodenames = all_info.keys()
1807
1808     nodenames = utils.NiceSort(nodenames)
1809     nodelist = [all_info[name] for name in nodenames]
1810
1811     # begin data gathering
1812
1813     if self.do_locking:
1814       live_data = {}
1815       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1816                                           self.cfg.GetHypervisorType())
1817       for name in nodenames:
1818         nodeinfo = node_data[name]
1819         if not nodeinfo.failed and nodeinfo.data:
1820           nodeinfo = nodeinfo.data
1821           fn = utils.TryConvert
1822           live_data[name] = {
1823             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1824             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1825             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1826             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1827             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1828             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1829             "bootid": nodeinfo.get('bootid', None),
1830             }
1831         else:
1832           live_data[name] = {}
1833     else:
1834       live_data = dict.fromkeys(nodenames, {})
1835
1836     node_to_primary = dict([(name, set()) for name in nodenames])
1837     node_to_secondary = dict([(name, set()) for name in nodenames])
1838
1839     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1840                              "sinst_cnt", "sinst_list"))
1841     if inst_fields & frozenset(self.op.output_fields):
1842       instancelist = self.cfg.GetInstanceList()
1843
1844       for instance_name in instancelist:
1845         inst = self.cfg.GetInstanceInfo(instance_name)
1846         if inst.primary_node in node_to_primary:
1847           node_to_primary[inst.primary_node].add(inst.name)
1848         for secnode in inst.secondary_nodes:
1849           if secnode in node_to_secondary:
1850             node_to_secondary[secnode].add(inst.name)
1851
1852     master_node = self.cfg.GetMasterNode()
1853
1854     # end data gathering
1855
1856     output = []
1857     for node in nodelist:
1858       node_output = []
1859       for field in self.op.output_fields:
1860         if field == "name":
1861           val = node.name
1862         elif field == "pinst_list":
1863           val = list(node_to_primary[node.name])
1864         elif field == "sinst_list":
1865           val = list(node_to_secondary[node.name])
1866         elif field == "pinst_cnt":
1867           val = len(node_to_primary[node.name])
1868         elif field == "sinst_cnt":
1869           val = len(node_to_secondary[node.name])
1870         elif field == "pip":
1871           val = node.primary_ip
1872         elif field == "sip":
1873           val = node.secondary_ip
1874         elif field == "tags":
1875           val = list(node.GetTags())
1876         elif field == "serial_no":
1877           val = node.serial_no
1878         elif field == "master_candidate":
1879           val = node.master_candidate
1880         elif field == "master":
1881           val = node.name == master_node
1882         elif field == "offline":
1883           val = node.offline
1884         elif self._FIELDS_DYNAMIC.Matches(field):
1885           val = live_data[node.name].get(field, None)
1886         else:
1887           raise errors.ParameterError(field)
1888         node_output.append(val)
1889       output.append(node_output)
1890
1891     return output
1892
1893
1894 class LUQueryNodeVolumes(NoHooksLU):
1895   """Logical unit for getting volumes on node(s).
1896
1897   """
1898   _OP_REQP = ["nodes", "output_fields"]
1899   REQ_BGL = False
1900   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1901   _FIELDS_STATIC = utils.FieldSet("node")
1902
1903   def ExpandNames(self):
1904     _CheckOutputFields(static=self._FIELDS_STATIC,
1905                        dynamic=self._FIELDS_DYNAMIC,
1906                        selected=self.op.output_fields)
1907
1908     self.needed_locks = {}
1909     self.share_locks[locking.LEVEL_NODE] = 1
1910     if not self.op.nodes:
1911       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1912     else:
1913       self.needed_locks[locking.LEVEL_NODE] = \
1914         _GetWantedNodes(self, self.op.nodes)
1915
1916   def CheckPrereq(self):
1917     """Check prerequisites.
1918
1919     This checks that the fields required are valid output fields.
1920
1921     """
1922     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1923
1924   def Exec(self, feedback_fn):
1925     """Computes the list of nodes and their attributes.
1926
1927     """
1928     nodenames = self.nodes
1929     volumes = self.rpc.call_node_volumes(nodenames)
1930
1931     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1932              in self.cfg.GetInstanceList()]
1933
1934     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1935
1936     output = []
1937     for node in nodenames:
1938       if node not in volumes or volumes[node].failed or not volumes[node].data:
1939         continue
1940
1941       node_vols = volumes[node].data[:]
1942       node_vols.sort(key=lambda vol: vol['dev'])
1943
1944       for vol in node_vols:
1945         node_output = []
1946         for field in self.op.output_fields:
1947           if field == "node":
1948             val = node
1949           elif field == "phys":
1950             val = vol['dev']
1951           elif field == "vg":
1952             val = vol['vg']
1953           elif field == "name":
1954             val = vol['name']
1955           elif field == "size":
1956             val = int(float(vol['size']))
1957           elif field == "instance":
1958             for inst in ilist:
1959               if node not in lv_by_node[inst]:
1960                 continue
1961               if vol['name'] in lv_by_node[inst][node]:
1962                 val = inst.name
1963                 break
1964             else:
1965               val = '-'
1966           else:
1967             raise errors.ParameterError(field)
1968           node_output.append(str(val))
1969
1970         output.append(node_output)
1971
1972     return output
1973
1974
1975 class LUAddNode(LogicalUnit):
1976   """Logical unit for adding node to the cluster.
1977
1978   """
1979   HPATH = "node-add"
1980   HTYPE = constants.HTYPE_NODE
1981   _OP_REQP = ["node_name"]
1982
1983   def BuildHooksEnv(self):
1984     """Build hooks env.
1985
1986     This will run on all nodes before, and on all nodes + the new node after.
1987
1988     """
1989     env = {
1990       "OP_TARGET": self.op.node_name,
1991       "NODE_NAME": self.op.node_name,
1992       "NODE_PIP": self.op.primary_ip,
1993       "NODE_SIP": self.op.secondary_ip,
1994       }
1995     nodes_0 = self.cfg.GetNodeList()
1996     nodes_1 = nodes_0 + [self.op.node_name, ]
1997     return env, nodes_0, nodes_1
1998
1999   def CheckPrereq(self):
2000     """Check prerequisites.
2001
2002     This checks:
2003      - the new node is not already in the config
2004      - it is resolvable
2005      - its parameters (single/dual homed) matches the cluster
2006
2007     Any errors are signalled by raising errors.OpPrereqError.
2008
2009     """
2010     node_name = self.op.node_name
2011     cfg = self.cfg
2012
2013     dns_data = utils.HostInfo(node_name)
2014
2015     node = dns_data.name
2016     primary_ip = self.op.primary_ip = dns_data.ip
2017     secondary_ip = getattr(self.op, "secondary_ip", None)
2018     if secondary_ip is None:
2019       secondary_ip = primary_ip
2020     if not utils.IsValidIP(secondary_ip):
2021       raise errors.OpPrereqError("Invalid secondary IP given")
2022     self.op.secondary_ip = secondary_ip
2023
2024     node_list = cfg.GetNodeList()
2025     if not self.op.readd and node in node_list:
2026       raise errors.OpPrereqError("Node %s is already in the configuration" %
2027                                  node)
2028     elif self.op.readd and node not in node_list:
2029       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2030
2031     for existing_node_name in node_list:
2032       existing_node = cfg.GetNodeInfo(existing_node_name)
2033
2034       if self.op.readd and node == existing_node_name:
2035         if (existing_node.primary_ip != primary_ip or
2036             existing_node.secondary_ip != secondary_ip):
2037           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2038                                      " address configuration as before")
2039         continue
2040
2041       if (existing_node.primary_ip == primary_ip or
2042           existing_node.secondary_ip == primary_ip or
2043           existing_node.primary_ip == secondary_ip or
2044           existing_node.secondary_ip == secondary_ip):
2045         raise errors.OpPrereqError("New node ip address(es) conflict with"
2046                                    " existing node %s" % existing_node.name)
2047
2048     # check that the type of the node (single versus dual homed) is the
2049     # same as for the master
2050     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2051     master_singlehomed = myself.secondary_ip == myself.primary_ip
2052     newbie_singlehomed = secondary_ip == primary_ip
2053     if master_singlehomed != newbie_singlehomed:
2054       if master_singlehomed:
2055         raise errors.OpPrereqError("The master has no private ip but the"
2056                                    " new node has one")
2057       else:
2058         raise errors.OpPrereqError("The master has a private ip but the"
2059                                    " new node doesn't have one")
2060
2061     # checks reachablity
2062     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2063       raise errors.OpPrereqError("Node not reachable by ping")
2064
2065     if not newbie_singlehomed:
2066       # check reachability from my secondary ip to newbie's secondary ip
2067       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2068                            source=myself.secondary_ip):
2069         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2070                                    " based ping to noded port")
2071
2072     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2073     mc_now, _ = self.cfg.GetMasterCandidateStats()
2074     master_candidate = mc_now < cp_size
2075
2076     self.new_node = objects.Node(name=node,
2077                                  primary_ip=primary_ip,
2078                                  secondary_ip=secondary_ip,
2079                                  master_candidate=master_candidate,
2080                                  offline=False)
2081
2082   def Exec(self, feedback_fn):
2083     """Adds the new node to the cluster.
2084
2085     """
2086     new_node = self.new_node
2087     node = new_node.name
2088
2089     # check connectivity
2090     result = self.rpc.call_version([node])[node]
2091     result.Raise()
2092     if result.data:
2093       if constants.PROTOCOL_VERSION == result.data:
2094         logging.info("Communication to node %s fine, sw version %s match",
2095                      node, result.data)
2096       else:
2097         raise errors.OpExecError("Version mismatch master version %s,"
2098                                  " node version %s" %
2099                                  (constants.PROTOCOL_VERSION, result.data))
2100     else:
2101       raise errors.OpExecError("Cannot get version from the new node")
2102
2103     # setup ssh on node
2104     logging.info("Copy ssh key to node %s", node)
2105     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2106     keyarray = []
2107     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2108                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2109                 priv_key, pub_key]
2110
2111     for i in keyfiles:
2112       f = open(i, 'r')
2113       try:
2114         keyarray.append(f.read())
2115       finally:
2116         f.close()
2117
2118     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2119                                     keyarray[2],
2120                                     keyarray[3], keyarray[4], keyarray[5])
2121
2122     if result.failed or not result.data:
2123       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2124
2125     # Add node to our /etc/hosts, and add key to known_hosts
2126     utils.AddHostToEtcHosts(new_node.name)
2127
2128     if new_node.secondary_ip != new_node.primary_ip:
2129       result = self.rpc.call_node_has_ip_address(new_node.name,
2130                                                  new_node.secondary_ip)
2131       if result.failed or not result.data:
2132         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2133                                  " you gave (%s). Please fix and re-run this"
2134                                  " command." % new_node.secondary_ip)
2135
2136     node_verify_list = [self.cfg.GetMasterNode()]
2137     node_verify_param = {
2138       'nodelist': [node],
2139       # TODO: do a node-net-test as well?
2140     }
2141
2142     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2143                                        self.cfg.GetClusterName())
2144     for verifier in node_verify_list:
2145       if result[verifier].failed or not result[verifier].data:
2146         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2147                                  " for remote verification" % verifier)
2148       if result[verifier].data['nodelist']:
2149         for failed in result[verifier].data['nodelist']:
2150           feedback_fn("ssh/hostname verification failed %s -> %s" %
2151                       (verifier, result[verifier]['nodelist'][failed]))
2152         raise errors.OpExecError("ssh/hostname verification failed.")
2153
2154     # Distribute updated /etc/hosts and known_hosts to all nodes,
2155     # including the node just added
2156     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2157     dist_nodes = self.cfg.GetNodeList()
2158     if not self.op.readd:
2159       dist_nodes.append(node)
2160     if myself.name in dist_nodes:
2161       dist_nodes.remove(myself.name)
2162
2163     logging.debug("Copying hosts and known_hosts to all nodes")
2164     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2165       result = self.rpc.call_upload_file(dist_nodes, fname)
2166       for to_node, to_result in result.iteritems():
2167         if to_result.failed or not to_result.data:
2168           logging.error("Copy of file %s to node %s failed", fname, to_node)
2169
2170     to_copy = []
2171     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2172       to_copy.append(constants.VNC_PASSWORD_FILE)
2173     for fname in to_copy:
2174       result = self.rpc.call_upload_file([node], fname)
2175       if result[node].failed or not result[node]:
2176         logging.error("Could not copy file %s to node %s", fname, node)
2177
2178     if self.op.readd:
2179       self.context.ReaddNode(new_node)
2180     else:
2181       self.context.AddNode(new_node)
2182
2183
2184 class LUSetNodeParams(LogicalUnit):
2185   """Modifies the parameters of a node.
2186
2187   """
2188   HPATH = "node-modify"
2189   HTYPE = constants.HTYPE_NODE
2190   _OP_REQP = ["node_name"]
2191   REQ_BGL = False
2192
2193   def CheckArguments(self):
2194     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2195     if node_name is None:
2196       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2197     self.op.node_name = node_name
2198     _CheckBooleanOpField(self.op, 'master_candidate')
2199     _CheckBooleanOpField(self.op, 'offline')
2200     if self.op.master_candidate is None and self.op.offline is None:
2201       raise errors.OpPrereqError("Please pass at least one modification")
2202     if self.op.offline == True and self.op.master_candidate == True:
2203       raise errors.OpPrereqError("Can't set the node into offline and"
2204                                  " master_candidate at the same time")
2205
2206   def ExpandNames(self):
2207     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2208
2209   def BuildHooksEnv(self):
2210     """Build hooks env.
2211
2212     This runs on the master node.
2213
2214     """
2215     env = {
2216       "OP_TARGET": self.op.node_name,
2217       "MASTER_CANDIDATE": str(self.op.master_candidate),
2218       "OFFLINE": str(self.op.offline),
2219       }
2220     nl = [self.cfg.GetMasterNode(),
2221           self.op.node_name]
2222     return env, nl, nl
2223
2224   def CheckPrereq(self):
2225     """Check prerequisites.
2226
2227     This only checks the instance list against the existing names.
2228
2229     """
2230     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2231
2232     if ((self.op.master_candidate == False or self.op.offline == True)
2233         and node.master_candidate):
2234       # we will demote the node from master_candidate
2235       if self.op.node_name == self.cfg.GetMasterNode():
2236         raise errors.OpPrereqError("The master node has to be a"
2237                                    " master candidate and online")
2238       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2239       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2240       if num_candidates <= cp_size:
2241         msg = ("Not enough master candidates (desired"
2242                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2243         if self.op.force:
2244           self.LogWarning(msg)
2245         else:
2246           raise errors.OpPrereqError(msg)
2247
2248     if (self.op.master_candidate == True and node.offline and
2249         not self.op.offline == False):
2250       raise errors.OpPrereqError("Can't set an offline node to"
2251                                  " master_candidate")
2252
2253     return
2254
2255   def Exec(self, feedback_fn):
2256     """Modifies a node.
2257
2258     """
2259     node = self.node
2260
2261     result = []
2262
2263     if self.op.offline is not None:
2264       node.offline = self.op.offline
2265       result.append(("offline", str(self.op.offline)))
2266       if self.op.offline == True and node.master_candidate:
2267         node.master_candidate = False
2268         result.append(("master_candidate", "auto-demotion due to offline"))
2269
2270     if self.op.master_candidate is not None:
2271       node.master_candidate = self.op.master_candidate
2272       result.append(("master_candidate", str(self.op.master_candidate)))
2273       if self.op.master_candidate == False:
2274         rrc = self.rpc.call_node_demote_from_mc(node.name)
2275         if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2276             or len(rrc.data) != 2):
2277           self.LogWarning("Node rpc error: %s" % rrc.error)
2278         elif not rrc.data[0]:
2279           self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2280
2281     # this will trigger configuration file update, if needed
2282     self.cfg.Update(node)
2283     # this will trigger job queue propagation or cleanup
2284     if self.op.node_name != self.cfg.GetMasterNode():
2285       self.context.ReaddNode(node)
2286
2287     return result
2288
2289
2290 class LUQueryClusterInfo(NoHooksLU):
2291   """Query cluster configuration.
2292
2293   """
2294   _OP_REQP = []
2295   REQ_BGL = False
2296
2297   def ExpandNames(self):
2298     self.needed_locks = {}
2299
2300   def CheckPrereq(self):
2301     """No prerequsites needed for this LU.
2302
2303     """
2304     pass
2305
2306   def Exec(self, feedback_fn):
2307     """Return cluster config.
2308
2309     """
2310     cluster = self.cfg.GetClusterInfo()
2311     result = {
2312       "software_version": constants.RELEASE_VERSION,
2313       "protocol_version": constants.PROTOCOL_VERSION,
2314       "config_version": constants.CONFIG_VERSION,
2315       "os_api_version": constants.OS_API_VERSION,
2316       "export_version": constants.EXPORT_VERSION,
2317       "architecture": (platform.architecture()[0], platform.machine()),
2318       "name": cluster.cluster_name,
2319       "master": cluster.master_node,
2320       "default_hypervisor": cluster.default_hypervisor,
2321       "enabled_hypervisors": cluster.enabled_hypervisors,
2322       "hvparams": cluster.hvparams,
2323       "beparams": cluster.beparams,
2324       "candidate_pool_size": cluster.candidate_pool_size,
2325       }
2326
2327     return result
2328
2329
2330 class LUQueryConfigValues(NoHooksLU):
2331   """Return configuration values.
2332
2333   """
2334   _OP_REQP = []
2335   REQ_BGL = False
2336   _FIELDS_DYNAMIC = utils.FieldSet()
2337   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2338
2339   def ExpandNames(self):
2340     self.needed_locks = {}
2341
2342     _CheckOutputFields(static=self._FIELDS_STATIC,
2343                        dynamic=self._FIELDS_DYNAMIC,
2344                        selected=self.op.output_fields)
2345
2346   def CheckPrereq(self):
2347     """No prerequisites.
2348
2349     """
2350     pass
2351
2352   def Exec(self, feedback_fn):
2353     """Dump a representation of the cluster config to the standard output.
2354
2355     """
2356     values = []
2357     for field in self.op.output_fields:
2358       if field == "cluster_name":
2359         entry = self.cfg.GetClusterName()
2360       elif field == "master_node":
2361         entry = self.cfg.GetMasterNode()
2362       elif field == "drain_flag":
2363         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2364       else:
2365         raise errors.ParameterError(field)
2366       values.append(entry)
2367     return values
2368
2369
2370 class LUActivateInstanceDisks(NoHooksLU):
2371   """Bring up an instance's disks.
2372
2373   """
2374   _OP_REQP = ["instance_name"]
2375   REQ_BGL = False
2376
2377   def ExpandNames(self):
2378     self._ExpandAndLockInstance()
2379     self.needed_locks[locking.LEVEL_NODE] = []
2380     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2381
2382   def DeclareLocks(self, level):
2383     if level == locking.LEVEL_NODE:
2384       self._LockInstancesNodes()
2385
2386   def CheckPrereq(self):
2387     """Check prerequisites.
2388
2389     This checks that the instance is in the cluster.
2390
2391     """
2392     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2393     assert self.instance is not None, \
2394       "Cannot retrieve locked instance %s" % self.op.instance_name
2395     _CheckNodeOnline(self, self.instance.primary_node)
2396
2397   def Exec(self, feedback_fn):
2398     """Activate the disks.
2399
2400     """
2401     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2402     if not disks_ok:
2403       raise errors.OpExecError("Cannot activate block devices")
2404
2405     return disks_info
2406
2407
2408 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2409   """Prepare the block devices for an instance.
2410
2411   This sets up the block devices on all nodes.
2412
2413   @type lu: L{LogicalUnit}
2414   @param lu: the logical unit on whose behalf we execute
2415   @type instance: L{objects.Instance}
2416   @param instance: the instance for whose disks we assemble
2417   @type ignore_secondaries: boolean
2418   @param ignore_secondaries: if true, errors on secondary nodes
2419       won't result in an error return from the function
2420   @return: False if the operation failed, otherwise a list of
2421       (host, instance_visible_name, node_visible_name)
2422       with the mapping from node devices to instance devices
2423
2424   """
2425   device_info = []
2426   disks_ok = True
2427   iname = instance.name
2428   # With the two passes mechanism we try to reduce the window of
2429   # opportunity for the race condition of switching DRBD to primary
2430   # before handshaking occured, but we do not eliminate it
2431
2432   # The proper fix would be to wait (with some limits) until the
2433   # connection has been made and drbd transitions from WFConnection
2434   # into any other network-connected state (Connected, SyncTarget,
2435   # SyncSource, etc.)
2436
2437   # 1st pass, assemble on all nodes in secondary mode
2438   for inst_disk in instance.disks:
2439     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2440       lu.cfg.SetDiskID(node_disk, node)
2441       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2442       if result.failed or not result:
2443         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2444                            " (is_primary=False, pass=1)",
2445                            inst_disk.iv_name, node)
2446         if not ignore_secondaries:
2447           disks_ok = False
2448
2449   # FIXME: race condition on drbd migration to primary
2450
2451   # 2nd pass, do only the primary node
2452   for inst_disk in instance.disks:
2453     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2454       if node != instance.primary_node:
2455         continue
2456       lu.cfg.SetDiskID(node_disk, node)
2457       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2458       if result.failed or not result:
2459         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2460                            " (is_primary=True, pass=2)",
2461                            inst_disk.iv_name, node)
2462         disks_ok = False
2463     device_info.append((instance.primary_node, inst_disk.iv_name, result.data))
2464
2465   # leave the disks configured for the primary node
2466   # this is a workaround that would be fixed better by
2467   # improving the logical/physical id handling
2468   for disk in instance.disks:
2469     lu.cfg.SetDiskID(disk, instance.primary_node)
2470
2471   return disks_ok, device_info
2472
2473
2474 def _StartInstanceDisks(lu, instance, force):
2475   """Start the disks of an instance.
2476
2477   """
2478   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2479                                            ignore_secondaries=force)
2480   if not disks_ok:
2481     _ShutdownInstanceDisks(lu, instance)
2482     if force is not None and not force:
2483       lu.proc.LogWarning("", hint="If the message above refers to a"
2484                          " secondary node,"
2485                          " you can retry the operation using '--force'.")
2486     raise errors.OpExecError("Disk consistency error")
2487
2488
2489 class LUDeactivateInstanceDisks(NoHooksLU):
2490   """Shutdown an instance's disks.
2491
2492   """
2493   _OP_REQP = ["instance_name"]
2494   REQ_BGL = False
2495
2496   def ExpandNames(self):
2497     self._ExpandAndLockInstance()
2498     self.needed_locks[locking.LEVEL_NODE] = []
2499     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2500
2501   def DeclareLocks(self, level):
2502     if level == locking.LEVEL_NODE:
2503       self._LockInstancesNodes()
2504
2505   def CheckPrereq(self):
2506     """Check prerequisites.
2507
2508     This checks that the instance is in the cluster.
2509
2510     """
2511     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2512     assert self.instance is not None, \
2513       "Cannot retrieve locked instance %s" % self.op.instance_name
2514
2515   def Exec(self, feedback_fn):
2516     """Deactivate the disks
2517
2518     """
2519     instance = self.instance
2520     _SafeShutdownInstanceDisks(self, instance)
2521
2522
2523 def _SafeShutdownInstanceDisks(lu, instance):
2524   """Shutdown block devices of an instance.
2525
2526   This function checks if an instance is running, before calling
2527   _ShutdownInstanceDisks.
2528
2529   """
2530   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2531                                       [instance.hypervisor])
2532   ins_l = ins_l[instance.primary_node]
2533   if ins_l.failed or not isinstance(ins_l.data, list):
2534     raise errors.OpExecError("Can't contact node '%s'" %
2535                              instance.primary_node)
2536
2537   if instance.name in ins_l.data:
2538     raise errors.OpExecError("Instance is running, can't shutdown"
2539                              " block devices.")
2540
2541   _ShutdownInstanceDisks(lu, instance)
2542
2543
2544 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2545   """Shutdown block devices of an instance.
2546
2547   This does the shutdown on all nodes of the instance.
2548
2549   If the ignore_primary is false, errors on the primary node are
2550   ignored.
2551
2552   """
2553   result = True
2554   for disk in instance.disks:
2555     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2556       lu.cfg.SetDiskID(top_disk, node)
2557       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2558       if result.failed or not result.data:
2559         logging.error("Could not shutdown block device %s on node %s",
2560                       disk.iv_name, node)
2561         if not ignore_primary or node != instance.primary_node:
2562           result = False
2563   return result
2564
2565
2566 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2567   """Checks if a node has enough free memory.
2568
2569   This function check if a given node has the needed amount of free
2570   memory. In case the node has less memory or we cannot get the
2571   information from the node, this function raise an OpPrereqError
2572   exception.
2573
2574   @type lu: C{LogicalUnit}
2575   @param lu: a logical unit from which we get configuration data
2576   @type node: C{str}
2577   @param node: the node to check
2578   @type reason: C{str}
2579   @param reason: string to use in the error message
2580   @type requested: C{int}
2581   @param requested: the amount of memory in MiB to check for
2582   @type hypervisor_name: C{str}
2583   @param hypervisor_name: the hypervisor to ask for memory stats
2584   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2585       we cannot check the node
2586
2587   """
2588   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2589   nodeinfo[node].Raise()
2590   free_mem = nodeinfo[node].data.get('memory_free')
2591   if not isinstance(free_mem, int):
2592     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2593                              " was '%s'" % (node, free_mem))
2594   if requested > free_mem:
2595     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2596                              " needed %s MiB, available %s MiB" %
2597                              (node, reason, requested, free_mem))
2598
2599
2600 class LUStartupInstance(LogicalUnit):
2601   """Starts an instance.
2602
2603   """
2604   HPATH = "instance-start"
2605   HTYPE = constants.HTYPE_INSTANCE
2606   _OP_REQP = ["instance_name", "force"]
2607   REQ_BGL = False
2608
2609   def ExpandNames(self):
2610     self._ExpandAndLockInstance()
2611
2612   def BuildHooksEnv(self):
2613     """Build hooks env.
2614
2615     This runs on master, primary and secondary nodes of the instance.
2616
2617     """
2618     env = {
2619       "FORCE": self.op.force,
2620       }
2621     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2622     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2623     return env, nl, nl
2624
2625   def CheckPrereq(self):
2626     """Check prerequisites.
2627
2628     This checks that the instance is in the cluster.
2629
2630     """
2631     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2632     assert self.instance is not None, \
2633       "Cannot retrieve locked instance %s" % self.op.instance_name
2634
2635     _CheckNodeOnline(self, instance.primary_node)
2636
2637     bep = self.cfg.GetClusterInfo().FillBE(instance)
2638     # check bridges existance
2639     _CheckInstanceBridgesExist(self, instance)
2640
2641     _CheckNodeFreeMemory(self, instance.primary_node,
2642                          "starting instance %s" % instance.name,
2643                          bep[constants.BE_MEMORY], instance.hypervisor)
2644
2645   def Exec(self, feedback_fn):
2646     """Start the instance.
2647
2648     """
2649     instance = self.instance
2650     force = self.op.force
2651     extra_args = getattr(self.op, "extra_args", "")
2652
2653     self.cfg.MarkInstanceUp(instance.name)
2654
2655     node_current = instance.primary_node
2656
2657     _StartInstanceDisks(self, instance, force)
2658
2659     result = self.rpc.call_instance_start(node_current, instance, extra_args)
2660     msg = result.RemoteFailMsg()
2661     if msg:
2662       _ShutdownInstanceDisks(self, instance)
2663       raise errors.OpExecError("Could not start instance: %s" % msg)
2664
2665
2666 class LURebootInstance(LogicalUnit):
2667   """Reboot an instance.
2668
2669   """
2670   HPATH = "instance-reboot"
2671   HTYPE = constants.HTYPE_INSTANCE
2672   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2673   REQ_BGL = False
2674
2675   def ExpandNames(self):
2676     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2677                                    constants.INSTANCE_REBOOT_HARD,
2678                                    constants.INSTANCE_REBOOT_FULL]:
2679       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2680                                   (constants.INSTANCE_REBOOT_SOFT,
2681                                    constants.INSTANCE_REBOOT_HARD,
2682                                    constants.INSTANCE_REBOOT_FULL))
2683     self._ExpandAndLockInstance()
2684
2685   def BuildHooksEnv(self):
2686     """Build hooks env.
2687
2688     This runs on master, primary and secondary nodes of the instance.
2689
2690     """
2691     env = {
2692       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2693       }
2694     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2695     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2696     return env, nl, nl
2697
2698   def CheckPrereq(self):
2699     """Check prerequisites.
2700
2701     This checks that the instance is in the cluster.
2702
2703     """
2704     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2705     assert self.instance is not None, \
2706       "Cannot retrieve locked instance %s" % self.op.instance_name
2707
2708     _CheckNodeOnline(self, instance.primary_node)
2709
2710     # check bridges existance
2711     _CheckInstanceBridgesExist(self, instance)
2712
2713   def Exec(self, feedback_fn):
2714     """Reboot the instance.
2715
2716     """
2717     instance = self.instance
2718     ignore_secondaries = self.op.ignore_secondaries
2719     reboot_type = self.op.reboot_type
2720     extra_args = getattr(self.op, "extra_args", "")
2721
2722     node_current = instance.primary_node
2723
2724     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2725                        constants.INSTANCE_REBOOT_HARD]:
2726       result = self.rpc.call_instance_reboot(node_current, instance,
2727                                              reboot_type, extra_args)
2728       if result.failed or not result.data:
2729         raise errors.OpExecError("Could not reboot instance")
2730     else:
2731       if not self.rpc.call_instance_shutdown(node_current, instance):
2732         raise errors.OpExecError("could not shutdown instance for full reboot")
2733       _ShutdownInstanceDisks(self, instance)
2734       _StartInstanceDisks(self, instance, ignore_secondaries)
2735       result = self.rpc.call_instance_start(node_current, instance, extra_args)
2736       msg = result.RemoteFailMsg()
2737       if msg:
2738         _ShutdownInstanceDisks(self, instance)
2739         raise errors.OpExecError("Could not start instance for"
2740                                  " full reboot: %s" % msg)
2741
2742     self.cfg.MarkInstanceUp(instance.name)
2743
2744
2745 class LUShutdownInstance(LogicalUnit):
2746   """Shutdown an instance.
2747
2748   """
2749   HPATH = "instance-stop"
2750   HTYPE = constants.HTYPE_INSTANCE
2751   _OP_REQP = ["instance_name"]
2752   REQ_BGL = False
2753
2754   def ExpandNames(self):
2755     self._ExpandAndLockInstance()
2756
2757   def BuildHooksEnv(self):
2758     """Build hooks env.
2759
2760     This runs on master, primary and secondary nodes of the instance.
2761
2762     """
2763     env = _BuildInstanceHookEnvByObject(self, self.instance)
2764     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2765     return env, nl, nl
2766
2767   def CheckPrereq(self):
2768     """Check prerequisites.
2769
2770     This checks that the instance is in the cluster.
2771
2772     """
2773     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2774     assert self.instance is not None, \
2775       "Cannot retrieve locked instance %s" % self.op.instance_name
2776     _CheckNodeOnline(self, self.instance.primary_node)
2777
2778   def Exec(self, feedback_fn):
2779     """Shutdown the instance.
2780
2781     """
2782     instance = self.instance
2783     node_current = instance.primary_node
2784     self.cfg.MarkInstanceDown(instance.name)
2785     result = self.rpc.call_instance_shutdown(node_current, instance)
2786     if result.failed or not result.data:
2787       self.proc.LogWarning("Could not shutdown instance")
2788
2789     _ShutdownInstanceDisks(self, instance)
2790
2791
2792 class LUReinstallInstance(LogicalUnit):
2793   """Reinstall an instance.
2794
2795   """
2796   HPATH = "instance-reinstall"
2797   HTYPE = constants.HTYPE_INSTANCE
2798   _OP_REQP = ["instance_name"]
2799   REQ_BGL = False
2800
2801   def ExpandNames(self):
2802     self._ExpandAndLockInstance()
2803
2804   def BuildHooksEnv(self):
2805     """Build hooks env.
2806
2807     This runs on master, primary and secondary nodes of the instance.
2808
2809     """
2810     env = _BuildInstanceHookEnvByObject(self, self.instance)
2811     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2812     return env, nl, nl
2813
2814   def CheckPrereq(self):
2815     """Check prerequisites.
2816
2817     This checks that the instance is in the cluster and is not running.
2818
2819     """
2820     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2821     assert instance is not None, \
2822       "Cannot retrieve locked instance %s" % self.op.instance_name
2823     _CheckNodeOnline(self, instance.primary_node)
2824
2825     if instance.disk_template == constants.DT_DISKLESS:
2826       raise errors.OpPrereqError("Instance '%s' has no disks" %
2827                                  self.op.instance_name)
2828     if instance.admin_up:
2829       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2830                                  self.op.instance_name)
2831     remote_info = self.rpc.call_instance_info(instance.primary_node,
2832                                               instance.name,
2833                                               instance.hypervisor)
2834     if remote_info.failed or remote_info.data:
2835       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2836                                  (self.op.instance_name,
2837                                   instance.primary_node))
2838
2839     self.op.os_type = getattr(self.op, "os_type", None)
2840     if self.op.os_type is not None:
2841       # OS verification
2842       pnode = self.cfg.GetNodeInfo(
2843         self.cfg.ExpandNodeName(instance.primary_node))
2844       if pnode is None:
2845         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2846                                    self.op.pnode)
2847       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2848       result.Raise()
2849       if not isinstance(result.data, objects.OS):
2850         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2851                                    " primary node"  % self.op.os_type)
2852
2853     self.instance = instance
2854
2855   def Exec(self, feedback_fn):
2856     """Reinstall the instance.
2857
2858     """
2859     inst = self.instance
2860
2861     if self.op.os_type is not None:
2862       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2863       inst.os = self.op.os_type
2864       self.cfg.Update(inst)
2865
2866     _StartInstanceDisks(self, inst, None)
2867     try:
2868       feedback_fn("Running the instance OS create scripts...")
2869       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2870       msg = result.RemoteFailMsg()
2871       if msg:
2872         raise errors.OpExecError("Could not install OS for instance %s"
2873                                  " on node %s: %s" %
2874                                  (inst.name, inst.primary_node, msg))
2875     finally:
2876       _ShutdownInstanceDisks(self, inst)
2877
2878
2879 class LURenameInstance(LogicalUnit):
2880   """Rename an instance.
2881
2882   """
2883   HPATH = "instance-rename"
2884   HTYPE = constants.HTYPE_INSTANCE
2885   _OP_REQP = ["instance_name", "new_name"]
2886
2887   def BuildHooksEnv(self):
2888     """Build hooks env.
2889
2890     This runs on master, primary and secondary nodes of the instance.
2891
2892     """
2893     env = _BuildInstanceHookEnvByObject(self, self.instance)
2894     env["INSTANCE_NEW_NAME"] = self.op.new_name
2895     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2896     return env, nl, nl
2897
2898   def CheckPrereq(self):
2899     """Check prerequisites.
2900
2901     This checks that the instance is in the cluster and is not running.
2902
2903     """
2904     instance = self.cfg.GetInstanceInfo(
2905       self.cfg.ExpandInstanceName(self.op.instance_name))
2906     if instance is None:
2907       raise errors.OpPrereqError("Instance '%s' not known" %
2908                                  self.op.instance_name)
2909     _CheckNodeOnline(self, instance.primary_node)
2910
2911     if instance.admin_up:
2912       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2913                                  self.op.instance_name)
2914     remote_info = self.rpc.call_instance_info(instance.primary_node,
2915                                               instance.name,
2916                                               instance.hypervisor)
2917     remote_info.Raise()
2918     if remote_info.data:
2919       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2920                                  (self.op.instance_name,
2921                                   instance.primary_node))
2922     self.instance = instance
2923
2924     # new name verification
2925     name_info = utils.HostInfo(self.op.new_name)
2926
2927     self.op.new_name = new_name = name_info.name
2928     instance_list = self.cfg.GetInstanceList()
2929     if new_name in instance_list:
2930       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2931                                  new_name)
2932
2933     if not getattr(self.op, "ignore_ip", False):
2934       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2935         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2936                                    (name_info.ip, new_name))
2937
2938
2939   def Exec(self, feedback_fn):
2940     """Reinstall the instance.
2941
2942     """
2943     inst = self.instance
2944     old_name = inst.name
2945
2946     if inst.disk_template == constants.DT_FILE:
2947       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2948
2949     self.cfg.RenameInstance(inst.name, self.op.new_name)
2950     # Change the instance lock. This is definitely safe while we hold the BGL
2951     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2952     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2953
2954     # re-read the instance from the configuration after rename
2955     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2956
2957     if inst.disk_template == constants.DT_FILE:
2958       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2959       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2960                                                      old_file_storage_dir,
2961                                                      new_file_storage_dir)
2962       result.Raise()
2963       if not result.data:
2964         raise errors.OpExecError("Could not connect to node '%s' to rename"
2965                                  " directory '%s' to '%s' (but the instance"
2966                                  " has been renamed in Ganeti)" % (
2967                                  inst.primary_node, old_file_storage_dir,
2968                                  new_file_storage_dir))
2969
2970       if not result.data[0]:
2971         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2972                                  " (but the instance has been renamed in"
2973                                  " Ganeti)" % (old_file_storage_dir,
2974                                                new_file_storage_dir))
2975
2976     _StartInstanceDisks(self, inst, None)
2977     try:
2978       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2979                                                  old_name)
2980       msg = result.RemoteFailMsg()
2981       if msg:
2982         msg = ("Could not run OS rename script for instance %s on node %s"
2983                " (but the instance has been renamed in Ganeti): %s" %
2984                (inst.name, inst.primary_node, msg))
2985         self.proc.LogWarning(msg)
2986     finally:
2987       _ShutdownInstanceDisks(self, inst)
2988
2989
2990 class LURemoveInstance(LogicalUnit):
2991   """Remove an instance.
2992
2993   """
2994   HPATH = "instance-remove"
2995   HTYPE = constants.HTYPE_INSTANCE
2996   _OP_REQP = ["instance_name", "ignore_failures"]
2997   REQ_BGL = False
2998
2999   def ExpandNames(self):
3000     self._ExpandAndLockInstance()
3001     self.needed_locks[locking.LEVEL_NODE] = []
3002     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3003
3004   def DeclareLocks(self, level):
3005     if level == locking.LEVEL_NODE:
3006       self._LockInstancesNodes()
3007
3008   def BuildHooksEnv(self):
3009     """Build hooks env.
3010
3011     This runs on master, primary and secondary nodes of the instance.
3012
3013     """
3014     env = _BuildInstanceHookEnvByObject(self, self.instance)
3015     nl = [self.cfg.GetMasterNode()]
3016     return env, nl, nl
3017
3018   def CheckPrereq(self):
3019     """Check prerequisites.
3020
3021     This checks that the instance is in the cluster.
3022
3023     """
3024     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3025     assert self.instance is not None, \
3026       "Cannot retrieve locked instance %s" % self.op.instance_name
3027
3028   def Exec(self, feedback_fn):
3029     """Remove the instance.
3030
3031     """
3032     instance = self.instance
3033     logging.info("Shutting down instance %s on node %s",
3034                  instance.name, instance.primary_node)
3035
3036     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3037     if result.failed or not result.data:
3038       if self.op.ignore_failures:
3039         feedback_fn("Warning: can't shutdown instance")
3040       else:
3041         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3042                                  (instance.name, instance.primary_node))
3043
3044     logging.info("Removing block devices for instance %s", instance.name)
3045
3046     if not _RemoveDisks(self, instance):
3047       if self.op.ignore_failures:
3048         feedback_fn("Warning: can't remove instance's disks")
3049       else:
3050         raise errors.OpExecError("Can't remove instance's disks")
3051
3052     logging.info("Removing instance %s out of cluster config", instance.name)
3053
3054     self.cfg.RemoveInstance(instance.name)
3055     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3056
3057
3058 class LUQueryInstances(NoHooksLU):
3059   """Logical unit for querying instances.
3060
3061   """
3062   _OP_REQP = ["output_fields", "names"]
3063   REQ_BGL = False
3064   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3065                                     "admin_state", "admin_ram",
3066                                     "disk_template", "ip", "mac", "bridge",
3067                                     "sda_size", "sdb_size", "vcpus", "tags",
3068                                     "network_port", "beparams",
3069                                     "(disk).(size)/([0-9]+)",
3070                                     "(disk).(sizes)",
3071                                     "(nic).(mac|ip|bridge)/([0-9]+)",
3072                                     "(nic).(macs|ips|bridges)",
3073                                     "(disk|nic).(count)",
3074                                     "serial_no", "hypervisor", "hvparams",] +
3075                                   ["hv/%s" % name
3076                                    for name in constants.HVS_PARAMETERS] +
3077                                   ["be/%s" % name
3078                                    for name in constants.BES_PARAMETERS])
3079   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3080
3081
3082   def ExpandNames(self):
3083     _CheckOutputFields(static=self._FIELDS_STATIC,
3084                        dynamic=self._FIELDS_DYNAMIC,
3085                        selected=self.op.output_fields)
3086
3087     self.needed_locks = {}
3088     self.share_locks[locking.LEVEL_INSTANCE] = 1
3089     self.share_locks[locking.LEVEL_NODE] = 1
3090
3091     if self.op.names:
3092       self.wanted = _GetWantedInstances(self, self.op.names)
3093     else:
3094       self.wanted = locking.ALL_SET
3095
3096     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3097     if self.do_locking:
3098       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3099       self.needed_locks[locking.LEVEL_NODE] = []
3100       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3101
3102   def DeclareLocks(self, level):
3103     if level == locking.LEVEL_NODE and self.do_locking:
3104       self._LockInstancesNodes()
3105
3106   def CheckPrereq(self):
3107     """Check prerequisites.
3108
3109     """
3110     pass
3111
3112   def Exec(self, feedback_fn):
3113     """Computes the list of nodes and their attributes.
3114
3115     """
3116     all_info = self.cfg.GetAllInstancesInfo()
3117     if self.do_locking:
3118       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3119     elif self.wanted != locking.ALL_SET:
3120       instance_names = self.wanted
3121       missing = set(instance_names).difference(all_info.keys())
3122       if missing:
3123         raise errors.OpExecError(
3124           "Some instances were removed before retrieving their data: %s"
3125           % missing)
3126     else:
3127       instance_names = all_info.keys()
3128
3129     instance_names = utils.NiceSort(instance_names)
3130     instance_list = [all_info[iname] for iname in instance_names]
3131
3132     # begin data gathering
3133
3134     nodes = frozenset([inst.primary_node for inst in instance_list])
3135     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3136
3137     bad_nodes = []
3138     off_nodes = []
3139     if self.do_locking:
3140       live_data = {}
3141       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3142       for name in nodes:
3143         result = node_data[name]
3144         if result.offline:
3145           # offline nodes will be in both lists
3146           off_nodes.append(name)
3147         if result.failed:
3148           bad_nodes.append(name)
3149         else:
3150           if result.data:
3151             live_data.update(result.data)
3152             # else no instance is alive
3153     else:
3154       live_data = dict([(name, {}) for name in instance_names])
3155
3156     # end data gathering
3157
3158     HVPREFIX = "hv/"
3159     BEPREFIX = "be/"
3160     output = []
3161     for instance in instance_list:
3162       iout = []
3163       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3164       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3165       for field in self.op.output_fields:
3166         st_match = self._FIELDS_STATIC.Matches(field)
3167         if field == "name":
3168           val = instance.name
3169         elif field == "os":
3170           val = instance.os
3171         elif field == "pnode":
3172           val = instance.primary_node
3173         elif field == "snodes":
3174           val = list(instance.secondary_nodes)
3175         elif field == "admin_state":
3176           val = instance.admin_up
3177         elif field == "oper_state":
3178           if instance.primary_node in bad_nodes:
3179             val = None
3180           else:
3181             val = bool(live_data.get(instance.name))
3182         elif field == "status":
3183           if instance.primary_node in off_nodes:
3184             val = "ERROR_nodeoffline"
3185           elif instance.primary_node in bad_nodes:
3186             val = "ERROR_nodedown"
3187           else:
3188             running = bool(live_data.get(instance.name))
3189             if running:
3190               if instance.admin_up:
3191                 val = "running"
3192               else:
3193                 val = "ERROR_up"
3194             else:
3195               if instance.admin_up:
3196                 val = "ERROR_down"
3197               else:
3198                 val = "ADMIN_down"
3199         elif field == "oper_ram":
3200           if instance.primary_node in bad_nodes:
3201             val = None
3202           elif instance.name in live_data:
3203             val = live_data[instance.name].get("memory", "?")
3204           else:
3205             val = "-"
3206         elif field == "disk_template":
3207           val = instance.disk_template
3208         elif field == "ip":
3209           val = instance.nics[0].ip
3210         elif field == "bridge":
3211           val = instance.nics[0].bridge
3212         elif field == "mac":
3213           val = instance.nics[0].mac
3214         elif field == "sda_size" or field == "sdb_size":
3215           idx = ord(field[2]) - ord('a')
3216           try:
3217             val = instance.FindDisk(idx).size
3218           except errors.OpPrereqError:
3219             val = None
3220         elif field == "tags":
3221           val = list(instance.GetTags())
3222         elif field == "serial_no":
3223           val = instance.serial_no
3224         elif field == "network_port":
3225           val = instance.network_port
3226         elif field == "hypervisor":
3227           val = instance.hypervisor
3228         elif field == "hvparams":
3229           val = i_hv
3230         elif (field.startswith(HVPREFIX) and
3231               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3232           val = i_hv.get(field[len(HVPREFIX):], None)
3233         elif field == "beparams":
3234           val = i_be
3235         elif (field.startswith(BEPREFIX) and
3236               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3237           val = i_be.get(field[len(BEPREFIX):], None)
3238         elif st_match and st_match.groups():
3239           # matches a variable list
3240           st_groups = st_match.groups()
3241           if st_groups and st_groups[0] == "disk":
3242             if st_groups[1] == "count":
3243               val = len(instance.disks)
3244             elif st_groups[1] == "sizes":
3245               val = [disk.size for disk in instance.disks]
3246             elif st_groups[1] == "size":
3247               try:
3248                 val = instance.FindDisk(st_groups[2]).size
3249               except errors.OpPrereqError:
3250                 val = None
3251             else:
3252               assert False, "Unhandled disk parameter"
3253           elif st_groups[0] == "nic":
3254             if st_groups[1] == "count":
3255               val = len(instance.nics)
3256             elif st_groups[1] == "macs":
3257               val = [nic.mac for nic in instance.nics]
3258             elif st_groups[1] == "ips":
3259               val = [nic.ip for nic in instance.nics]
3260             elif st_groups[1] == "bridges":
3261               val = [nic.bridge for nic in instance.nics]
3262             else:
3263               # index-based item
3264               nic_idx = int(st_groups[2])
3265               if nic_idx >= len(instance.nics):
3266                 val = None
3267               else:
3268                 if st_groups[1] == "mac":
3269                   val = instance.nics[nic_idx].mac
3270                 elif st_groups[1] == "ip":
3271                   val = instance.nics[nic_idx].ip
3272                 elif st_groups[1] == "bridge":
3273                   val = instance.nics[nic_idx].bridge
3274                 else:
3275                   assert False, "Unhandled NIC parameter"
3276           else:
3277             assert False, "Unhandled variable parameter"
3278         else:
3279           raise errors.ParameterError(field)
3280         iout.append(val)
3281       output.append(iout)
3282
3283     return output
3284
3285
3286 class LUFailoverInstance(LogicalUnit):
3287   """Failover an instance.
3288
3289   """
3290   HPATH = "instance-failover"
3291   HTYPE = constants.HTYPE_INSTANCE
3292   _OP_REQP = ["instance_name", "ignore_consistency"]
3293   REQ_BGL = False
3294
3295   def ExpandNames(self):
3296     self._ExpandAndLockInstance()
3297     self.needed_locks[locking.LEVEL_NODE] = []
3298     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3299
3300   def DeclareLocks(self, level):
3301     if level == locking.LEVEL_NODE:
3302       self._LockInstancesNodes()
3303
3304   def BuildHooksEnv(self):
3305     """Build hooks env.
3306
3307     This runs on master, primary and secondary nodes of the instance.
3308
3309     """
3310     env = {
3311       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3312       }
3313     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3314     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3315     return env, nl, nl
3316
3317   def CheckPrereq(self):
3318     """Check prerequisites.
3319
3320     This checks that the instance is in the cluster.
3321
3322     """
3323     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3324     assert self.instance is not None, \
3325       "Cannot retrieve locked instance %s" % self.op.instance_name
3326
3327     bep = self.cfg.GetClusterInfo().FillBE(instance)
3328     if instance.disk_template not in constants.DTS_NET_MIRROR:
3329       raise errors.OpPrereqError("Instance's disk layout is not"
3330                                  " network mirrored, cannot failover.")
3331
3332     secondary_nodes = instance.secondary_nodes
3333     if not secondary_nodes:
3334       raise errors.ProgrammerError("no secondary node but using "
3335                                    "a mirrored disk template")
3336
3337     target_node = secondary_nodes[0]
3338     _CheckNodeOnline(self, target_node)
3339     # check memory requirements on the secondary node
3340     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3341                          instance.name, bep[constants.BE_MEMORY],
3342                          instance.hypervisor)
3343
3344     # check bridge existance
3345     brlist = [nic.bridge for nic in instance.nics]
3346     result = self.rpc.call_bridges_exist(target_node, brlist)
3347     result.Raise()
3348     if not result.data:
3349       raise errors.OpPrereqError("One or more target bridges %s does not"
3350                                  " exist on destination node '%s'" %
3351                                  (brlist, target_node))
3352
3353   def Exec(self, feedback_fn):
3354     """Failover an instance.
3355
3356     The failover is done by shutting it down on its present node and
3357     starting it on the secondary.
3358
3359     """
3360     instance = self.instance
3361
3362     source_node = instance.primary_node
3363     target_node = instance.secondary_nodes[0]
3364
3365     feedback_fn("* checking disk consistency between source and target")
3366     for dev in instance.disks:
3367       # for drbd, these are drbd over lvm
3368       if not _CheckDiskConsistency(self, dev, target_node, False):
3369         if instance.admin_up and not self.op.ignore_consistency:
3370           raise errors.OpExecError("Disk %s is degraded on target node,"
3371                                    " aborting failover." % dev.iv_name)
3372
3373     feedback_fn("* shutting down instance on source node")
3374     logging.info("Shutting down instance %s on node %s",
3375                  instance.name, source_node)
3376
3377     result = self.rpc.call_instance_shutdown(source_node, instance)
3378     if result.failed or not result.data:
3379       if self.op.ignore_consistency:
3380         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3381                              " Proceeding"
3382                              " anyway. Please make sure node %s is down",
3383                              instance.name, source_node, source_node)
3384       else:
3385         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3386                                  (instance.name, source_node))
3387
3388     feedback_fn("* deactivating the instance's disks on source node")
3389     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3390       raise errors.OpExecError("Can't shut down the instance's disks.")
3391
3392     instance.primary_node = target_node
3393     # distribute new instance config to the other nodes
3394     self.cfg.Update(instance)
3395
3396     # Only start the instance if it's marked as up
3397     if instance.admin_up:
3398       feedback_fn("* activating the instance's disks on target node")
3399       logging.info("Starting instance %s on node %s",
3400                    instance.name, target_node)
3401
3402       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3403                                                ignore_secondaries=True)
3404       if not disks_ok:
3405         _ShutdownInstanceDisks(self, instance)
3406         raise errors.OpExecError("Can't activate the instance's disks")
3407
3408       feedback_fn("* starting the instance on the target node")
3409       result = self.rpc.call_instance_start(target_node, instance, None)
3410       msg = result.RemoteFailMsg()
3411       if msg:
3412         _ShutdownInstanceDisks(self, instance)
3413         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3414                                  (instance.name, target_node, msg))
3415
3416
3417 class LUMigrateInstance(LogicalUnit):
3418   """Migrate an instance.
3419
3420   This is migration without shutting down, compared to the failover,
3421   which is done with shutdown.
3422
3423   """
3424   HPATH = "instance-migrate"
3425   HTYPE = constants.HTYPE_INSTANCE
3426   _OP_REQP = ["instance_name", "live", "cleanup"]
3427
3428   REQ_BGL = False
3429
3430   def ExpandNames(self):
3431     self._ExpandAndLockInstance()
3432     self.needed_locks[locking.LEVEL_NODE] = []
3433     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3434
3435   def DeclareLocks(self, level):
3436     if level == locking.LEVEL_NODE:
3437       self._LockInstancesNodes()
3438
3439   def BuildHooksEnv(self):
3440     """Build hooks env.
3441
3442     This runs on master, primary and secondary nodes of the instance.
3443
3444     """
3445     env = _BuildInstanceHookEnvByObject(self, self.instance)
3446     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3447     return env, nl, nl
3448
3449   def CheckPrereq(self):
3450     """Check prerequisites.
3451
3452     This checks that the instance is in the cluster.
3453
3454     """
3455     instance = self.cfg.GetInstanceInfo(
3456       self.cfg.ExpandInstanceName(self.op.instance_name))
3457     if instance is None:
3458       raise errors.OpPrereqError("Instance '%s' not known" %
3459                                  self.op.instance_name)
3460
3461     if instance.disk_template != constants.DT_DRBD8:
3462       raise errors.OpPrereqError("Instance's disk layout is not"
3463                                  " drbd8, cannot migrate.")
3464
3465     secondary_nodes = instance.secondary_nodes
3466     if not secondary_nodes:
3467       raise errors.ProgrammerError("no secondary node but using "
3468                                    "drbd8 disk template")
3469
3470     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3471
3472     target_node = secondary_nodes[0]
3473     # check memory requirements on the secondary node
3474     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3475                          instance.name, i_be[constants.BE_MEMORY],
3476                          instance.hypervisor)
3477
3478     # check bridge existance
3479     brlist = [nic.bridge for nic in instance.nics]
3480     result = self.rpc.call_bridges_exist(target_node, brlist)
3481     if result.failed or not result.data:
3482       raise errors.OpPrereqError("One or more target bridges %s does not"
3483                                  " exist on destination node '%s'" %
3484                                  (brlist, target_node))
3485
3486     if not self.op.cleanup:
3487       result = self.rpc.call_instance_migratable(instance.primary_node,
3488                                                  instance)
3489       msg = result.RemoteFailMsg()
3490       if msg:
3491         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3492                                    msg)
3493
3494     self.instance = instance
3495
3496   def _WaitUntilSync(self):
3497     """Poll with custom rpc for disk sync.
3498
3499     This uses our own step-based rpc call.
3500
3501     """
3502     self.feedback_fn("* wait until resync is done")
3503     all_done = False
3504     while not all_done:
3505       all_done = True
3506       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3507                                             self.nodes_ip,
3508                                             self.instance.disks)
3509       min_percent = 100
3510       for node, nres in result.items():
3511         msg = nres.RemoteFailMsg()
3512         if msg:
3513           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3514                                    (node, msg))
3515         node_done, node_percent = nres.data[1]
3516         all_done = all_done and node_done
3517         if node_percent is not None:
3518           min_percent = min(min_percent, node_percent)
3519       if not all_done:
3520         if min_percent < 100:
3521           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3522         time.sleep(2)
3523
3524   def _EnsureSecondary(self, node):
3525     """Demote a node to secondary.
3526
3527     """
3528     self.feedback_fn("* switching node %s to secondary mode" % node)
3529
3530     for dev in self.instance.disks:
3531       self.cfg.SetDiskID(dev, node)
3532
3533     result = self.rpc.call_blockdev_close(node, self.instance.name,
3534                                           self.instance.disks)
3535     msg = result.RemoteFailMsg()
3536     if msg:
3537       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3538                                " error %s" % (node, msg))
3539
3540   def _GoStandalone(self):
3541     """Disconnect from the network.
3542
3543     """
3544     self.feedback_fn("* changing into standalone mode")
3545     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3546                                                self.instance.disks)
3547     for node, nres in result.items():
3548       msg = nres.RemoteFailMsg()
3549       if msg:
3550         raise errors.OpExecError("Cannot disconnect disks node %s,"
3551                                  " error %s" % (node, msg))
3552
3553   def _GoReconnect(self, multimaster):
3554     """Reconnect to the network.
3555
3556     """
3557     if multimaster:
3558       msg = "dual-master"
3559     else:
3560       msg = "single-master"
3561     self.feedback_fn("* changing disks into %s mode" % msg)
3562     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3563                                            self.instance.disks,
3564                                            self.instance.name, multimaster)
3565     for node, nres in result.items():
3566       msg = nres.RemoteFailMsg()
3567       if msg:
3568         raise errors.OpExecError("Cannot change disks config on node %s,"
3569                                  " error: %s" % (node, msg))
3570
3571   def _ExecCleanup(self):
3572     """Try to cleanup after a failed migration.
3573
3574     The cleanup is done by:
3575       - check that the instance is running only on one node
3576         (and update the config if needed)
3577       - change disks on its secondary node to secondary
3578       - wait until disks are fully synchronized
3579       - disconnect from the network
3580       - change disks into single-master mode
3581       - wait again until disks are fully synchronized
3582
3583     """
3584     instance = self.instance
3585     target_node = self.target_node
3586     source_node = self.source_node
3587
3588     # check running on only one node
3589     self.feedback_fn("* checking where the instance actually runs"
3590                      " (if this hangs, the hypervisor might be in"
3591                      " a bad state)")
3592     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3593     for node, result in ins_l.items():
3594       result.Raise()
3595       if not isinstance(result.data, list):
3596         raise errors.OpExecError("Can't contact node '%s'" % node)
3597
3598     runningon_source = instance.name in ins_l[source_node].data
3599     runningon_target = instance.name in ins_l[target_node].data
3600
3601     if runningon_source and runningon_target:
3602       raise errors.OpExecError("Instance seems to be running on two nodes,"
3603                                " or the hypervisor is confused. You will have"
3604                                " to ensure manually that it runs only on one"
3605                                " and restart this operation.")
3606
3607     if not (runningon_source or runningon_target):
3608       raise errors.OpExecError("Instance does not seem to be running at all."
3609                                " In this case, it's safer to repair by"
3610                                " running 'gnt-instance stop' to ensure disk"
3611                                " shutdown, and then restarting it.")
3612
3613     if runningon_target:
3614       # the migration has actually succeeded, we need to update the config
3615       self.feedback_fn("* instance running on secondary node (%s),"
3616                        " updating config" % target_node)
3617       instance.primary_node = target_node
3618       self.cfg.Update(instance)
3619       demoted_node = source_node
3620     else:
3621       self.feedback_fn("* instance confirmed to be running on its"
3622                        " primary node (%s)" % source_node)
3623       demoted_node = target_node
3624
3625     self._EnsureSecondary(demoted_node)
3626     try:
3627       self._WaitUntilSync()
3628     except errors.OpExecError:
3629       # we ignore here errors, since if the device is standalone, it
3630       # won't be able to sync
3631       pass
3632     self._GoStandalone()
3633     self._GoReconnect(False)
3634     self._WaitUntilSync()
3635
3636     self.feedback_fn("* done")
3637
3638   def _RevertDiskStatus(self):
3639     """Try to revert the disk status after a failed migration.
3640
3641     """
3642     target_node = self.target_node
3643     try:
3644       self._EnsureSecondary(target_node)
3645       self._GoStandalone()
3646       self._GoReconnect(False)
3647       self._WaitUntilSync()
3648     except errors.OpExecError, err:
3649       self.LogWarning("Migration failed and I can't reconnect the"
3650                       " drives: error '%s'\n"
3651                       "Please look and recover the instance status" %
3652                       str(err))
3653
3654   def _AbortMigration(self):
3655     """Call the hypervisor code to abort a started migration.
3656
3657     """
3658     instance = self.instance
3659     target_node = self.target_node
3660     migration_info = self.migration_info
3661
3662     abort_result = self.rpc.call_finalize_migration(target_node,
3663                                                     instance,
3664                                                     migration_info,
3665                                                     False)
3666     abort_msg = abort_result.RemoteFailMsg()
3667     if abort_msg:
3668       logging.error("Aborting migration failed on target node %s: %s" %
3669                     (target_node, abort_msg))
3670       # Don't raise an exception here, as we stil have to try to revert the
3671       # disk status, even if this step failed.
3672
3673   def _ExecMigration(self):
3674     """Migrate an instance.
3675
3676     The migrate is done by:
3677       - change the disks into dual-master mode
3678       - wait until disks are fully synchronized again
3679       - migrate the instance
3680       - change disks on the new secondary node (the old primary) to secondary
3681       - wait until disks are fully synchronized
3682       - change disks into single-master mode
3683
3684     """
3685     instance = self.instance
3686     target_node = self.target_node
3687     source_node = self.source_node
3688
3689     self.feedback_fn("* checking disk consistency between source and target")
3690     for dev in instance.disks:
3691       if not _CheckDiskConsistency(self, dev, target_node, False):
3692         raise errors.OpExecError("Disk %s is degraded or not fully"
3693                                  " synchronized on target node,"
3694                                  " aborting migrate." % dev.iv_name)
3695
3696     # First get the migration information from the remote node
3697     result = self.rpc.call_migration_info(source_node, instance)
3698     msg = result.RemoteFailMsg()
3699     if msg:
3700       log_err = ("Failed fetching source migration information from %s: %s" %
3701                   (source_node, msg))
3702       logging.error(log_err)
3703       raise errors.OpExecError(log_err)
3704
3705     self.migration_info = migration_info = result.data[1]
3706
3707     # Then switch the disks to master/master mode
3708     self._EnsureSecondary(target_node)
3709     self._GoStandalone()
3710     self._GoReconnect(True)
3711     self._WaitUntilSync()
3712
3713     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3714     result = self.rpc.call_accept_instance(target_node,
3715                                            instance,
3716                                            migration_info,
3717                                            self.nodes_ip[target_node])
3718
3719     msg = result.RemoteFailMsg()
3720     if msg:
3721       logging.error("Instance pre-migration failed, trying to revert"
3722                     " disk status: %s", msg)
3723       self._AbortMigration()
3724       self._RevertDiskStatus()
3725       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3726                                (instance.name, msg))
3727
3728     self.feedback_fn("* migrating instance to %s" % target_node)
3729     time.sleep(10)
3730     result = self.rpc.call_instance_migrate(source_node, instance,
3731                                             self.nodes_ip[target_node],
3732                                             self.op.live)
3733     msg = result.RemoteFailMsg()
3734     if msg:
3735       logging.error("Instance migration failed, trying to revert"
3736                     " disk status: %s", msg)
3737       self._AbortMigration()
3738       self._RevertDiskStatus()
3739       raise errors.OpExecError("Could not migrate instance %s: %s" %
3740                                (instance.name, msg))
3741     time.sleep(10)
3742
3743     instance.primary_node = target_node
3744     # distribute new instance config to the other nodes
3745     self.cfg.Update(instance)
3746
3747     result = self.rpc.call_finalize_migration(target_node,
3748                                               instance,
3749                                               migration_info,
3750                                               True)
3751     msg = result.RemoteFailMsg()
3752     if msg:
3753       logging.error("Instance migration succeeded, but finalization failed:"
3754                     " %s" % msg)
3755       raise errors.OpExecError("Could not finalize instance migration: %s" %
3756                                msg)
3757
3758     self._EnsureSecondary(source_node)
3759     self._WaitUntilSync()
3760     self._GoStandalone()
3761     self._GoReconnect(False)
3762     self._WaitUntilSync()
3763
3764     self.feedback_fn("* done")
3765
3766   def Exec(self, feedback_fn):
3767     """Perform the migration.
3768
3769     """
3770     self.feedback_fn = feedback_fn
3771
3772     self.source_node = self.instance.primary_node
3773     self.target_node = self.instance.secondary_nodes[0]
3774     self.all_nodes = [self.source_node, self.target_node]
3775     self.nodes_ip = {
3776       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3777       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3778       }
3779     if self.op.cleanup:
3780       return self._ExecCleanup()
3781     else:
3782       return self._ExecMigration()
3783
3784
3785 def _CreateBlockDev(lu, node, instance, device, force_create,
3786                     info, force_open):
3787   """Create a tree of block devices on a given node.
3788
3789   If this device type has to be created on secondaries, create it and
3790   all its children.
3791
3792   If not, just recurse to children keeping the same 'force' value.
3793
3794   @param lu: the lu on whose behalf we execute
3795   @param node: the node on which to create the device
3796   @type instance: L{objects.Instance}
3797   @param instance: the instance which owns the device
3798   @type device: L{objects.Disk}
3799   @param device: the device to create
3800   @type force_create: boolean
3801   @param force_create: whether to force creation of this device; this
3802       will be change to True whenever we find a device which has
3803       CreateOnSecondary() attribute
3804   @param info: the extra 'metadata' we should attach to the device
3805       (this will be represented as a LVM tag)
3806   @type force_open: boolean
3807   @param force_open: this parameter will be passes to the
3808       L{backend.CreateBlockDevice} function where it specifies
3809       whether we run on primary or not, and it affects both
3810       the child assembly and the device own Open() execution
3811
3812   """
3813   if device.CreateOnSecondary():
3814     force_create = True
3815
3816   if device.children:
3817     for child in device.children:
3818       _CreateBlockDev(lu, node, instance, child, force_create,
3819                       info, force_open)
3820
3821   if not force_create:
3822     return
3823
3824   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3825
3826
3827 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3828   """Create a single block device on a given node.
3829
3830   This will not recurse over children of the device, so they must be
3831   created in advance.
3832
3833   @param lu: the lu on whose behalf we execute
3834   @param node: the node on which to create the device
3835   @type instance: L{objects.Instance}
3836   @param instance: the instance which owns the device
3837   @type device: L{objects.Disk}
3838   @param device: the device to create
3839   @param info: the extra 'metadata' we should attach to the device
3840       (this will be represented as a LVM tag)
3841   @type force_open: boolean
3842   @param force_open: this parameter will be passes to the
3843       L{backend.CreateBlockDevice} function where it specifies
3844       whether we run on primary or not, and it affects both
3845       the child assembly and the device own Open() execution
3846
3847   """
3848   lu.cfg.SetDiskID(device, node)
3849   result = lu.rpc.call_blockdev_create(node, device, device.size,
3850                                        instance.name, force_open, info)
3851   msg = result.RemoteFailMsg()
3852   if msg:
3853     raise errors.OpExecError("Can't create block device %s on"
3854                              " node %s for instance %s: %s" %
3855                              (device, node, instance.name, msg))
3856   if device.physical_id is None:
3857     device.physical_id = result.data[1]
3858
3859
3860 def _GenerateUniqueNames(lu, exts):
3861   """Generate a suitable LV name.
3862
3863   This will generate a logical volume name for the given instance.
3864
3865   """
3866   results = []
3867   for val in exts:
3868     new_id = lu.cfg.GenerateUniqueID()
3869     results.append("%s%s" % (new_id, val))
3870   return results
3871
3872
3873 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3874                          p_minor, s_minor):
3875   """Generate a drbd8 device complete with its children.
3876
3877   """
3878   port = lu.cfg.AllocatePort()
3879   vgname = lu.cfg.GetVGName()
3880   shared_secret = lu.cfg.GenerateDRBDSecret()
3881   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3882                           logical_id=(vgname, names[0]))
3883   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3884                           logical_id=(vgname, names[1]))
3885   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3886                           logical_id=(primary, secondary, port,
3887                                       p_minor, s_minor,
3888                                       shared_secret),
3889                           children=[dev_data, dev_meta],
3890                           iv_name=iv_name)
3891   return drbd_dev
3892
3893
3894 def _GenerateDiskTemplate(lu, template_name,
3895                           instance_name, primary_node,
3896                           secondary_nodes, disk_info,
3897                           file_storage_dir, file_driver,
3898                           base_index):
3899   """Generate the entire disk layout for a given template type.
3900
3901   """
3902   #TODO: compute space requirements
3903
3904   vgname = lu.cfg.GetVGName()
3905   disk_count = len(disk_info)
3906   disks = []
3907   if template_name == constants.DT_DISKLESS:
3908     pass
3909   elif template_name == constants.DT_PLAIN:
3910     if len(secondary_nodes) != 0:
3911       raise errors.ProgrammerError("Wrong template configuration")
3912
3913     names = _GenerateUniqueNames(lu, [".disk%d" % i
3914                                       for i in range(disk_count)])
3915     for idx, disk in enumerate(disk_info):
3916       disk_index = idx + base_index
3917       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3918                               logical_id=(vgname, names[idx]),
3919                               iv_name="disk/%d" % disk_index,
3920                               mode=disk["mode"])
3921       disks.append(disk_dev)
3922   elif template_name == constants.DT_DRBD8:
3923     if len(secondary_nodes) != 1:
3924       raise errors.ProgrammerError("Wrong template configuration")
3925     remote_node = secondary_nodes[0]
3926     minors = lu.cfg.AllocateDRBDMinor(
3927       [primary_node, remote_node] * len(disk_info), instance_name)
3928
3929     names = []
3930     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
3931                                                for i in range(disk_count)]):
3932       names.append(lv_prefix + "_data")
3933       names.append(lv_prefix + "_meta")
3934     for idx, disk in enumerate(disk_info):
3935       disk_index = idx + base_index
3936       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3937                                       disk["size"], names[idx*2:idx*2+2],
3938                                       "disk/%d" % disk_index,
3939                                       minors[idx*2], minors[idx*2+1])
3940       disk_dev.mode = disk["mode"]
3941       disks.append(disk_dev)
3942   elif template_name == constants.DT_FILE:
3943     if len(secondary_nodes) != 0:
3944       raise errors.ProgrammerError("Wrong template configuration")
3945
3946     for idx, disk in enumerate(disk_info):
3947       disk_index = idx + base_index
3948       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3949                               iv_name="disk/%d" % disk_index,
3950                               logical_id=(file_driver,
3951                                           "%s/disk%d" % (file_storage_dir,
3952                                                          idx)),
3953                               mode=disk["mode"])
3954       disks.append(disk_dev)
3955   else:
3956     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3957   return disks
3958
3959
3960 def _GetInstanceInfoText(instance):
3961   """Compute that text that should be added to the disk's metadata.
3962
3963   """
3964   return "originstname+%s" % instance.name
3965
3966
3967 def _CreateDisks(lu, instance):
3968   """Create all disks for an instance.
3969
3970   This abstracts away some work from AddInstance.
3971
3972   @type lu: L{LogicalUnit}
3973   @param lu: the logical unit on whose behalf we execute
3974   @type instance: L{objects.Instance}
3975   @param instance: the instance whose disks we should create
3976   @rtype: boolean
3977   @return: the success of the creation
3978
3979   """
3980   info = _GetInstanceInfoText(instance)
3981   pnode = instance.primary_node
3982
3983   if instance.disk_template == constants.DT_FILE:
3984     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3985     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
3986
3987     if result.failed or not result.data:
3988       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
3989
3990     if not result.data[0]:
3991       raise errors.OpExecError("Failed to create directory '%s'" %
3992                                file_storage_dir)
3993
3994   # Note: this needs to be kept in sync with adding of disks in
3995   # LUSetInstanceParams
3996   for device in instance.disks:
3997     logging.info("Creating volume %s for instance %s",
3998                  device.iv_name, instance.name)
3999     #HARDCODE
4000     for node in instance.all_nodes:
4001       f_create = node == pnode
4002       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4003
4004
4005 def _RemoveDisks(lu, instance):
4006   """Remove all disks for an instance.
4007
4008   This abstracts away some work from `AddInstance()` and
4009   `RemoveInstance()`. Note that in case some of the devices couldn't
4010   be removed, the removal will continue with the other ones (compare
4011   with `_CreateDisks()`).
4012
4013   @type lu: L{LogicalUnit}
4014   @param lu: the logical unit on whose behalf we execute
4015   @type instance: L{objects.Instance}
4016   @param instance: the instance whose disks we should remove
4017   @rtype: boolean
4018   @return: the success of the removal
4019
4020   """
4021   logging.info("Removing block devices for instance %s", instance.name)
4022
4023   result = True
4024   for device in instance.disks:
4025     for node, disk in device.ComputeNodeTree(instance.primary_node):
4026       lu.cfg.SetDiskID(disk, node)
4027       result = lu.rpc.call_blockdev_remove(node, disk)
4028       if result.failed or not result.data:
4029         lu.proc.LogWarning("Could not remove block device %s on node %s,"
4030                            " continuing anyway", device.iv_name, node)
4031         result = False
4032
4033   if instance.disk_template == constants.DT_FILE:
4034     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4035     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4036                                                  file_storage_dir)
4037     if result.failed or not result.data:
4038       logging.error("Could not remove directory '%s'", file_storage_dir)
4039       result = False
4040
4041   return result
4042
4043
4044 def _ComputeDiskSize(disk_template, disks):
4045   """Compute disk size requirements in the volume group
4046
4047   """
4048   # Required free disk space as a function of disk and swap space
4049   req_size_dict = {
4050     constants.DT_DISKLESS: None,
4051     constants.DT_PLAIN: sum(d["size"] for d in disks),
4052     # 128 MB are added for drbd metadata for each disk
4053     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4054     constants.DT_FILE: None,
4055   }
4056
4057   if disk_template not in req_size_dict:
4058     raise errors.ProgrammerError("Disk template '%s' size requirement"
4059                                  " is unknown" %  disk_template)
4060
4061   return req_size_dict[disk_template]
4062
4063
4064 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4065   """Hypervisor parameter validation.
4066
4067   This function abstract the hypervisor parameter validation to be
4068   used in both instance create and instance modify.
4069
4070   @type lu: L{LogicalUnit}
4071   @param lu: the logical unit for which we check
4072   @type nodenames: list
4073   @param nodenames: the list of nodes on which we should check
4074   @type hvname: string
4075   @param hvname: the name of the hypervisor we should use
4076   @type hvparams: dict
4077   @param hvparams: the parameters which we need to check
4078   @raise errors.OpPrereqError: if the parameters are not valid
4079
4080   """
4081   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4082                                                   hvname,
4083                                                   hvparams)
4084   for node in nodenames:
4085     info = hvinfo[node]
4086     if info.offline:
4087       continue
4088     info.Raise()
4089     if not info.data or not isinstance(info.data, (tuple, list)):
4090       raise errors.OpPrereqError("Cannot get current information"
4091                                  " from node '%s' (%s)" % (node, info.data))
4092     if not info.data[0]:
4093       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4094                                  " %s" % info.data[1])
4095
4096
4097 class LUCreateInstance(LogicalUnit):
4098   """Create an instance.
4099
4100   """
4101   HPATH = "instance-add"
4102   HTYPE = constants.HTYPE_INSTANCE
4103   _OP_REQP = ["instance_name", "disks", "disk_template",
4104               "mode", "start",
4105               "wait_for_sync", "ip_check", "nics",
4106               "hvparams", "beparams"]
4107   REQ_BGL = False
4108
4109   def _ExpandNode(self, node):
4110     """Expands and checks one node name.
4111
4112     """
4113     node_full = self.cfg.ExpandNodeName(node)
4114     if node_full is None:
4115       raise errors.OpPrereqError("Unknown node %s" % node)
4116     return node_full
4117
4118   def ExpandNames(self):
4119     """ExpandNames for CreateInstance.
4120
4121     Figure out the right locks for instance creation.
4122
4123     """
4124     self.needed_locks = {}
4125
4126     # set optional parameters to none if they don't exist
4127     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4128       if not hasattr(self.op, attr):
4129         setattr(self.op, attr, None)
4130
4131     # cheap checks, mostly valid constants given
4132
4133     # verify creation mode
4134     if self.op.mode not in (constants.INSTANCE_CREATE,
4135                             constants.INSTANCE_IMPORT):
4136       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4137                                  self.op.mode)
4138
4139     # disk template and mirror node verification
4140     if self.op.disk_template not in constants.DISK_TEMPLATES:
4141       raise errors.OpPrereqError("Invalid disk template name")
4142
4143     if self.op.hypervisor is None:
4144       self.op.hypervisor = self.cfg.GetHypervisorType()
4145
4146     cluster = self.cfg.GetClusterInfo()
4147     enabled_hvs = cluster.enabled_hypervisors
4148     if self.op.hypervisor not in enabled_hvs:
4149       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4150                                  " cluster (%s)" % (self.op.hypervisor,
4151                                   ",".join(enabled_hvs)))
4152
4153     # check hypervisor parameter syntax (locally)
4154
4155     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4156                                   self.op.hvparams)
4157     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4158     hv_type.CheckParameterSyntax(filled_hvp)
4159
4160     # fill and remember the beparams dict
4161     utils.CheckBEParams(self.op.beparams)
4162     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4163                                     self.op.beparams)
4164
4165     #### instance parameters check
4166
4167     # instance name verification
4168     hostname1 = utils.HostInfo(self.op.instance_name)
4169     self.op.instance_name = instance_name = hostname1.name
4170
4171     # this is just a preventive check, but someone might still add this
4172     # instance in the meantime, and creation will fail at lock-add time
4173     if instance_name in self.cfg.GetInstanceList():
4174       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4175                                  instance_name)
4176
4177     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4178
4179     # NIC buildup
4180     self.nics = []
4181     for nic in self.op.nics:
4182       # ip validity checks
4183       ip = nic.get("ip", None)
4184       if ip is None or ip.lower() == "none":
4185         nic_ip = None
4186       elif ip.lower() == constants.VALUE_AUTO:
4187         nic_ip = hostname1.ip
4188       else:
4189         if not utils.IsValidIP(ip):
4190           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4191                                      " like a valid IP" % ip)
4192         nic_ip = ip
4193
4194       # MAC address verification
4195       mac = nic.get("mac", constants.VALUE_AUTO)
4196       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4197         if not utils.IsValidMac(mac.lower()):
4198           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4199                                      mac)
4200       # bridge verification
4201       bridge = nic.get("bridge", None)
4202       if bridge is None:
4203         bridge = self.cfg.GetDefBridge()
4204       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4205
4206     # disk checks/pre-build
4207     self.disks = []
4208     for disk in self.op.disks:
4209       mode = disk.get("mode", constants.DISK_RDWR)
4210       if mode not in constants.DISK_ACCESS_SET:
4211         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4212                                    mode)
4213       size = disk.get("size", None)
4214       if size is None:
4215         raise errors.OpPrereqError("Missing disk size")
4216       try:
4217         size = int(size)
4218       except ValueError:
4219         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4220       self.disks.append({"size": size, "mode": mode})
4221
4222     # used in CheckPrereq for ip ping check
4223     self.check_ip = hostname1.ip
4224
4225     # file storage checks
4226     if (self.op.file_driver and
4227         not self.op.file_driver in constants.FILE_DRIVER):
4228       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4229                                  self.op.file_driver)
4230
4231     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4232       raise errors.OpPrereqError("File storage directory path not absolute")
4233
4234     ### Node/iallocator related checks
4235     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4236       raise errors.OpPrereqError("One and only one of iallocator and primary"
4237                                  " node must be given")
4238
4239     if self.op.iallocator:
4240       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4241     else:
4242       self.op.pnode = self._ExpandNode(self.op.pnode)
4243       nodelist = [self.op.pnode]
4244       if self.op.snode is not None:
4245         self.op.snode = self._ExpandNode(self.op.snode)
4246         nodelist.append(self.op.snode)
4247       self.needed_locks[locking.LEVEL_NODE] = nodelist
4248
4249     # in case of import lock the source node too
4250     if self.op.mode == constants.INSTANCE_IMPORT:
4251       src_node = getattr(self.op, "src_node", None)
4252       src_path = getattr(self.op, "src_path", None)
4253
4254       if src_path is None:
4255         self.op.src_path = src_path = self.op.instance_name
4256
4257       if src_node is None:
4258         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4259         self.op.src_node = None
4260         if os.path.isabs(src_path):
4261           raise errors.OpPrereqError("Importing an instance from an absolute"
4262                                      " path requires a source node option.")
4263       else:
4264         self.op.src_node = src_node = self._ExpandNode(src_node)
4265         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4266           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4267         if not os.path.isabs(src_path):
4268           self.op.src_path = src_path = \
4269             os.path.join(constants.EXPORT_DIR, src_path)
4270
4271     else: # INSTANCE_CREATE
4272       if getattr(self.op, "os_type", None) is None:
4273         raise errors.OpPrereqError("No guest OS specified")
4274
4275   def _RunAllocator(self):
4276     """Run the allocator based on input opcode.
4277
4278     """
4279     nics = [n.ToDict() for n in self.nics]
4280     ial = IAllocator(self,
4281                      mode=constants.IALLOCATOR_MODE_ALLOC,
4282                      name=self.op.instance_name,
4283                      disk_template=self.op.disk_template,
4284                      tags=[],
4285                      os=self.op.os_type,
4286                      vcpus=self.be_full[constants.BE_VCPUS],
4287                      mem_size=self.be_full[constants.BE_MEMORY],
4288                      disks=self.disks,
4289                      nics=nics,
4290                      hypervisor=self.op.hypervisor,
4291                      )
4292
4293     ial.Run(self.op.iallocator)
4294
4295     if not ial.success:
4296       raise errors.OpPrereqError("Can't compute nodes using"
4297                                  " iallocator '%s': %s" % (self.op.iallocator,
4298                                                            ial.info))
4299     if len(ial.nodes) != ial.required_nodes:
4300       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4301                                  " of nodes (%s), required %s" %
4302                                  (self.op.iallocator, len(ial.nodes),
4303                                   ial.required_nodes))
4304     self.op.pnode = ial.nodes[0]
4305     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4306                  self.op.instance_name, self.op.iallocator,
4307                  ", ".join(ial.nodes))
4308     if ial.required_nodes == 2:
4309       self.op.snode = ial.nodes[1]
4310
4311   def BuildHooksEnv(self):
4312     """Build hooks env.
4313
4314     This runs on master, primary and secondary nodes of the instance.
4315
4316     """
4317     env = {
4318       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
4319       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
4320       "INSTANCE_ADD_MODE": self.op.mode,
4321       }
4322     if self.op.mode == constants.INSTANCE_IMPORT:
4323       env["INSTANCE_SRC_NODE"] = self.op.src_node
4324       env["INSTANCE_SRC_PATH"] = self.op.src_path
4325       env["INSTANCE_SRC_IMAGES"] = self.src_images
4326
4327     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
4328       primary_node=self.op.pnode,
4329       secondary_nodes=self.secondaries,
4330       status=self.instance_status,
4331       os_type=self.op.os_type,
4332       memory=self.be_full[constants.BE_MEMORY],
4333       vcpus=self.be_full[constants.BE_VCPUS],
4334       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4335     ))
4336
4337     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4338           self.secondaries)
4339     return env, nl, nl
4340
4341
4342   def CheckPrereq(self):
4343     """Check prerequisites.
4344
4345     """
4346     if (not self.cfg.GetVGName() and
4347         self.op.disk_template not in constants.DTS_NOT_LVM):
4348       raise errors.OpPrereqError("Cluster does not support lvm-based"
4349                                  " instances")
4350
4351
4352     if self.op.mode == constants.INSTANCE_IMPORT:
4353       src_node = self.op.src_node
4354       src_path = self.op.src_path
4355
4356       if src_node is None:
4357         exp_list = self.rpc.call_export_list(
4358           self.acquired_locks[locking.LEVEL_NODE])
4359         found = False
4360         for node in exp_list:
4361           if not exp_list[node].failed and src_path in exp_list[node].data:
4362             found = True
4363             self.op.src_node = src_node = node
4364             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4365                                                        src_path)
4366             break
4367         if not found:
4368           raise errors.OpPrereqError("No export found for relative path %s" %
4369                                       src_path)
4370
4371       _CheckNodeOnline(self, src_node)
4372       result = self.rpc.call_export_info(src_node, src_path)
4373       result.Raise()
4374       if not result.data:
4375         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4376
4377       export_info = result.data
4378       if not export_info.has_section(constants.INISECT_EXP):
4379         raise errors.ProgrammerError("Corrupted export config")
4380
4381       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4382       if (int(ei_version) != constants.EXPORT_VERSION):
4383         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4384                                    (ei_version, constants.EXPORT_VERSION))
4385
4386       # Check that the new instance doesn't have less disks than the export
4387       instance_disks = len(self.disks)
4388       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4389       if instance_disks < export_disks:
4390         raise errors.OpPrereqError("Not enough disks to import."
4391                                    " (instance: %d, export: %d)" %
4392                                    (instance_disks, export_disks))
4393
4394       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4395       disk_images = []
4396       for idx in range(export_disks):
4397         option = 'disk%d_dump' % idx
4398         if export_info.has_option(constants.INISECT_INS, option):
4399           # FIXME: are the old os-es, disk sizes, etc. useful?
4400           export_name = export_info.get(constants.INISECT_INS, option)
4401           image = os.path.join(src_path, export_name)
4402           disk_images.append(image)
4403         else:
4404           disk_images.append(False)
4405
4406       self.src_images = disk_images
4407
4408       old_name = export_info.get(constants.INISECT_INS, 'name')
4409       # FIXME: int() here could throw a ValueError on broken exports
4410       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4411       if self.op.instance_name == old_name:
4412         for idx, nic in enumerate(self.nics):
4413           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4414             nic_mac_ini = 'nic%d_mac' % idx
4415             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4416
4417     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4418     if self.op.start and not self.op.ip_check:
4419       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4420                                  " adding an instance in start mode")
4421
4422     if self.op.ip_check:
4423       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4424         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4425                                    (self.check_ip, self.op.instance_name))
4426
4427     #### allocator run
4428
4429     if self.op.iallocator is not None:
4430       self._RunAllocator()
4431
4432     #### node related checks
4433
4434     # check primary node
4435     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4436     assert self.pnode is not None, \
4437       "Cannot retrieve locked node %s" % self.op.pnode
4438     if pnode.offline:
4439       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4440                                  pnode.name)
4441
4442     self.secondaries = []
4443
4444     # mirror node verification
4445     if self.op.disk_template in constants.DTS_NET_MIRROR:
4446       if self.op.snode is None:
4447         raise errors.OpPrereqError("The networked disk templates need"
4448                                    " a mirror node")
4449       if self.op.snode == pnode.name:
4450         raise errors.OpPrereqError("The secondary node cannot be"
4451                                    " the primary node.")
4452       self.secondaries.append(self.op.snode)
4453       _CheckNodeOnline(self, self.op.snode)
4454
4455     nodenames = [pnode.name] + self.secondaries
4456
4457     req_size = _ComputeDiskSize(self.op.disk_template,
4458                                 self.disks)
4459
4460     # Check lv size requirements
4461     if req_size is not None:
4462       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4463                                          self.op.hypervisor)
4464       for node in nodenames:
4465         info = nodeinfo[node]
4466         info.Raise()
4467         info = info.data
4468         if not info:
4469           raise errors.OpPrereqError("Cannot get current information"
4470                                      " from node '%s'" % node)
4471         vg_free = info.get('vg_free', None)
4472         if not isinstance(vg_free, int):
4473           raise errors.OpPrereqError("Can't compute free disk space on"
4474                                      " node %s" % node)
4475         if req_size > info['vg_free']:
4476           raise errors.OpPrereqError("Not enough disk space on target node %s."
4477                                      " %d MB available, %d MB required" %
4478                                      (node, info['vg_free'], req_size))
4479
4480     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4481
4482     # os verification
4483     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4484     result.Raise()
4485     if not isinstance(result.data, objects.OS):
4486       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4487                                  " primary node"  % self.op.os_type)
4488
4489     # bridge check on primary node
4490     bridges = [n.bridge for n in self.nics]
4491     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4492     result.Raise()
4493     if not result.data:
4494       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4495                                  " exist on destination node '%s'" %
4496                                  (",".join(bridges), pnode.name))
4497
4498     # memory check on primary node
4499     if self.op.start:
4500       _CheckNodeFreeMemory(self, self.pnode.name,
4501                            "creating instance %s" % self.op.instance_name,
4502                            self.be_full[constants.BE_MEMORY],
4503                            self.op.hypervisor)
4504
4505     self.instance_status = self.op.start
4506
4507   def Exec(self, feedback_fn):
4508     """Create and add the instance to the cluster.
4509
4510     """
4511     instance = self.op.instance_name
4512     pnode_name = self.pnode.name
4513
4514     for nic in self.nics:
4515       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4516         nic.mac = self.cfg.GenerateMAC()
4517
4518     ht_kind = self.op.hypervisor
4519     if ht_kind in constants.HTS_REQ_PORT:
4520       network_port = self.cfg.AllocatePort()
4521     else:
4522       network_port = None
4523
4524     ##if self.op.vnc_bind_address is None:
4525     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4526
4527     # this is needed because os.path.join does not accept None arguments
4528     if self.op.file_storage_dir is None:
4529       string_file_storage_dir = ""
4530     else:
4531       string_file_storage_dir = self.op.file_storage_dir
4532
4533     # build the full file storage dir path
4534     file_storage_dir = os.path.normpath(os.path.join(
4535                                         self.cfg.GetFileStorageDir(),
4536                                         string_file_storage_dir, instance))
4537
4538
4539     disks = _GenerateDiskTemplate(self,
4540                                   self.op.disk_template,
4541                                   instance, pnode_name,
4542                                   self.secondaries,
4543                                   self.disks,
4544                                   file_storage_dir,
4545                                   self.op.file_driver,
4546                                   0)
4547
4548     iobj = objects.Instance(name=instance, os=self.op.os_type,
4549                             primary_node=pnode_name,
4550                             nics=self.nics, disks=disks,
4551                             disk_template=self.op.disk_template,
4552                             admin_up=self.instance_status,
4553                             network_port=network_port,
4554                             beparams=self.op.beparams,
4555                             hvparams=self.op.hvparams,
4556                             hypervisor=self.op.hypervisor,
4557                             )
4558
4559     feedback_fn("* creating instance disks...")
4560     try:
4561       _CreateDisks(self, iobj)
4562     except errors.OpExecError:
4563       self.LogWarning("Device creation failed, reverting...")
4564       try:
4565         _RemoveDisks(self, iobj)
4566       finally:
4567         self.cfg.ReleaseDRBDMinors(instance)
4568         raise
4569
4570     feedback_fn("adding instance %s to cluster config" % instance)
4571
4572     self.cfg.AddInstance(iobj)
4573     # Declare that we don't want to remove the instance lock anymore, as we've
4574     # added the instance to the config
4575     del self.remove_locks[locking.LEVEL_INSTANCE]
4576     # Unlock all the nodes
4577     if self.op.mode == constants.INSTANCE_IMPORT:
4578       nodes_keep = [self.op.src_node]
4579       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4580                        if node != self.op.src_node]
4581       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4582       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4583     else:
4584       self.context.glm.release(locking.LEVEL_NODE)
4585       del self.acquired_locks[locking.LEVEL_NODE]
4586
4587     if self.op.wait_for_sync:
4588       disk_abort = not _WaitForSync(self, iobj)
4589     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4590       # make sure the disks are not degraded (still sync-ing is ok)
4591       time.sleep(15)
4592       feedback_fn("* checking mirrors status")
4593       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4594     else:
4595       disk_abort = False
4596
4597     if disk_abort:
4598       _RemoveDisks(self, iobj)
4599       self.cfg.RemoveInstance(iobj.name)
4600       # Make sure the instance lock gets removed
4601       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4602       raise errors.OpExecError("There are some degraded disks for"
4603                                " this instance")
4604
4605     feedback_fn("creating os for instance %s on node %s" %
4606                 (instance, pnode_name))
4607
4608     if iobj.disk_template != constants.DT_DISKLESS:
4609       if self.op.mode == constants.INSTANCE_CREATE:
4610         feedback_fn("* running the instance OS create scripts...")
4611         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4612         msg = result.RemoteFailMsg()
4613         if msg:
4614           raise errors.OpExecError("Could not add os for instance %s"
4615                                    " on node %s: %s" %
4616                                    (instance, pnode_name, msg))
4617
4618       elif self.op.mode == constants.INSTANCE_IMPORT:
4619         feedback_fn("* running the instance OS import scripts...")
4620         src_node = self.op.src_node
4621         src_images = self.src_images
4622         cluster_name = self.cfg.GetClusterName()
4623         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4624                                                          src_node, src_images,
4625                                                          cluster_name)
4626         import_result.Raise()
4627         for idx, result in enumerate(import_result.data):
4628           if not result:
4629             self.LogWarning("Could not import the image %s for instance"
4630                             " %s, disk %d, on node %s" %
4631                             (src_images[idx], instance, idx, pnode_name))
4632       else:
4633         # also checked in the prereq part
4634         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4635                                      % self.op.mode)
4636
4637     if self.op.start:
4638       logging.info("Starting instance %s on node %s", instance, pnode_name)
4639       feedback_fn("* starting instance...")
4640       result = self.rpc.call_instance_start(pnode_name, iobj, None)
4641       msg = result.RemoteFailMsg()
4642       if msg:
4643         raise errors.OpExecError("Could not start instance: %s" % msg)
4644
4645
4646 class LUConnectConsole(NoHooksLU):
4647   """Connect to an instance's console.
4648
4649   This is somewhat special in that it returns the command line that
4650   you need to run on the master node in order to connect to the
4651   console.
4652
4653   """
4654   _OP_REQP = ["instance_name"]
4655   REQ_BGL = False
4656
4657   def ExpandNames(self):
4658     self._ExpandAndLockInstance()
4659
4660   def CheckPrereq(self):
4661     """Check prerequisites.
4662
4663     This checks that the instance is in the cluster.
4664
4665     """
4666     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4667     assert self.instance is not None, \
4668       "Cannot retrieve locked instance %s" % self.op.instance_name
4669     _CheckNodeOnline(self, self.instance.primary_node)
4670
4671   def Exec(self, feedback_fn):
4672     """Connect to the console of an instance
4673
4674     """
4675     instance = self.instance
4676     node = instance.primary_node
4677
4678     node_insts = self.rpc.call_instance_list([node],
4679                                              [instance.hypervisor])[node]
4680     node_insts.Raise()
4681
4682     if instance.name not in node_insts.data:
4683       raise errors.OpExecError("Instance %s is not running." % instance.name)
4684
4685     logging.debug("Connecting to console of %s on %s", instance.name, node)
4686
4687     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4688     console_cmd = hyper.GetShellCommandForConsole(instance)
4689
4690     # build ssh cmdline
4691     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4692
4693
4694 class LUReplaceDisks(LogicalUnit):
4695   """Replace the disks of an instance.
4696
4697   """
4698   HPATH = "mirrors-replace"
4699   HTYPE = constants.HTYPE_INSTANCE
4700   _OP_REQP = ["instance_name", "mode", "disks"]
4701   REQ_BGL = False
4702
4703   def CheckArguments(self):
4704     if not hasattr(self.op, "remote_node"):
4705       self.op.remote_node = None
4706     if not hasattr(self.op, "iallocator"):
4707       self.op.iallocator = None
4708
4709     # check for valid parameter combination
4710     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4711     if self.op.mode == constants.REPLACE_DISK_CHG:
4712       if cnt == 2:
4713         raise errors.OpPrereqError("When changing the secondary either an"
4714                                    " iallocator script must be used or the"
4715                                    " new node given")
4716       elif cnt == 0:
4717         raise errors.OpPrereqError("Give either the iallocator or the new"
4718                                    " secondary, not both")
4719     else: # not replacing the secondary
4720       if cnt != 2:
4721         raise errors.OpPrereqError("The iallocator and new node options can"
4722                                    " be used only when changing the"
4723                                    " secondary node")
4724
4725   def ExpandNames(self):
4726     self._ExpandAndLockInstance()
4727
4728     if self.op.iallocator is not None:
4729       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4730     elif self.op.remote_node is not None:
4731       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4732       if remote_node is None:
4733         raise errors.OpPrereqError("Node '%s' not known" %
4734                                    self.op.remote_node)
4735       self.op.remote_node = remote_node
4736       # Warning: do not remove the locking of the new secondary here
4737       # unless DRBD8.AddChildren is changed to work in parallel;
4738       # currently it doesn't since parallel invocations of
4739       # FindUnusedMinor will conflict
4740       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4741       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4742     else:
4743       self.needed_locks[locking.LEVEL_NODE] = []
4744       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4745
4746   def DeclareLocks(self, level):
4747     # If we're not already locking all nodes in the set we have to declare the
4748     # instance's primary/secondary nodes.
4749     if (level == locking.LEVEL_NODE and
4750         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4751       self._LockInstancesNodes()
4752
4753   def _RunAllocator(self):
4754     """Compute a new secondary node using an IAllocator.
4755
4756     """
4757     ial = IAllocator(self,
4758                      mode=constants.IALLOCATOR_MODE_RELOC,
4759                      name=self.op.instance_name,
4760                      relocate_from=[self.sec_node])
4761
4762     ial.Run(self.op.iallocator)
4763
4764     if not ial.success:
4765       raise errors.OpPrereqError("Can't compute nodes using"
4766                                  " iallocator '%s': %s" % (self.op.iallocator,
4767                                                            ial.info))
4768     if len(ial.nodes) != ial.required_nodes:
4769       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4770                                  " of nodes (%s), required %s" %
4771                                  (len(ial.nodes), ial.required_nodes))
4772     self.op.remote_node = ial.nodes[0]
4773     self.LogInfo("Selected new secondary for the instance: %s",
4774                  self.op.remote_node)
4775
4776   def BuildHooksEnv(self):
4777     """Build hooks env.
4778
4779     This runs on the master, the primary and all the secondaries.
4780
4781     """
4782     env = {
4783       "MODE": self.op.mode,
4784       "NEW_SECONDARY": self.op.remote_node,
4785       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4786       }
4787     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4788     nl = [
4789       self.cfg.GetMasterNode(),
4790       self.instance.primary_node,
4791       ]
4792     if self.op.remote_node is not None:
4793       nl.append(self.op.remote_node)
4794     return env, nl, nl
4795
4796   def CheckPrereq(self):
4797     """Check prerequisites.
4798
4799     This checks that the instance is in the cluster.
4800
4801     """
4802     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4803     assert instance is not None, \
4804       "Cannot retrieve locked instance %s" % self.op.instance_name
4805     self.instance = instance
4806
4807     if instance.disk_template != constants.DT_DRBD8:
4808       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4809                                  " instances")
4810
4811     if len(instance.secondary_nodes) != 1:
4812       raise errors.OpPrereqError("The instance has a strange layout,"
4813                                  " expected one secondary but found %d" %
4814                                  len(instance.secondary_nodes))
4815
4816     self.sec_node = instance.secondary_nodes[0]
4817
4818     if self.op.iallocator is not None:
4819       self._RunAllocator()
4820
4821     remote_node = self.op.remote_node
4822     if remote_node is not None:
4823       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4824       assert self.remote_node_info is not None, \
4825         "Cannot retrieve locked node %s" % remote_node
4826     else:
4827       self.remote_node_info = None
4828     if remote_node == instance.primary_node:
4829       raise errors.OpPrereqError("The specified node is the primary node of"
4830                                  " the instance.")
4831     elif remote_node == self.sec_node:
4832       raise errors.OpPrereqError("The specified node is already the"
4833                                  " secondary node of the instance.")
4834
4835     if self.op.mode == constants.REPLACE_DISK_PRI:
4836       n1 = self.tgt_node = instance.primary_node
4837       n2 = self.oth_node = self.sec_node
4838     elif self.op.mode == constants.REPLACE_DISK_SEC:
4839       n1 = self.tgt_node = self.sec_node
4840       n2 = self.oth_node = instance.primary_node
4841     elif self.op.mode == constants.REPLACE_DISK_CHG:
4842       n1 = self.new_node = remote_node
4843       n2 = self.oth_node = instance.primary_node
4844       self.tgt_node = self.sec_node
4845     else:
4846       raise errors.ProgrammerError("Unhandled disk replace mode")
4847
4848     _CheckNodeOnline(self, n1)
4849     _CheckNodeOnline(self, n2)
4850
4851     if not self.op.disks:
4852       self.op.disks = range(len(instance.disks))
4853
4854     for disk_idx in self.op.disks:
4855       instance.FindDisk(disk_idx)
4856
4857   def _ExecD8DiskOnly(self, feedback_fn):
4858     """Replace a disk on the primary or secondary for dbrd8.
4859
4860     The algorithm for replace is quite complicated:
4861
4862       1. for each disk to be replaced:
4863
4864         1. create new LVs on the target node with unique names
4865         1. detach old LVs from the drbd device
4866         1. rename old LVs to name_replaced.<time_t>
4867         1. rename new LVs to old LVs
4868         1. attach the new LVs (with the old names now) to the drbd device
4869
4870       1. wait for sync across all devices
4871
4872       1. for each modified disk:
4873
4874         1. remove old LVs (which have the name name_replaces.<time_t>)
4875
4876     Failures are not very well handled.
4877
4878     """
4879     steps_total = 6
4880     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4881     instance = self.instance
4882     iv_names = {}
4883     vgname = self.cfg.GetVGName()
4884     # start of work
4885     cfg = self.cfg
4886     tgt_node = self.tgt_node
4887     oth_node = self.oth_node
4888
4889     # Step: check device activation
4890     self.proc.LogStep(1, steps_total, "check device existence")
4891     info("checking volume groups")
4892     my_vg = cfg.GetVGName()
4893     results = self.rpc.call_vg_list([oth_node, tgt_node])
4894     if not results:
4895       raise errors.OpExecError("Can't list volume groups on the nodes")
4896     for node in oth_node, tgt_node:
4897       res = results[node]
4898       if res.failed or not res.data or my_vg not in res.data:
4899         raise errors.OpExecError("Volume group '%s' not found on %s" %
4900                                  (my_vg, node))
4901     for idx, dev in enumerate(instance.disks):
4902       if idx not in self.op.disks:
4903         continue
4904       for node in tgt_node, oth_node:
4905         info("checking disk/%d on %s" % (idx, node))
4906         cfg.SetDiskID(dev, node)
4907         if not self.rpc.call_blockdev_find(node, dev):
4908           raise errors.OpExecError("Can't find disk/%d on node %s" %
4909                                    (idx, node))
4910
4911     # Step: check other node consistency
4912     self.proc.LogStep(2, steps_total, "check peer consistency")
4913     for idx, dev in enumerate(instance.disks):
4914       if idx not in self.op.disks:
4915         continue
4916       info("checking disk/%d consistency on %s" % (idx, oth_node))
4917       if not _CheckDiskConsistency(self, dev, oth_node,
4918                                    oth_node==instance.primary_node):
4919         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4920                                  " to replace disks on this node (%s)" %
4921                                  (oth_node, tgt_node))
4922
4923     # Step: create new storage
4924     self.proc.LogStep(3, steps_total, "allocate new storage")
4925     for idx, dev in enumerate(instance.disks):
4926       if idx not in self.op.disks:
4927         continue
4928       size = dev.size
4929       cfg.SetDiskID(dev, tgt_node)
4930       lv_names = [".disk%d_%s" % (idx, suf)
4931                   for suf in ["data", "meta"]]
4932       names = _GenerateUniqueNames(self, lv_names)
4933       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4934                              logical_id=(vgname, names[0]))
4935       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4936                              logical_id=(vgname, names[1]))
4937       new_lvs = [lv_data, lv_meta]
4938       old_lvs = dev.children
4939       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4940       info("creating new local storage on %s for %s" %
4941            (tgt_node, dev.iv_name))
4942       # we pass force_create=True to force the LVM creation
4943       for new_lv in new_lvs:
4944         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
4945                         _GetInstanceInfoText(instance), False)
4946
4947     # Step: for each lv, detach+rename*2+attach
4948     self.proc.LogStep(4, steps_total, "change drbd configuration")
4949     for dev, old_lvs, new_lvs in iv_names.itervalues():
4950       info("detaching %s drbd from local storage" % dev.iv_name)
4951       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4952       result.Raise()
4953       if not result.data:
4954         raise errors.OpExecError("Can't detach drbd from local storage on node"
4955                                  " %s for device %s" % (tgt_node, dev.iv_name))
4956       #dev.children = []
4957       #cfg.Update(instance)
4958
4959       # ok, we created the new LVs, so now we know we have the needed
4960       # storage; as such, we proceed on the target node to rename
4961       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4962       # using the assumption that logical_id == physical_id (which in
4963       # turn is the unique_id on that node)
4964
4965       # FIXME(iustin): use a better name for the replaced LVs
4966       temp_suffix = int(time.time())
4967       ren_fn = lambda d, suff: (d.physical_id[0],
4968                                 d.physical_id[1] + "_replaced-%s" % suff)
4969       # build the rename list based on what LVs exist on the node
4970       rlist = []
4971       for to_ren in old_lvs:
4972         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4973         if not find_res.failed and find_res.data is not None: # device exists
4974           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4975
4976       info("renaming the old LVs on the target node")
4977       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4978       result.Raise()
4979       if not result.data:
4980         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4981       # now we rename the new LVs to the old LVs
4982       info("renaming the new LVs on the target node")
4983       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4984       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4985       result.Raise()
4986       if not result.data:
4987         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4988
4989       for old, new in zip(old_lvs, new_lvs):
4990         new.logical_id = old.logical_id
4991         cfg.SetDiskID(new, tgt_node)
4992
4993       for disk in old_lvs:
4994         disk.logical_id = ren_fn(disk, temp_suffix)
4995         cfg.SetDiskID(disk, tgt_node)
4996
4997       # now that the new lvs have the old name, we can add them to the device
4998       info("adding new mirror component on %s" % tgt_node)
4999       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5000       if result.failed or not result.data:
5001         for new_lv in new_lvs:
5002           result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
5003           if result.failed or not result.data:
5004             warning("Can't rollback device %s", hint="manually cleanup unused"
5005                     " logical volumes")
5006         raise errors.OpExecError("Can't add local storage to drbd")
5007
5008       dev.children = new_lvs
5009       cfg.Update(instance)
5010
5011     # Step: wait for sync
5012
5013     # this can fail as the old devices are degraded and _WaitForSync
5014     # does a combined result over all disks, so we don't check its
5015     # return value
5016     self.proc.LogStep(5, steps_total, "sync devices")
5017     _WaitForSync(self, instance, unlock=True)
5018
5019     # so check manually all the devices
5020     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5021       cfg.SetDiskID(dev, instance.primary_node)
5022       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5023       if result.failed or result.data[5]:
5024         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5025
5026     # Step: remove old storage
5027     self.proc.LogStep(6, steps_total, "removing old storage")
5028     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5029       info("remove logical volumes for %s" % name)
5030       for lv in old_lvs:
5031         cfg.SetDiskID(lv, tgt_node)
5032         result = self.rpc.call_blockdev_remove(tgt_node, lv)
5033         if result.failed or not result.data:
5034           warning("Can't remove old LV", hint="manually remove unused LVs")
5035           continue
5036
5037   def _ExecD8Secondary(self, feedback_fn):
5038     """Replace the secondary node for drbd8.
5039
5040     The algorithm for replace is quite complicated:
5041       - for all disks of the instance:
5042         - create new LVs on the new node with same names
5043         - shutdown the drbd device on the old secondary
5044         - disconnect the drbd network on the primary
5045         - create the drbd device on the new secondary
5046         - network attach the drbd on the primary, using an artifice:
5047           the drbd code for Attach() will connect to the network if it
5048           finds a device which is connected to the good local disks but
5049           not network enabled
5050       - wait for sync across all devices
5051       - remove all disks from the old secondary
5052
5053     Failures are not very well handled.
5054
5055     """
5056     steps_total = 6
5057     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5058     instance = self.instance
5059     iv_names = {}
5060     # start of work
5061     cfg = self.cfg
5062     old_node = self.tgt_node
5063     new_node = self.new_node
5064     pri_node = instance.primary_node
5065     nodes_ip = {
5066       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5067       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5068       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5069       }
5070
5071     # Step: check device activation
5072     self.proc.LogStep(1, steps_total, "check device existence")
5073     info("checking volume groups")
5074     my_vg = cfg.GetVGName()
5075     results = self.rpc.call_vg_list([pri_node, new_node])
5076     for node in pri_node, new_node:
5077       res = results[node]
5078       if res.failed or not res.data or my_vg not in res.data:
5079         raise errors.OpExecError("Volume group '%s' not found on %s" %
5080                                  (my_vg, node))
5081     for idx, dev in enumerate(instance.disks):
5082       if idx not in self.op.disks:
5083         continue
5084       info("checking disk/%d on %s" % (idx, pri_node))
5085       cfg.SetDiskID(dev, pri_node)
5086       result = self.rpc.call_blockdev_find(pri_node, dev)
5087       result.Raise()
5088       if not result.data:
5089         raise errors.OpExecError("Can't find disk/%d on node %s" %
5090                                  (idx, pri_node))
5091
5092     # Step: check other node consistency
5093     self.proc.LogStep(2, steps_total, "check peer consistency")
5094     for idx, dev in enumerate(instance.disks):
5095       if idx not in self.op.disks:
5096         continue
5097       info("checking disk/%d consistency on %s" % (idx, pri_node))
5098       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5099         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5100                                  " unsafe to replace the secondary" %
5101                                  pri_node)
5102
5103     # Step: create new storage
5104     self.proc.LogStep(3, steps_total, "allocate new storage")
5105     for idx, dev in enumerate(instance.disks):
5106       info("adding new local storage on %s for disk/%d" %
5107            (new_node, idx))
5108       # we pass force_create=True to force LVM creation
5109       for new_lv in dev.children:
5110         _CreateBlockDev(self, new_node, instance, new_lv, True,
5111                         _GetInstanceInfoText(instance), False)
5112
5113     # Step 4: dbrd minors and drbd setups changes
5114     # after this, we must manually remove the drbd minors on both the
5115     # error and the success paths
5116     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5117                                    instance.name)
5118     logging.debug("Allocated minors %s" % (minors,))
5119     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5120     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5121       size = dev.size
5122       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5123       # create new devices on new_node; note that we create two IDs:
5124       # one without port, so the drbd will be activated without
5125       # networking information on the new node at this stage, and one
5126       # with network, for the latter activation in step 4
5127       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5128       if pri_node == o_node1:
5129         p_minor = o_minor1
5130       else:
5131         p_minor = o_minor2
5132
5133       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5134       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5135
5136       iv_names[idx] = (dev, dev.children, new_net_id)
5137       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5138                     new_net_id)
5139       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5140                               logical_id=new_alone_id,
5141                               children=dev.children)
5142       try:
5143         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5144                               _GetInstanceInfoText(instance), False)
5145       except errors.BlockDeviceError:
5146         self.cfg.ReleaseDRBDMinors(instance.name)
5147         raise
5148
5149     for idx, dev in enumerate(instance.disks):
5150       # we have new devices, shutdown the drbd on the old secondary
5151       info("shutting down drbd for disk/%d on old node" % idx)
5152       cfg.SetDiskID(dev, old_node)
5153       result = self.rpc.call_blockdev_shutdown(old_node, dev)
5154       if result.failed or not result.data:
5155         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
5156                 hint="Please cleanup this device manually as soon as possible")
5157
5158     info("detaching primary drbds from the network (=> standalone)")
5159     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5160                                                instance.disks)[pri_node]
5161
5162     msg = result.RemoteFailMsg()
5163     if msg:
5164       # detaches didn't succeed (unlikely)
5165       self.cfg.ReleaseDRBDMinors(instance.name)
5166       raise errors.OpExecError("Can't detach the disks from the network on"
5167                                " old node: %s" % (msg,))
5168
5169     # if we managed to detach at least one, we update all the disks of
5170     # the instance to point to the new secondary
5171     info("updating instance configuration")
5172     for dev, _, new_logical_id in iv_names.itervalues():
5173       dev.logical_id = new_logical_id
5174       cfg.SetDiskID(dev, pri_node)
5175     cfg.Update(instance)
5176
5177     # and now perform the drbd attach
5178     info("attaching primary drbds to new secondary (standalone => connected)")
5179     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5180                                            instance.disks, instance.name,
5181                                            False)
5182     for to_node, to_result in result.items():
5183       msg = to_result.RemoteFailMsg()
5184       if msg:
5185         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5186                 hint="please do a gnt-instance info to see the"
5187                 " status of disks")
5188
5189     # this can fail as the old devices are degraded and _WaitForSync
5190     # does a combined result over all disks, so we don't check its
5191     # return value
5192     self.proc.LogStep(5, steps_total, "sync devices")
5193     _WaitForSync(self, instance, unlock=True)
5194
5195     # so check manually all the devices
5196     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5197       cfg.SetDiskID(dev, pri_node)
5198       result = self.rpc.call_blockdev_find(pri_node, dev)
5199       result.Raise()
5200       if result.data[5]:
5201         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5202
5203     self.proc.LogStep(6, steps_total, "removing old storage")
5204     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5205       info("remove logical volumes for disk/%d" % idx)
5206       for lv in old_lvs:
5207         cfg.SetDiskID(lv, old_node)
5208         result = self.rpc.call_blockdev_remove(old_node, lv)
5209         if result.failed or not result.data:
5210           warning("Can't remove LV on old secondary",
5211                   hint="Cleanup stale volumes by hand")
5212
5213   def Exec(self, feedback_fn):
5214     """Execute disk replacement.
5215
5216     This dispatches the disk replacement to the appropriate handler.
5217
5218     """
5219     instance = self.instance
5220
5221     # Activate the instance disks if we're replacing them on a down instance
5222     if not instance.admin_up:
5223       _StartInstanceDisks(self, instance, True)
5224
5225     if self.op.mode == constants.REPLACE_DISK_CHG:
5226       fn = self._ExecD8Secondary
5227     else:
5228       fn = self._ExecD8DiskOnly
5229
5230     ret = fn(feedback_fn)
5231
5232     # Deactivate the instance disks if we're replacing them on a down instance
5233     if not instance.admin_up:
5234       _SafeShutdownInstanceDisks(self, instance)
5235
5236     return ret
5237
5238
5239 class LUGrowDisk(LogicalUnit):
5240   """Grow a disk of an instance.
5241
5242   """
5243   HPATH = "disk-grow"
5244   HTYPE = constants.HTYPE_INSTANCE
5245   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5246   REQ_BGL = False
5247
5248   def ExpandNames(self):
5249     self._ExpandAndLockInstance()
5250     self.needed_locks[locking.LEVEL_NODE] = []
5251     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5252
5253   def DeclareLocks(self, level):
5254     if level == locking.LEVEL_NODE:
5255       self._LockInstancesNodes()
5256
5257   def BuildHooksEnv(self):
5258     """Build hooks env.
5259
5260     This runs on the master, the primary and all the secondaries.
5261
5262     """
5263     env = {
5264       "DISK": self.op.disk,
5265       "AMOUNT": self.op.amount,
5266       }
5267     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5268     nl = [
5269       self.cfg.GetMasterNode(),
5270       self.instance.primary_node,
5271       ]
5272     return env, nl, nl
5273
5274   def CheckPrereq(self):
5275     """Check prerequisites.
5276
5277     This checks that the instance is in the cluster.
5278
5279     """
5280     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5281     assert instance is not None, \
5282       "Cannot retrieve locked instance %s" % self.op.instance_name
5283     nodenames = list(instance.all_nodes)
5284     for node in nodenames:
5285       _CheckNodeOnline(self, node)
5286
5287
5288     self.instance = instance
5289
5290     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5291       raise errors.OpPrereqError("Instance's disk layout does not support"
5292                                  " growing.")
5293
5294     self.disk = instance.FindDisk(self.op.disk)
5295
5296     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5297                                        instance.hypervisor)
5298     for node in nodenames:
5299       info = nodeinfo[node]
5300       if info.failed or not info.data:
5301         raise errors.OpPrereqError("Cannot get current information"
5302                                    " from node '%s'" % node)
5303       vg_free = info.data.get('vg_free', None)
5304       if not isinstance(vg_free, int):
5305         raise errors.OpPrereqError("Can't compute free disk space on"
5306                                    " node %s" % node)
5307       if self.op.amount > vg_free:
5308         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5309                                    " %d MiB available, %d MiB required" %
5310                                    (node, vg_free, self.op.amount))
5311
5312   def Exec(self, feedback_fn):
5313     """Execute disk grow.
5314
5315     """
5316     instance = self.instance
5317     disk = self.disk
5318     for node in instance.all_nodes:
5319       self.cfg.SetDiskID(disk, node)
5320       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5321       result.Raise()
5322       if (not result.data or not isinstance(result.data, (list, tuple)) or
5323           len(result.data) != 2):
5324         raise errors.OpExecError("Grow request failed to node %s" % node)
5325       elif not result.data[0]:
5326         raise errors.OpExecError("Grow request failed to node %s: %s" %
5327                                  (node, result.data[1]))
5328     disk.RecordGrow(self.op.amount)
5329     self.cfg.Update(instance)
5330     if self.op.wait_for_sync:
5331       disk_abort = not _WaitForSync(self, instance)
5332       if disk_abort:
5333         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5334                              " status.\nPlease check the instance.")
5335
5336
5337 class LUQueryInstanceData(NoHooksLU):
5338   """Query runtime instance data.
5339
5340   """
5341   _OP_REQP = ["instances", "static"]
5342   REQ_BGL = False
5343
5344   def ExpandNames(self):
5345     self.needed_locks = {}
5346     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5347
5348     if not isinstance(self.op.instances, list):
5349       raise errors.OpPrereqError("Invalid argument type 'instances'")
5350
5351     if self.op.instances:
5352       self.wanted_names = []
5353       for name in self.op.instances:
5354         full_name = self.cfg.ExpandInstanceName(name)
5355         if full_name is None:
5356           raise errors.OpPrereqError("Instance '%s' not known" % name)
5357         self.wanted_names.append(full_name)
5358       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5359     else:
5360       self.wanted_names = None
5361       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5362
5363     self.needed_locks[locking.LEVEL_NODE] = []
5364     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5365
5366   def DeclareLocks(self, level):
5367     if level == locking.LEVEL_NODE:
5368       self._LockInstancesNodes()
5369
5370   def CheckPrereq(self):
5371     """Check prerequisites.
5372
5373     This only checks the optional instance list against the existing names.
5374
5375     """
5376     if self.wanted_names is None:
5377       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5378
5379     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5380                              in self.wanted_names]
5381     return
5382
5383   def _ComputeDiskStatus(self, instance, snode, dev):
5384     """Compute block device status.
5385
5386     """
5387     static = self.op.static
5388     if not static:
5389       self.cfg.SetDiskID(dev, instance.primary_node)
5390       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5391       dev_pstatus.Raise()
5392       dev_pstatus = dev_pstatus.data
5393     else:
5394       dev_pstatus = None
5395
5396     if dev.dev_type in constants.LDS_DRBD:
5397       # we change the snode then (otherwise we use the one passed in)
5398       if dev.logical_id[0] == instance.primary_node:
5399         snode = dev.logical_id[1]
5400       else:
5401         snode = dev.logical_id[0]
5402
5403     if snode and not static:
5404       self.cfg.SetDiskID(dev, snode)
5405       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5406       dev_sstatus.Raise()
5407       dev_sstatus = dev_sstatus.data
5408     else:
5409       dev_sstatus = None
5410
5411     if dev.children:
5412       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5413                       for child in dev.children]
5414     else:
5415       dev_children = []
5416
5417     data = {
5418       "iv_name": dev.iv_name,
5419       "dev_type": dev.dev_type,
5420       "logical_id": dev.logical_id,
5421       "physical_id": dev.physical_id,
5422       "pstatus": dev_pstatus,
5423       "sstatus": dev_sstatus,
5424       "children": dev_children,
5425       "mode": dev.mode,
5426       }
5427
5428     return data
5429
5430   def Exec(self, feedback_fn):
5431     """Gather and return data"""
5432     result = {}
5433
5434     cluster = self.cfg.GetClusterInfo()
5435
5436     for instance in self.wanted_instances:
5437       if not self.op.static:
5438         remote_info = self.rpc.call_instance_info(instance.primary_node,
5439                                                   instance.name,
5440                                                   instance.hypervisor)
5441         remote_info.Raise()
5442         remote_info = remote_info.data
5443         if remote_info and "state" in remote_info:
5444           remote_state = "up"
5445         else:
5446           remote_state = "down"
5447       else:
5448         remote_state = None
5449       if instance.admin_up:
5450         config_state = "up"
5451       else:
5452         config_state = "down"
5453
5454       disks = [self._ComputeDiskStatus(instance, None, device)
5455                for device in instance.disks]
5456
5457       idict = {
5458         "name": instance.name,
5459         "config_state": config_state,
5460         "run_state": remote_state,
5461         "pnode": instance.primary_node,
5462         "snodes": instance.secondary_nodes,
5463         "os": instance.os,
5464         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5465         "disks": disks,
5466         "hypervisor": instance.hypervisor,
5467         "network_port": instance.network_port,
5468         "hv_instance": instance.hvparams,
5469         "hv_actual": cluster.FillHV(instance),
5470         "be_instance": instance.beparams,
5471         "be_actual": cluster.FillBE(instance),
5472         }
5473
5474       result[instance.name] = idict
5475
5476     return result
5477
5478
5479 class LUSetInstanceParams(LogicalUnit):
5480   """Modifies an instances's parameters.
5481
5482   """
5483   HPATH = "instance-modify"
5484   HTYPE = constants.HTYPE_INSTANCE
5485   _OP_REQP = ["instance_name"]
5486   REQ_BGL = False
5487
5488   def CheckArguments(self):
5489     if not hasattr(self.op, 'nics'):
5490       self.op.nics = []
5491     if not hasattr(self.op, 'disks'):
5492       self.op.disks = []
5493     if not hasattr(self.op, 'beparams'):
5494       self.op.beparams = {}
5495     if not hasattr(self.op, 'hvparams'):
5496       self.op.hvparams = {}
5497     self.op.force = getattr(self.op, "force", False)
5498     if not (self.op.nics or self.op.disks or
5499             self.op.hvparams or self.op.beparams):
5500       raise errors.OpPrereqError("No changes submitted")
5501
5502     utils.CheckBEParams(self.op.beparams)
5503
5504     # Disk validation
5505     disk_addremove = 0
5506     for disk_op, disk_dict in self.op.disks:
5507       if disk_op == constants.DDM_REMOVE:
5508         disk_addremove += 1
5509         continue
5510       elif disk_op == constants.DDM_ADD:
5511         disk_addremove += 1
5512       else:
5513         if not isinstance(disk_op, int):
5514           raise errors.OpPrereqError("Invalid disk index")
5515       if disk_op == constants.DDM_ADD:
5516         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5517         if mode not in constants.DISK_ACCESS_SET:
5518           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5519         size = disk_dict.get('size', None)
5520         if size is None:
5521           raise errors.OpPrereqError("Required disk parameter size missing")
5522         try:
5523           size = int(size)
5524         except ValueError, err:
5525           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5526                                      str(err))
5527         disk_dict['size'] = size
5528       else:
5529         # modification of disk
5530         if 'size' in disk_dict:
5531           raise errors.OpPrereqError("Disk size change not possible, use"
5532                                      " grow-disk")
5533
5534     if disk_addremove > 1:
5535       raise errors.OpPrereqError("Only one disk add or remove operation"
5536                                  " supported at a time")
5537
5538     # NIC validation
5539     nic_addremove = 0
5540     for nic_op, nic_dict in self.op.nics:
5541       if nic_op == constants.DDM_REMOVE:
5542         nic_addremove += 1
5543         continue
5544       elif nic_op == constants.DDM_ADD:
5545         nic_addremove += 1
5546       else:
5547         if not isinstance(nic_op, int):
5548           raise errors.OpPrereqError("Invalid nic index")
5549
5550       # nic_dict should be a dict
5551       nic_ip = nic_dict.get('ip', None)
5552       if nic_ip is not None:
5553         if nic_ip.lower() == "none":
5554           nic_dict['ip'] = None
5555         else:
5556           if not utils.IsValidIP(nic_ip):
5557             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5558       # we can only check None bridges and assign the default one
5559       nic_bridge = nic_dict.get('bridge', None)
5560       if nic_bridge is None:
5561         nic_dict['bridge'] = self.cfg.GetDefBridge()
5562       # but we can validate MACs
5563       nic_mac = nic_dict.get('mac', None)
5564       if nic_mac is not None:
5565         if self.cfg.IsMacInUse(nic_mac):
5566           raise errors.OpPrereqError("MAC address %s already in use"
5567                                      " in cluster" % nic_mac)
5568         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5569           if not utils.IsValidMac(nic_mac):
5570             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5571     if nic_addremove > 1:
5572       raise errors.OpPrereqError("Only one NIC add or remove operation"
5573                                  " supported at a time")
5574
5575   def ExpandNames(self):
5576     self._ExpandAndLockInstance()
5577     self.needed_locks[locking.LEVEL_NODE] = []
5578     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5579
5580   def DeclareLocks(self, level):
5581     if level == locking.LEVEL_NODE:
5582       self._LockInstancesNodes()
5583
5584   def BuildHooksEnv(self):
5585     """Build hooks env.
5586
5587     This runs on the master, primary and secondaries.
5588
5589     """
5590     args = dict()
5591     if constants.BE_MEMORY in self.be_new:
5592       args['memory'] = self.be_new[constants.BE_MEMORY]
5593     if constants.BE_VCPUS in self.be_new:
5594       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5595     # FIXME: readd disk/nic changes
5596     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5597     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5598     return env, nl, nl
5599
5600   def CheckPrereq(self):
5601     """Check prerequisites.
5602
5603     This only checks the instance list against the existing names.
5604
5605     """
5606     force = self.force = self.op.force
5607
5608     # checking the new params on the primary/secondary nodes
5609
5610     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5611     assert self.instance is not None, \
5612       "Cannot retrieve locked instance %s" % self.op.instance_name
5613     pnode = instance.primary_node
5614     nodelist = list(instance.all_nodes)
5615
5616     # hvparams processing
5617     if self.op.hvparams:
5618       i_hvdict = copy.deepcopy(instance.hvparams)
5619       for key, val in self.op.hvparams.iteritems():
5620         if val == constants.VALUE_DEFAULT:
5621           try:
5622             del i_hvdict[key]
5623           except KeyError:
5624             pass
5625         elif val == constants.VALUE_NONE:
5626           i_hvdict[key] = None
5627         else:
5628           i_hvdict[key] = val
5629       cluster = self.cfg.GetClusterInfo()
5630       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5631                                 i_hvdict)
5632       # local check
5633       hypervisor.GetHypervisor(
5634         instance.hypervisor).CheckParameterSyntax(hv_new)
5635       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5636       self.hv_new = hv_new # the new actual values
5637       self.hv_inst = i_hvdict # the new dict (without defaults)
5638     else:
5639       self.hv_new = self.hv_inst = {}
5640
5641     # beparams processing
5642     if self.op.beparams:
5643       i_bedict = copy.deepcopy(instance.beparams)
5644       for key, val in self.op.beparams.iteritems():
5645         if val == constants.VALUE_DEFAULT:
5646           try:
5647             del i_bedict[key]
5648           except KeyError:
5649             pass
5650         else:
5651           i_bedict[key] = val
5652       cluster = self.cfg.GetClusterInfo()
5653       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5654                                 i_bedict)
5655       self.be_new = be_new # the new actual values
5656       self.be_inst = i_bedict # the new dict (without defaults)
5657     else:
5658       self.be_new = self.be_inst = {}
5659
5660     self.warn = []
5661
5662     if constants.BE_MEMORY in self.op.beparams and not self.force:
5663       mem_check_list = [pnode]
5664       if be_new[constants.BE_AUTO_BALANCE]:
5665         # either we changed auto_balance to yes or it was from before
5666         mem_check_list.extend(instance.secondary_nodes)
5667       instance_info = self.rpc.call_instance_info(pnode, instance.name,
5668                                                   instance.hypervisor)
5669       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5670                                          instance.hypervisor)
5671       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5672         # Assume the primary node is unreachable and go ahead
5673         self.warn.append("Can't get info from primary node %s" % pnode)
5674       else:
5675         if not instance_info.failed and instance_info.data:
5676           current_mem = instance_info.data['memory']
5677         else:
5678           # Assume instance not running
5679           # (there is a slight race condition here, but it's not very probable,
5680           # and we have no other way to check)
5681           current_mem = 0
5682         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5683                     nodeinfo[pnode].data['memory_free'])
5684         if miss_mem > 0:
5685           raise errors.OpPrereqError("This change will prevent the instance"
5686                                      " from starting, due to %d MB of memory"
5687                                      " missing on its primary node" % miss_mem)
5688
5689       if be_new[constants.BE_AUTO_BALANCE]:
5690         for node, nres in nodeinfo.iteritems():
5691           if node not in instance.secondary_nodes:
5692             continue
5693           if nres.failed or not isinstance(nres.data, dict):
5694             self.warn.append("Can't get info from secondary node %s" % node)
5695           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5696             self.warn.append("Not enough memory to failover instance to"
5697                              " secondary node %s" % node)
5698
5699     # NIC processing
5700     for nic_op, nic_dict in self.op.nics:
5701       if nic_op == constants.DDM_REMOVE:
5702         if not instance.nics:
5703           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5704         continue
5705       if nic_op != constants.DDM_ADD:
5706         # an existing nic
5707         if nic_op < 0 or nic_op >= len(instance.nics):
5708           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5709                                      " are 0 to %d" %
5710                                      (nic_op, len(instance.nics)))
5711       nic_bridge = nic_dict.get('bridge', None)
5712       if nic_bridge is not None:
5713         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5714           msg = ("Bridge '%s' doesn't exist on one of"
5715                  " the instance nodes" % nic_bridge)
5716           if self.force:
5717             self.warn.append(msg)
5718           else:
5719             raise errors.OpPrereqError(msg)
5720
5721     # DISK processing
5722     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5723       raise errors.OpPrereqError("Disk operations not supported for"
5724                                  " diskless instances")
5725     for disk_op, disk_dict in self.op.disks:
5726       if disk_op == constants.DDM_REMOVE:
5727         if len(instance.disks) == 1:
5728           raise errors.OpPrereqError("Cannot remove the last disk of"
5729                                      " an instance")
5730         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5731         ins_l = ins_l[pnode]
5732         if ins_l.failed or not isinstance(ins_l.data, list):
5733           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5734         if instance.name in ins_l.data:
5735           raise errors.OpPrereqError("Instance is running, can't remove"
5736                                      " disks.")
5737
5738       if (disk_op == constants.DDM_ADD and
5739           len(instance.nics) >= constants.MAX_DISKS):
5740         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5741                                    " add more" % constants.MAX_DISKS)
5742       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5743         # an existing disk
5744         if disk_op < 0 or disk_op >= len(instance.disks):
5745           raise errors.OpPrereqError("Invalid disk index %s, valid values"
5746                                      " are 0 to %d" %
5747                                      (disk_op, len(instance.disks)))
5748
5749     return
5750
5751   def Exec(self, feedback_fn):
5752     """Modifies an instance.
5753
5754     All parameters take effect only at the next restart of the instance.
5755
5756     """
5757     # Process here the warnings from CheckPrereq, as we don't have a
5758     # feedback_fn there.
5759     for warn in self.warn:
5760       feedback_fn("WARNING: %s" % warn)
5761
5762     result = []
5763     instance = self.instance
5764     # disk changes
5765     for disk_op, disk_dict in self.op.disks:
5766       if disk_op == constants.DDM_REMOVE:
5767         # remove the last disk
5768         device = instance.disks.pop()
5769         device_idx = len(instance.disks)
5770         for node, disk in device.ComputeNodeTree(instance.primary_node):
5771           self.cfg.SetDiskID(disk, node)
5772           rpc_result = self.rpc.call_blockdev_remove(node, disk)
5773           if rpc_result.failed or not rpc_result.data:
5774             self.proc.LogWarning("Could not remove disk/%d on node %s,"
5775                                  " continuing anyway", device_idx, node)
5776         result.append(("disk/%d" % device_idx, "remove"))
5777       elif disk_op == constants.DDM_ADD:
5778         # add a new disk
5779         if instance.disk_template == constants.DT_FILE:
5780           file_driver, file_path = instance.disks[0].logical_id
5781           file_path = os.path.dirname(file_path)
5782         else:
5783           file_driver = file_path = None
5784         disk_idx_base = len(instance.disks)
5785         new_disk = _GenerateDiskTemplate(self,
5786                                          instance.disk_template,
5787                                          instance.name, instance.primary_node,
5788                                          instance.secondary_nodes,
5789                                          [disk_dict],
5790                                          file_path,
5791                                          file_driver,
5792                                          disk_idx_base)[0]
5793         instance.disks.append(new_disk)
5794         info = _GetInstanceInfoText(instance)
5795
5796         logging.info("Creating volume %s for instance %s",
5797                      new_disk.iv_name, instance.name)
5798         # Note: this needs to be kept in sync with _CreateDisks
5799         #HARDCODE
5800         for node in instance.all_nodes:
5801           f_create = node == instance.primary_node
5802           try:
5803             _CreateBlockDev(self, node, instance, new_disk,
5804                             f_create, info, f_create)
5805           except errors.OpExecError, err:
5806             self.LogWarning("Failed to create volume %s (%s) on"
5807                             " node %s: %s",
5808                             new_disk.iv_name, new_disk, node, err)
5809         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5810                        (new_disk.size, new_disk.mode)))
5811       else:
5812         # change a given disk
5813         instance.disks[disk_op].mode = disk_dict['mode']
5814         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5815     # NIC changes
5816     for nic_op, nic_dict in self.op.nics:
5817       if nic_op == constants.DDM_REMOVE:
5818         # remove the last nic
5819         del instance.nics[-1]
5820         result.append(("nic.%d" % len(instance.nics), "remove"))
5821       elif nic_op == constants.DDM_ADD:
5822         # add a new nic
5823         if 'mac' not in nic_dict:
5824           mac = constants.VALUE_GENERATE
5825         else:
5826           mac = nic_dict['mac']
5827         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5828           mac = self.cfg.GenerateMAC()
5829         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5830                               bridge=nic_dict.get('bridge', None))
5831         instance.nics.append(new_nic)
5832         result.append(("nic.%d" % (len(instance.nics) - 1),
5833                        "add:mac=%s,ip=%s,bridge=%s" %
5834                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5835       else:
5836         # change a given nic
5837         for key in 'mac', 'ip', 'bridge':
5838           if key in nic_dict:
5839             setattr(instance.nics[nic_op], key, nic_dict[key])
5840             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5841
5842     # hvparams changes
5843     if self.op.hvparams:
5844       instance.hvparams = self.hv_new
5845       for key, val in self.op.hvparams.iteritems():
5846         result.append(("hv/%s" % key, val))
5847
5848     # beparams changes
5849     if self.op.beparams:
5850       instance.beparams = self.be_inst
5851       for key, val in self.op.beparams.iteritems():
5852         result.append(("be/%s" % key, val))
5853
5854     self.cfg.Update(instance)
5855
5856     return result
5857
5858
5859 class LUQueryExports(NoHooksLU):
5860   """Query the exports list
5861
5862   """
5863   _OP_REQP = ['nodes']
5864   REQ_BGL = False
5865
5866   def ExpandNames(self):
5867     self.needed_locks = {}
5868     self.share_locks[locking.LEVEL_NODE] = 1
5869     if not self.op.nodes:
5870       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5871     else:
5872       self.needed_locks[locking.LEVEL_NODE] = \
5873         _GetWantedNodes(self, self.op.nodes)
5874
5875   def CheckPrereq(self):
5876     """Check prerequisites.
5877
5878     """
5879     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5880
5881   def Exec(self, feedback_fn):
5882     """Compute the list of all the exported system images.
5883
5884     @rtype: dict
5885     @return: a dictionary with the structure node->(export-list)
5886         where export-list is a list of the instances exported on
5887         that node.
5888
5889     """
5890     rpcresult = self.rpc.call_export_list(self.nodes)
5891     result = {}
5892     for node in rpcresult:
5893       if rpcresult[node].failed:
5894         result[node] = False
5895       else:
5896         result[node] = rpcresult[node].data
5897
5898     return result
5899
5900
5901 class LUExportInstance(LogicalUnit):
5902   """Export an instance to an image in the cluster.
5903
5904   """
5905   HPATH = "instance-export"
5906   HTYPE = constants.HTYPE_INSTANCE
5907   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5908   REQ_BGL = False
5909
5910   def ExpandNames(self):
5911     self._ExpandAndLockInstance()
5912     # FIXME: lock only instance primary and destination node
5913     #
5914     # Sad but true, for now we have do lock all nodes, as we don't know where
5915     # the previous export might be, and and in this LU we search for it and
5916     # remove it from its current node. In the future we could fix this by:
5917     #  - making a tasklet to search (share-lock all), then create the new one,
5918     #    then one to remove, after
5919     #  - removing the removal operation altoghether
5920     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5921
5922   def DeclareLocks(self, level):
5923     """Last minute lock declaration."""
5924     # All nodes are locked anyway, so nothing to do here.
5925
5926   def BuildHooksEnv(self):
5927     """Build hooks env.
5928
5929     This will run on the master, primary node and target node.
5930
5931     """
5932     env = {
5933       "EXPORT_NODE": self.op.target_node,
5934       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5935       }
5936     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5937     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5938           self.op.target_node]
5939     return env, nl, nl
5940
5941   def CheckPrereq(self):
5942     """Check prerequisites.
5943
5944     This checks that the instance and node names are valid.
5945
5946     """
5947     instance_name = self.op.instance_name
5948     self.instance = self.cfg.GetInstanceInfo(instance_name)
5949     assert self.instance is not None, \
5950           "Cannot retrieve locked instance %s" % self.op.instance_name
5951     _CheckNodeOnline(self, self.instance.primary_node)
5952
5953     self.dst_node = self.cfg.GetNodeInfo(
5954       self.cfg.ExpandNodeName(self.op.target_node))
5955
5956     if self.dst_node is None:
5957       # This is wrong node name, not a non-locked node
5958       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5959     _CheckNodeOnline(self, self.dst_node.name)
5960
5961     # instance disk type verification
5962     for disk in self.instance.disks:
5963       if disk.dev_type == constants.LD_FILE:
5964         raise errors.OpPrereqError("Export not supported for instances with"
5965                                    " file-based disks")
5966
5967   def Exec(self, feedback_fn):
5968     """Export an instance to an image in the cluster.
5969
5970     """
5971     instance = self.instance
5972     dst_node = self.dst_node
5973     src_node = instance.primary_node
5974     if self.op.shutdown:
5975       # shutdown the instance, but not the disks
5976       result = self.rpc.call_instance_shutdown(src_node, instance)
5977       result.Raise()
5978       if not result.data:
5979         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5980                                  (instance.name, src_node))
5981
5982     vgname = self.cfg.GetVGName()
5983
5984     snap_disks = []
5985
5986     # set the disks ID correctly since call_instance_start needs the
5987     # correct drbd minor to create the symlinks
5988     for disk in instance.disks:
5989       self.cfg.SetDiskID(disk, src_node)
5990
5991     try:
5992       for disk in instance.disks:
5993         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5994         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5995         if new_dev_name.failed or not new_dev_name.data:
5996           self.LogWarning("Could not snapshot block device %s on node %s",
5997                           disk.logical_id[1], src_node)
5998           snap_disks.append(False)
5999         else:
6000           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6001                                  logical_id=(vgname, new_dev_name.data),
6002                                  physical_id=(vgname, new_dev_name.data),
6003                                  iv_name=disk.iv_name)
6004           snap_disks.append(new_dev)
6005
6006     finally:
6007       if self.op.shutdown and instance.admin_up:
6008         result = self.rpc.call_instance_start(src_node, instance, None)
6009         msg = result.RemoteFailMsg()
6010         if msg:
6011           _ShutdownInstanceDisks(self, instance)
6012           raise errors.OpExecError("Could not start instance: %s" % msg)
6013
6014     # TODO: check for size
6015
6016     cluster_name = self.cfg.GetClusterName()
6017     for idx, dev in enumerate(snap_disks):
6018       if dev:
6019         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6020                                                instance, cluster_name, idx)
6021         if result.failed or not result.data:
6022           self.LogWarning("Could not export block device %s from node %s to"
6023                           " node %s", dev.logical_id[1], src_node,
6024                           dst_node.name)
6025         result = self.rpc.call_blockdev_remove(src_node, dev)
6026         if result.failed or not result.data:
6027           self.LogWarning("Could not remove snapshot block device %s from node"
6028                           " %s", dev.logical_id[1], src_node)
6029
6030     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6031     if result.failed or not result.data:
6032       self.LogWarning("Could not finalize export for instance %s on node %s",
6033                       instance.name, dst_node.name)
6034
6035     nodelist = self.cfg.GetNodeList()
6036     nodelist.remove(dst_node.name)
6037
6038     # on one-node clusters nodelist will be empty after the removal
6039     # if we proceed the backup would be removed because OpQueryExports
6040     # substitutes an empty list with the full cluster node list.
6041     if nodelist:
6042       exportlist = self.rpc.call_export_list(nodelist)
6043       for node in exportlist:
6044         if exportlist[node].failed:
6045           continue
6046         if instance.name in exportlist[node].data:
6047           if not self.rpc.call_export_remove(node, instance.name):
6048             self.LogWarning("Could not remove older export for instance %s"
6049                             " on node %s", instance.name, node)
6050
6051
6052 class LURemoveExport(NoHooksLU):
6053   """Remove exports related to the named instance.
6054
6055   """
6056   _OP_REQP = ["instance_name"]
6057   REQ_BGL = False
6058
6059   def ExpandNames(self):
6060     self.needed_locks = {}
6061     # We need all nodes to be locked in order for RemoveExport to work, but we
6062     # don't need to lock the instance itself, as nothing will happen to it (and
6063     # we can remove exports also for a removed instance)
6064     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6065
6066   def CheckPrereq(self):
6067     """Check prerequisites.
6068     """
6069     pass
6070
6071   def Exec(self, feedback_fn):
6072     """Remove any export.
6073
6074     """
6075     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6076     # If the instance was not found we'll try with the name that was passed in.
6077     # This will only work if it was an FQDN, though.
6078     fqdn_warn = False
6079     if not instance_name:
6080       fqdn_warn = True
6081       instance_name = self.op.instance_name
6082
6083     exportlist = self.rpc.call_export_list(self.acquired_locks[
6084       locking.LEVEL_NODE])
6085     found = False
6086     for node in exportlist:
6087       if exportlist[node].failed:
6088         self.LogWarning("Failed to query node %s, continuing" % node)
6089         continue
6090       if instance_name in exportlist[node].data:
6091         found = True
6092         result = self.rpc.call_export_remove(node, instance_name)
6093         if result.failed or not result.data:
6094           logging.error("Could not remove export for instance %s"
6095                         " on node %s", instance_name, node)
6096
6097     if fqdn_warn and not found:
6098       feedback_fn("Export not found. If trying to remove an export belonging"
6099                   " to a deleted instance please use its Fully Qualified"
6100                   " Domain Name.")
6101
6102
6103 class TagsLU(NoHooksLU):
6104   """Generic tags LU.
6105
6106   This is an abstract class which is the parent of all the other tags LUs.
6107
6108   """
6109
6110   def ExpandNames(self):
6111     self.needed_locks = {}
6112     if self.op.kind == constants.TAG_NODE:
6113       name = self.cfg.ExpandNodeName(self.op.name)
6114       if name is None:
6115         raise errors.OpPrereqError("Invalid node name (%s)" %
6116                                    (self.op.name,))
6117       self.op.name = name
6118       self.needed_locks[locking.LEVEL_NODE] = name
6119     elif self.op.kind == constants.TAG_INSTANCE:
6120       name = self.cfg.ExpandInstanceName(self.op.name)
6121       if name is None:
6122         raise errors.OpPrereqError("Invalid instance name (%s)" %
6123                                    (self.op.name,))
6124       self.op.name = name
6125       self.needed_locks[locking.LEVEL_INSTANCE] = name
6126
6127   def CheckPrereq(self):
6128     """Check prerequisites.
6129
6130     """
6131     if self.op.kind == constants.TAG_CLUSTER:
6132       self.target = self.cfg.GetClusterInfo()
6133     elif self.op.kind == constants.TAG_NODE:
6134       self.target = self.cfg.GetNodeInfo(self.op.name)
6135     elif self.op.kind == constants.TAG_INSTANCE:
6136       self.target = self.cfg.GetInstanceInfo(self.op.name)
6137     else:
6138       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6139                                  str(self.op.kind))
6140
6141
6142 class LUGetTags(TagsLU):
6143   """Returns the tags of a given object.
6144
6145   """
6146   _OP_REQP = ["kind", "name"]
6147   REQ_BGL = False
6148
6149   def Exec(self, feedback_fn):
6150     """Returns the tag list.
6151
6152     """
6153     return list(self.target.GetTags())
6154
6155
6156 class LUSearchTags(NoHooksLU):
6157   """Searches the tags for a given pattern.
6158
6159   """
6160   _OP_REQP = ["pattern"]
6161   REQ_BGL = False
6162
6163   def ExpandNames(self):
6164     self.needed_locks = {}
6165
6166   def CheckPrereq(self):
6167     """Check prerequisites.
6168
6169     This checks the pattern passed for validity by compiling it.
6170
6171     """
6172     try:
6173       self.re = re.compile(self.op.pattern)
6174     except re.error, err:
6175       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6176                                  (self.op.pattern, err))
6177
6178   def Exec(self, feedback_fn):
6179     """Returns the tag list.
6180
6181     """
6182     cfg = self.cfg
6183     tgts = [("/cluster", cfg.GetClusterInfo())]
6184     ilist = cfg.GetAllInstancesInfo().values()
6185     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6186     nlist = cfg.GetAllNodesInfo().values()
6187     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6188     results = []
6189     for path, target in tgts:
6190       for tag in target.GetTags():
6191         if self.re.search(tag):
6192           results.append((path, tag))
6193     return results
6194
6195
6196 class LUAddTags(TagsLU):
6197   """Sets a tag on a given object.
6198
6199   """
6200   _OP_REQP = ["kind", "name", "tags"]
6201   REQ_BGL = False
6202
6203   def CheckPrereq(self):
6204     """Check prerequisites.
6205
6206     This checks the type and length of the tag name and value.
6207
6208     """
6209     TagsLU.CheckPrereq(self)
6210     for tag in self.op.tags:
6211       objects.TaggableObject.ValidateTag(tag)
6212
6213   def Exec(self, feedback_fn):
6214     """Sets the tag.
6215
6216     """
6217     try:
6218       for tag in self.op.tags:
6219         self.target.AddTag(tag)
6220     except errors.TagError, err:
6221       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6222     try:
6223       self.cfg.Update(self.target)
6224     except errors.ConfigurationError:
6225       raise errors.OpRetryError("There has been a modification to the"
6226                                 " config file and the operation has been"
6227                                 " aborted. Please retry.")
6228
6229
6230 class LUDelTags(TagsLU):
6231   """Delete a list of tags from a given object.
6232
6233   """
6234   _OP_REQP = ["kind", "name", "tags"]
6235   REQ_BGL = False
6236
6237   def CheckPrereq(self):
6238     """Check prerequisites.
6239
6240     This checks that we have the given tag.
6241
6242     """
6243     TagsLU.CheckPrereq(self)
6244     for tag in self.op.tags:
6245       objects.TaggableObject.ValidateTag(tag)
6246     del_tags = frozenset(self.op.tags)
6247     cur_tags = self.target.GetTags()
6248     if not del_tags <= cur_tags:
6249       diff_tags = del_tags - cur_tags
6250       diff_names = ["'%s'" % tag for tag in diff_tags]
6251       diff_names.sort()
6252       raise errors.OpPrereqError("Tag(s) %s not found" %
6253                                  (",".join(diff_names)))
6254
6255   def Exec(self, feedback_fn):
6256     """Remove the tag from the object.
6257
6258     """
6259     for tag in self.op.tags:
6260       self.target.RemoveTag(tag)
6261     try:
6262       self.cfg.Update(self.target)
6263     except errors.ConfigurationError:
6264       raise errors.OpRetryError("There has been a modification to the"
6265                                 " config file and the operation has been"
6266                                 " aborted. Please retry.")
6267
6268
6269 class LUTestDelay(NoHooksLU):
6270   """Sleep for a specified amount of time.
6271
6272   This LU sleeps on the master and/or nodes for a specified amount of
6273   time.
6274
6275   """
6276   _OP_REQP = ["duration", "on_master", "on_nodes"]
6277   REQ_BGL = False
6278
6279   def ExpandNames(self):
6280     """Expand names and set required locks.
6281
6282     This expands the node list, if any.
6283
6284     """
6285     self.needed_locks = {}
6286     if self.op.on_nodes:
6287       # _GetWantedNodes can be used here, but is not always appropriate to use
6288       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6289       # more information.
6290       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6291       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6292
6293   def CheckPrereq(self):
6294     """Check prerequisites.
6295
6296     """
6297
6298   def Exec(self, feedback_fn):
6299     """Do the actual sleep.
6300
6301     """
6302     if self.op.on_master:
6303       if not utils.TestDelay(self.op.duration):
6304         raise errors.OpExecError("Error during master delay test")
6305     if self.op.on_nodes:
6306       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6307       if not result:
6308         raise errors.OpExecError("Complete failure from rpc call")
6309       for node, node_result in result.items():
6310         node_result.Raise()
6311         if not node_result.data:
6312           raise errors.OpExecError("Failure during rpc call to node %s,"
6313                                    " result: %s" % (node, node_result.data))
6314
6315
6316 class IAllocator(object):
6317   """IAllocator framework.
6318
6319   An IAllocator instance has three sets of attributes:
6320     - cfg that is needed to query the cluster
6321     - input data (all members of the _KEYS class attribute are required)
6322     - four buffer attributes (in|out_data|text), that represent the
6323       input (to the external script) in text and data structure format,
6324       and the output from it, again in two formats
6325     - the result variables from the script (success, info, nodes) for
6326       easy usage
6327
6328   """
6329   _ALLO_KEYS = [
6330     "mem_size", "disks", "disk_template",
6331     "os", "tags", "nics", "vcpus", "hypervisor",
6332     ]
6333   _RELO_KEYS = [
6334     "relocate_from",
6335     ]
6336
6337   def __init__(self, lu, mode, name, **kwargs):
6338     self.lu = lu
6339     # init buffer variables
6340     self.in_text = self.out_text = self.in_data = self.out_data = None
6341     # init all input fields so that pylint is happy
6342     self.mode = mode
6343     self.name = name
6344     self.mem_size = self.disks = self.disk_template = None
6345     self.os = self.tags = self.nics = self.vcpus = None
6346     self.hypervisor = None
6347     self.relocate_from = None
6348     # computed fields
6349     self.required_nodes = None
6350     # init result fields
6351     self.success = self.info = self.nodes = None
6352     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6353       keyset = self._ALLO_KEYS
6354     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6355       keyset = self._RELO_KEYS
6356     else:
6357       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6358                                    " IAllocator" % self.mode)
6359     for key in kwargs:
6360       if key not in keyset:
6361         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6362                                      " IAllocator" % key)
6363       setattr(self, key, kwargs[key])
6364     for key in keyset:
6365       if key not in kwargs:
6366         raise errors.ProgrammerError("Missing input parameter '%s' to"
6367                                      " IAllocator" % key)
6368     self._BuildInputData()
6369
6370   def _ComputeClusterData(self):
6371     """Compute the generic allocator input data.
6372
6373     This is the data that is independent of the actual operation.
6374
6375     """
6376     cfg = self.lu.cfg
6377     cluster_info = cfg.GetClusterInfo()
6378     # cluster data
6379     data = {
6380       "version": 1,
6381       "cluster_name": cfg.GetClusterName(),
6382       "cluster_tags": list(cluster_info.GetTags()),
6383       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6384       # we don't have job IDs
6385       }
6386     iinfo = cfg.GetAllInstancesInfo().values()
6387     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6388
6389     # node data
6390     node_results = {}
6391     node_list = cfg.GetNodeList()
6392
6393     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6394       hypervisor_name = self.hypervisor
6395     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6396       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6397
6398     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6399                                            hypervisor_name)
6400     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6401                        cluster_info.enabled_hypervisors)
6402     for nname, nresult in node_data.items():
6403       # first fill in static (config-based) values
6404       ninfo = cfg.GetNodeInfo(nname)
6405       pnr = {
6406         "tags": list(ninfo.GetTags()),
6407         "primary_ip": ninfo.primary_ip,
6408         "secondary_ip": ninfo.secondary_ip,
6409         "offline": ninfo.offline,
6410         "master_candidate": ninfo.master_candidate,
6411         }
6412
6413       if not ninfo.offline:
6414         nresult.Raise()
6415         if not isinstance(nresult.data, dict):
6416           raise errors.OpExecError("Can't get data for node %s" % nname)
6417         remote_info = nresult.data
6418         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6419                      'vg_size', 'vg_free', 'cpu_total']:
6420           if attr not in remote_info:
6421             raise errors.OpExecError("Node '%s' didn't return attribute"
6422                                      " '%s'" % (nname, attr))
6423           try:
6424             remote_info[attr] = int(remote_info[attr])
6425           except ValueError, err:
6426             raise errors.OpExecError("Node '%s' returned invalid value"
6427                                      " for '%s': %s" % (nname, attr, err))
6428         # compute memory used by primary instances
6429         i_p_mem = i_p_up_mem = 0
6430         for iinfo, beinfo in i_list:
6431           if iinfo.primary_node == nname:
6432             i_p_mem += beinfo[constants.BE_MEMORY]
6433             if iinfo.name not in node_iinfo[nname].data:
6434               i_used_mem = 0
6435             else:
6436               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6437             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6438             remote_info['memory_free'] -= max(0, i_mem_diff)
6439
6440             if iinfo.admin_up:
6441               i_p_up_mem += beinfo[constants.BE_MEMORY]
6442
6443         # compute memory used by instances
6444         pnr_dyn = {
6445           "total_memory": remote_info['memory_total'],
6446           "reserved_memory": remote_info['memory_dom0'],
6447           "free_memory": remote_info['memory_free'],
6448           "total_disk": remote_info['vg_size'],
6449           "free_disk": remote_info['vg_free'],
6450           "total_cpus": remote_info['cpu_total'],
6451           "i_pri_memory": i_p_mem,
6452           "i_pri_up_memory": i_p_up_mem,
6453           }
6454         pnr.update(pnr_dyn)
6455
6456       node_results[nname] = pnr
6457     data["nodes"] = node_results
6458
6459     # instance data
6460     instance_data = {}
6461     for iinfo, beinfo in i_list:
6462       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6463                   for n in iinfo.nics]
6464       pir = {
6465         "tags": list(iinfo.GetTags()),
6466         "admin_up": iinfo.admin_up,
6467         "vcpus": beinfo[constants.BE_VCPUS],
6468         "memory": beinfo[constants.BE_MEMORY],
6469         "os": iinfo.os,
6470         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6471         "nics": nic_data,
6472         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6473         "disk_template": iinfo.disk_template,
6474         "hypervisor": iinfo.hypervisor,
6475         }
6476       instance_data[iinfo.name] = pir
6477
6478     data["instances"] = instance_data
6479
6480     self.in_data = data
6481
6482   def _AddNewInstance(self):
6483     """Add new instance data to allocator structure.
6484
6485     This in combination with _AllocatorGetClusterData will create the
6486     correct structure needed as input for the allocator.
6487
6488     The checks for the completeness of the opcode must have already been
6489     done.
6490
6491     """
6492     data = self.in_data
6493
6494     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6495
6496     if self.disk_template in constants.DTS_NET_MIRROR:
6497       self.required_nodes = 2
6498     else:
6499       self.required_nodes = 1
6500     request = {
6501       "type": "allocate",
6502       "name": self.name,
6503       "disk_template": self.disk_template,
6504       "tags": self.tags,
6505       "os": self.os,
6506       "vcpus": self.vcpus,
6507       "memory": self.mem_size,
6508       "disks": self.disks,
6509       "disk_space_total": disk_space,
6510       "nics": self.nics,
6511       "required_nodes": self.required_nodes,
6512       }
6513     data["request"] = request
6514
6515   def _AddRelocateInstance(self):
6516     """Add relocate instance data to allocator structure.
6517
6518     This in combination with _IAllocatorGetClusterData will create the
6519     correct structure needed as input for the allocator.
6520
6521     The checks for the completeness of the opcode must have already been
6522     done.
6523
6524     """
6525     instance = self.lu.cfg.GetInstanceInfo(self.name)
6526     if instance is None:
6527       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6528                                    " IAllocator" % self.name)
6529
6530     if instance.disk_template not in constants.DTS_NET_MIRROR:
6531       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6532
6533     if len(instance.secondary_nodes) != 1:
6534       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6535
6536     self.required_nodes = 1
6537     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6538     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6539
6540     request = {
6541       "type": "relocate",
6542       "name": self.name,
6543       "disk_space_total": disk_space,
6544       "required_nodes": self.required_nodes,
6545       "relocate_from": self.relocate_from,
6546       }
6547     self.in_data["request"] = request
6548
6549   def _BuildInputData(self):
6550     """Build input data structures.
6551
6552     """
6553     self._ComputeClusterData()
6554
6555     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6556       self._AddNewInstance()
6557     else:
6558       self._AddRelocateInstance()
6559
6560     self.in_text = serializer.Dump(self.in_data)
6561
6562   def Run(self, name, validate=True, call_fn=None):
6563     """Run an instance allocator and return the results.
6564
6565     """
6566     if call_fn is None:
6567       call_fn = self.lu.rpc.call_iallocator_runner
6568     data = self.in_text
6569
6570     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6571     result.Raise()
6572
6573     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6574       raise errors.OpExecError("Invalid result from master iallocator runner")
6575
6576     rcode, stdout, stderr, fail = result.data
6577
6578     if rcode == constants.IARUN_NOTFOUND:
6579       raise errors.OpExecError("Can't find allocator '%s'" % name)
6580     elif rcode == constants.IARUN_FAILURE:
6581       raise errors.OpExecError("Instance allocator call failed: %s,"
6582                                " output: %s" % (fail, stdout+stderr))
6583     self.out_text = stdout
6584     if validate:
6585       self._ValidateResult()
6586
6587   def _ValidateResult(self):
6588     """Process the allocator results.
6589
6590     This will process and if successful save the result in
6591     self.out_data and the other parameters.
6592
6593     """
6594     try:
6595       rdict = serializer.Load(self.out_text)
6596     except Exception, err:
6597       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6598
6599     if not isinstance(rdict, dict):
6600       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6601
6602     for key in "success", "info", "nodes":
6603       if key not in rdict:
6604         raise errors.OpExecError("Can't parse iallocator results:"
6605                                  " missing key '%s'" % key)
6606       setattr(self, key, rdict[key])
6607
6608     if not isinstance(rdict["nodes"], list):
6609       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6610                                " is not a list")
6611     self.out_data = rdict
6612
6613
6614 class LUTestAllocator(NoHooksLU):
6615   """Run allocator tests.
6616
6617   This LU runs the allocator tests
6618
6619   """
6620   _OP_REQP = ["direction", "mode", "name"]
6621
6622   def CheckPrereq(self):
6623     """Check prerequisites.
6624
6625     This checks the opcode parameters depending on the director and mode test.
6626
6627     """
6628     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6629       for attr in ["name", "mem_size", "disks", "disk_template",
6630                    "os", "tags", "nics", "vcpus"]:
6631         if not hasattr(self.op, attr):
6632           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6633                                      attr)
6634       iname = self.cfg.ExpandInstanceName(self.op.name)
6635       if iname is not None:
6636         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6637                                    iname)
6638       if not isinstance(self.op.nics, list):
6639         raise errors.OpPrereqError("Invalid parameter 'nics'")
6640       for row in self.op.nics:
6641         if (not isinstance(row, dict) or
6642             "mac" not in row or
6643             "ip" not in row or
6644             "bridge" not in row):
6645           raise errors.OpPrereqError("Invalid contents of the"
6646                                      " 'nics' parameter")
6647       if not isinstance(self.op.disks, list):
6648         raise errors.OpPrereqError("Invalid parameter 'disks'")
6649       for row in self.op.disks:
6650         if (not isinstance(row, dict) or
6651             "size" not in row or
6652             not isinstance(row["size"], int) or
6653             "mode" not in row or
6654             row["mode"] not in ['r', 'w']):
6655           raise errors.OpPrereqError("Invalid contents of the"
6656                                      " 'disks' parameter")
6657       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6658         self.op.hypervisor = self.cfg.GetHypervisorType()
6659     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6660       if not hasattr(self.op, "name"):
6661         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6662       fname = self.cfg.ExpandInstanceName(self.op.name)
6663       if fname is None:
6664         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6665                                    self.op.name)
6666       self.op.name = fname
6667       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6668     else:
6669       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6670                                  self.op.mode)
6671
6672     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6673       if not hasattr(self.op, "allocator") or self.op.allocator is None:
6674         raise errors.OpPrereqError("Missing allocator name")
6675     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6676       raise errors.OpPrereqError("Wrong allocator test '%s'" %
6677                                  self.op.direction)
6678
6679   def Exec(self, feedback_fn):
6680     """Run the allocator test.
6681
6682     """
6683     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6684       ial = IAllocator(self,
6685                        mode=self.op.mode,
6686                        name=self.op.name,
6687                        mem_size=self.op.mem_size,
6688                        disks=self.op.disks,
6689                        disk_template=self.op.disk_template,
6690                        os=self.op.os,
6691                        tags=self.op.tags,
6692                        nics=self.op.nics,
6693                        vcpus=self.op.vcpus,
6694                        hypervisor=self.op.hypervisor,
6695                        )
6696     else:
6697       ial = IAllocator(self,
6698                        mode=self.op.mode,
6699                        name=self.op.name,
6700                        relocate_from=list(self.relocate_from),
6701                        )
6702
6703     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6704       result = ial.in_text
6705     else:
6706       ial.Run(self.op.allocator, validate=False)
6707       result = ial.out_text
6708     return result