Parallelize instance operations on the same node
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement ExpandNames
52     - implement CheckPrereq
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements:
57         REQ_MASTER: the LU needs to run on the master node
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_MASTER = True
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90     # logging
91     self.LogWarning = processor.LogWarning
92     self.LogInfo = processor.LogInfo
93
94     for attr_name in self._OP_REQP:
95       attr_val = getattr(op, attr_name, None)
96       if attr_val is None:
97         raise errors.OpPrereqError("Required parameter '%s' missing" %
98                                    attr_name)
99
100     if not self.cfg.IsCluster():
101       raise errors.OpPrereqError("Cluster not initialized yet,"
102                                  " use 'gnt-cluster init' first.")
103     if self.REQ_MASTER:
104       master = self.cfg.GetMasterNode()
105       if master != utils.HostInfo().name:
106         raise errors.OpPrereqError("Commands must be run on the master"
107                                    " node %s" % master)
108
109   def __GetSSH(self):
110     """Returns the SshRunner object
111
112     """
113     if not self.__ssh:
114       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115     return self.__ssh
116
117   ssh = property(fget=__GetSSH)
118
119   def ExpandNames(self):
120     """Expand names for this LU.
121
122     This method is called before starting to execute the opcode, and it should
123     update all the parameters of the opcode to their canonical form (e.g. a
124     short node name must be fully expanded after this method has successfully
125     completed). This way locking, hooks, logging, ecc. can work correctly.
126
127     LUs which implement this method must also populate the self.needed_locks
128     member, as a dict with lock levels as keys, and a list of needed lock names
129     as values. Rules:
130
131       - use an empty dict if you don't need any lock
132       - if you don't need any lock at a particular level omit that level
133       - don't put anything for the BGL level
134       - if you want all locks at a level use locking.ALL_SET as a value
135
136     If you need to share locks (rather than acquire them exclusively) at one
137     level you can modify self.share_locks, setting a true value (usually 1) for
138     that level. By default locks are not shared.
139
140     Examples::
141
142       # Acquire all nodes and one instance
143       self.needed_locks = {
144         locking.LEVEL_NODE: locking.ALL_SET,
145         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
146       }
147       # Acquire just two nodes
148       self.needed_locks = {
149         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
150       }
151       # Acquire no locks
152       self.needed_locks = {} # No, you can't leave it to the default value None
153
154     """
155     # The implementation of this method is mandatory only if the new LU is
156     # concurrent, so that old LUs don't need to be changed all at the same
157     # time.
158     if self.REQ_BGL:
159       self.needed_locks = {} # Exclusive LUs don't need locks.
160     else:
161       raise NotImplementedError
162
163   def DeclareLocks(self, level):
164     """Declare LU locking needs for a level
165
166     While most LUs can just declare their locking needs at ExpandNames time,
167     sometimes there's the need to calculate some locks after having acquired
168     the ones before. This function is called just before acquiring locks at a
169     particular level, but after acquiring the ones at lower levels, and permits
170     such calculations. It can be used to modify self.needed_locks, and by
171     default it does nothing.
172
173     This function is only called if you have something already set in
174     self.needed_locks for the level.
175
176     @param level: Locking level which is going to be locked
177     @type level: member of ganeti.locking.LEVELS
178
179     """
180
181   def CheckPrereq(self):
182     """Check prerequisites for this LU.
183
184     This method should check that the prerequisites for the execution
185     of this LU are fulfilled. It can do internode communication, but
186     it should be idempotent - no cluster or system changes are
187     allowed.
188
189     The method should raise errors.OpPrereqError in case something is
190     not fulfilled. Its return value is ignored.
191
192     This method should also update all the parameters of the opcode to
193     their canonical form if it hasn't been done by ExpandNames before.
194
195     """
196     raise NotImplementedError
197
198   def Exec(self, feedback_fn):
199     """Execute the LU.
200
201     This method should implement the actual work. It should raise
202     errors.OpExecError for failures that are somewhat dealt with in
203     code, or expected.
204
205     """
206     raise NotImplementedError
207
208   def BuildHooksEnv(self):
209     """Build hooks environment for this LU.
210
211     This method should return a three-node tuple consisting of: a dict
212     containing the environment that will be used for running the
213     specific hook for this LU, a list of node names on which the hook
214     should run before the execution, and a list of node names on which
215     the hook should run after the execution.
216
217     The keys of the dict must not have 'GANETI_' prefixed as this will
218     be handled in the hooks runner. Also note additional keys will be
219     added by the hooks runner. If the LU doesn't define any
220     environment, an empty dict (and not None) should be returned.
221
222     No nodes should be returned as an empty list (and not None).
223
224     Note that if the HPATH for a LU class is None, this function will
225     not be called.
226
227     """
228     raise NotImplementedError
229
230   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
231     """Notify the LU about the results of its hooks.
232
233     This method is called every time a hooks phase is executed, and notifies
234     the Logical Unit about the hooks' result. The LU can then use it to alter
235     its result based on the hooks.  By default the method does nothing and the
236     previous result is passed back unchanged but any LU can define it if it
237     wants to use the local cluster hook-scripts somehow.
238
239     @param phase: one of L{constants.HOOKS_PHASE_POST} or
240         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
241     @param hook_results: the results of the multi-node hooks rpc call
242     @param feedback_fn: function used send feedback back to the caller
243     @param lu_result: the previous Exec result this LU had, or None
244         in the PRE phase
245     @return: the new Exec result, based on the previous result
246         and hook results
247
248     """
249     return lu_result
250
251   def _ExpandAndLockInstance(self):
252     """Helper function to expand and lock an instance.
253
254     Many LUs that work on an instance take its name in self.op.instance_name
255     and need to expand it and then declare the expanded name for locking. This
256     function does it, and then updates self.op.instance_name to the expanded
257     name. It also initializes needed_locks as a dict, if this hasn't been done
258     before.
259
260     """
261     if self.needed_locks is None:
262       self.needed_locks = {}
263     else:
264       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
265         "_ExpandAndLockInstance called with instance-level locks set"
266     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
267     if expanded_name is None:
268       raise errors.OpPrereqError("Instance '%s' not known" %
269                                   self.op.instance_name)
270     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
271     self.op.instance_name = expanded_name
272
273   def _LockInstancesNodes(self, primary_only=False):
274     """Helper function to declare instances' nodes for locking.
275
276     This function should be called after locking one or more instances to lock
277     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
278     with all primary or secondary nodes for instances already locked and
279     present in self.needed_locks[locking.LEVEL_INSTANCE].
280
281     It should be called from DeclareLocks, and for safety only works if
282     self.recalculate_locks[locking.LEVEL_NODE] is set.
283
284     In the future it may grow parameters to just lock some instance's nodes, or
285     to just lock primaries or secondary nodes, if needed.
286
287     If should be called in DeclareLocks in a way similar to::
288
289       if level == locking.LEVEL_NODE:
290         self._LockInstancesNodes()
291
292     @type primary_only: boolean
293     @param primary_only: only lock primary nodes of locked instances
294
295     """
296     assert locking.LEVEL_NODE in self.recalculate_locks, \
297       "_LockInstancesNodes helper function called with no nodes to recalculate"
298
299     # TODO: check if we're really been called with the instance locks held
300
301     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
302     # future we might want to have different behaviors depending on the value
303     # of self.recalculate_locks[locking.LEVEL_NODE]
304     wanted_nodes = []
305     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
306       instance = self.context.cfg.GetInstanceInfo(instance_name)
307       wanted_nodes.append(instance.primary_node)
308       if not primary_only:
309         wanted_nodes.extend(instance.secondary_nodes)
310
311     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
312       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
313     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
314       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
315
316     del self.recalculate_locks[locking.LEVEL_NODE]
317
318
319 class NoHooksLU(LogicalUnit):
320   """Simple LU which runs no hooks.
321
322   This LU is intended as a parent for other LogicalUnits which will
323   run no hooks, in order to reduce duplicate code.
324
325   """
326   HPATH = None
327   HTYPE = None
328
329
330 def _GetWantedNodes(lu, nodes):
331   """Returns list of checked and expanded node names.
332
333   @type lu: L{LogicalUnit}
334   @param lu: the logical unit on whose behalf we execute
335   @type nodes: list
336   @param nodes: list of node names or None for all nodes
337   @rtype: list
338   @return: the list of nodes, sorted
339   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
340
341   """
342   if not isinstance(nodes, list):
343     raise errors.OpPrereqError("Invalid argument type 'nodes'")
344
345   if not nodes:
346     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
347       " non-empty list of nodes whose name is to be expanded.")
348
349   wanted = []
350   for name in nodes:
351     node = lu.cfg.ExpandNodeName(name)
352     if node is None:
353       raise errors.OpPrereqError("No such node name '%s'" % name)
354     wanted.append(node)
355
356   return utils.NiceSort(wanted)
357
358
359 def _GetWantedInstances(lu, instances):
360   """Returns list of checked and expanded instance names.
361
362   @type lu: L{LogicalUnit}
363   @param lu: the logical unit on whose behalf we execute
364   @type instances: list
365   @param instances: list of instance names or None for all instances
366   @rtype: list
367   @return: the list of instances, sorted
368   @raise errors.OpPrereqError: if the instances parameter is wrong type
369   @raise errors.OpPrereqError: if any of the passed instances is not found
370
371   """
372   if not isinstance(instances, list):
373     raise errors.OpPrereqError("Invalid argument type 'instances'")
374
375   if instances:
376     wanted = []
377
378     for name in instances:
379       instance = lu.cfg.ExpandInstanceName(name)
380       if instance is None:
381         raise errors.OpPrereqError("No such instance name '%s'" % name)
382       wanted.append(instance)
383
384   else:
385     wanted = lu.cfg.GetInstanceList()
386   return utils.NiceSort(wanted)
387
388
389 def _CheckOutputFields(static, dynamic, selected):
390   """Checks whether all selected fields are valid.
391
392   @type static: L{utils.FieldSet}
393   @param static: static fields set
394   @type dynamic: L{utils.FieldSet}
395   @param dynamic: dynamic fields set
396
397   """
398   f = utils.FieldSet()
399   f.Extend(static)
400   f.Extend(dynamic)
401
402   delta = f.NonMatching(selected)
403   if delta:
404     raise errors.OpPrereqError("Unknown output fields selected: %s"
405                                % ",".join(delta))
406
407
408 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
409                           memory, vcpus, nics):
410   """Builds instance related env variables for hooks
411
412   This builds the hook environment from individual variables.
413
414   @type name: string
415   @param name: the name of the instance
416   @type primary_node: string
417   @param primary_node: the name of the instance's primary node
418   @type secondary_nodes: list
419   @param secondary_nodes: list of secondary nodes as strings
420   @type os_type: string
421   @param os_type: the name of the instance's OS
422   @type status: string
423   @param status: the desired status of the instances
424   @type memory: string
425   @param memory: the memory size of the instance
426   @type vcpus: string
427   @param vcpus: the count of VCPUs the instance has
428   @type nics: list
429   @param nics: list of tuples (ip, bridge, mac) representing
430       the NICs the instance  has
431   @rtype: dict
432   @return: the hook environment for this instance
433
434   """
435   env = {
436     "OP_TARGET": name,
437     "INSTANCE_NAME": name,
438     "INSTANCE_PRIMARY": primary_node,
439     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
440     "INSTANCE_OS_TYPE": os_type,
441     "INSTANCE_STATUS": status,
442     "INSTANCE_MEMORY": memory,
443     "INSTANCE_VCPUS": vcpus,
444   }
445
446   if nics:
447     nic_count = len(nics)
448     for idx, (ip, bridge, mac) in enumerate(nics):
449       if ip is None:
450         ip = ""
451       env["INSTANCE_NIC%d_IP" % idx] = ip
452       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
453       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
454   else:
455     nic_count = 0
456
457   env["INSTANCE_NIC_COUNT"] = nic_count
458
459   return env
460
461
462 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
463   """Builds instance related env variables for hooks from an object.
464
465   @type lu: L{LogicalUnit}
466   @param lu: the logical unit on whose behalf we execute
467   @type instance: L{objects.Instance}
468   @param instance: the instance for which we should build the
469       environment
470   @type override: dict
471   @param override: dictionary with key/values that will override
472       our values
473   @rtype: dict
474   @return: the hook environment dictionary
475
476   """
477   bep = lu.cfg.GetClusterInfo().FillBE(instance)
478   args = {
479     'name': instance.name,
480     'primary_node': instance.primary_node,
481     'secondary_nodes': instance.secondary_nodes,
482     'os_type': instance.os,
483     'status': instance.os,
484     'memory': bep[constants.BE_MEMORY],
485     'vcpus': bep[constants.BE_VCPUS],
486     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
487   }
488   if override:
489     args.update(override)
490   return _BuildInstanceHookEnv(**args)
491
492
493 def _CheckInstanceBridgesExist(lu, instance):
494   """Check that the brigdes needed by an instance exist.
495
496   """
497   # check bridges existance
498   brlist = [nic.bridge for nic in instance.nics]
499   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
500     raise errors.OpPrereqError("one or more target bridges %s does not"
501                                " exist on destination node '%s'" %
502                                (brlist, instance.primary_node))
503
504
505 class LUDestroyCluster(NoHooksLU):
506   """Logical unit for destroying the cluster.
507
508   """
509   _OP_REQP = []
510
511   def CheckPrereq(self):
512     """Check prerequisites.
513
514     This checks whether the cluster is empty.
515
516     Any errors are signalled by raising errors.OpPrereqError.
517
518     """
519     master = self.cfg.GetMasterNode()
520
521     nodelist = self.cfg.GetNodeList()
522     if len(nodelist) != 1 or nodelist[0] != master:
523       raise errors.OpPrereqError("There are still %d node(s) in"
524                                  " this cluster." % (len(nodelist) - 1))
525     instancelist = self.cfg.GetInstanceList()
526     if instancelist:
527       raise errors.OpPrereqError("There are still %d instance(s) in"
528                                  " this cluster." % len(instancelist))
529
530   def Exec(self, feedback_fn):
531     """Destroys the cluster.
532
533     """
534     master = self.cfg.GetMasterNode()
535     if not self.rpc.call_node_stop_master(master, False):
536       raise errors.OpExecError("Could not disable the master role")
537     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
538     utils.CreateBackup(priv_key)
539     utils.CreateBackup(pub_key)
540     return master
541
542
543 class LUVerifyCluster(LogicalUnit):
544   """Verifies the cluster status.
545
546   """
547   HPATH = "cluster-verify"
548   HTYPE = constants.HTYPE_CLUSTER
549   _OP_REQP = ["skip_checks"]
550   REQ_BGL = False
551
552   def ExpandNames(self):
553     self.needed_locks = {
554       locking.LEVEL_NODE: locking.ALL_SET,
555       locking.LEVEL_INSTANCE: locking.ALL_SET,
556     }
557     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
558
559   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
560                   remote_version, feedback_fn):
561     """Run multiple tests against a node.
562
563     Test list::
564
565       - compares ganeti version
566       - checks vg existance and size > 20G
567       - checks config file checksum
568       - checks ssh to other nodes
569
570     @type node: string
571     @param node: the name of the node to check
572     @param file_list: required list of files
573     @param local_cksum: dictionary of local files and their checksums
574     @type vglist: dict
575     @param vglist: dictionary of volume group names and their size
576     @param node_result: the results from the node
577     @param remote_version: the RPC version from the remote node
578     @param feedback_fn: function used to accumulate results
579
580     """
581     # compares ganeti version
582     local_version = constants.PROTOCOL_VERSION
583     if not remote_version:
584       feedback_fn("  - ERROR: connection to %s failed" % (node))
585       return True
586
587     if local_version != remote_version:
588       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
589                       (local_version, node, remote_version))
590       return True
591
592     # checks vg existance and size > 20G
593
594     bad = False
595     if not vglist:
596       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
597                       (node,))
598       bad = True
599     else:
600       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
601                                             constants.MIN_VG_SIZE)
602       if vgstatus:
603         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
604         bad = True
605
606     if not node_result:
607       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
608       return True
609
610     # checks config file checksum
611     # checks ssh to any
612
613     if 'filelist' not in node_result:
614       bad = True
615       feedback_fn("  - ERROR: node hasn't returned file checksum data")
616     else:
617       remote_cksum = node_result['filelist']
618       for file_name in file_list:
619         if file_name not in remote_cksum:
620           bad = True
621           feedback_fn("  - ERROR: file '%s' missing" % file_name)
622         elif remote_cksum[file_name] != local_cksum[file_name]:
623           bad = True
624           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
625
626     if 'nodelist' not in node_result:
627       bad = True
628       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
629     else:
630       if node_result['nodelist']:
631         bad = True
632         for node in node_result['nodelist']:
633           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
634                           (node, node_result['nodelist'][node]))
635     if 'node-net-test' not in node_result:
636       bad = True
637       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
638     else:
639       if node_result['node-net-test']:
640         bad = True
641         nlist = utils.NiceSort(node_result['node-net-test'].keys())
642         for node in nlist:
643           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
644                           (node, node_result['node-net-test'][node]))
645
646     hyp_result = node_result.get('hypervisor', None)
647     if isinstance(hyp_result, dict):
648       for hv_name, hv_result in hyp_result.iteritems():
649         if hv_result is not None:
650           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
651                       (hv_name, hv_result))
652     return bad
653
654   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
655                       node_instance, feedback_fn):
656     """Verify an instance.
657
658     This function checks to see if the required block devices are
659     available on the instance's node.
660
661     """
662     bad = False
663
664     node_current = instanceconfig.primary_node
665
666     node_vol_should = {}
667     instanceconfig.MapLVsByNode(node_vol_should)
668
669     for node in node_vol_should:
670       for volume in node_vol_should[node]:
671         if node not in node_vol_is or volume not in node_vol_is[node]:
672           feedback_fn("  - ERROR: volume %s missing on node %s" %
673                           (volume, node))
674           bad = True
675
676     if not instanceconfig.status == 'down':
677       if (node_current not in node_instance or
678           not instance in node_instance[node_current]):
679         feedback_fn("  - ERROR: instance %s not running on node %s" %
680                         (instance, node_current))
681         bad = True
682
683     for node in node_instance:
684       if (not node == node_current):
685         if instance in node_instance[node]:
686           feedback_fn("  - ERROR: instance %s should not run on node %s" %
687                           (instance, node))
688           bad = True
689
690     return bad
691
692   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
693     """Verify if there are any unknown volumes in the cluster.
694
695     The .os, .swap and backup volumes are ignored. All other volumes are
696     reported as unknown.
697
698     """
699     bad = False
700
701     for node in node_vol_is:
702       for volume in node_vol_is[node]:
703         if node not in node_vol_should or volume not in node_vol_should[node]:
704           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
705                       (volume, node))
706           bad = True
707     return bad
708
709   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
710     """Verify the list of running instances.
711
712     This checks what instances are running but unknown to the cluster.
713
714     """
715     bad = False
716     for node in node_instance:
717       for runninginstance in node_instance[node]:
718         if runninginstance not in instancelist:
719           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
720                           (runninginstance, node))
721           bad = True
722     return bad
723
724   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
725     """Verify N+1 Memory Resilience.
726
727     Check that if one single node dies we can still start all the instances it
728     was primary for.
729
730     """
731     bad = False
732
733     for node, nodeinfo in node_info.iteritems():
734       # This code checks that every node which is now listed as secondary has
735       # enough memory to host all instances it is supposed to should a single
736       # other node in the cluster fail.
737       # FIXME: not ready for failover to an arbitrary node
738       # FIXME: does not support file-backed instances
739       # WARNING: we currently take into account down instances as well as up
740       # ones, considering that even if they're down someone might want to start
741       # them even in the event of a node failure.
742       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
743         needed_mem = 0
744         for instance in instances:
745           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
746           if bep[constants.BE_AUTO_BALANCE]:
747             needed_mem += bep[constants.BE_MEMORY]
748         if nodeinfo['mfree'] < needed_mem:
749           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
750                       " failovers should node %s fail" % (node, prinode))
751           bad = True
752     return bad
753
754   def CheckPrereq(self):
755     """Check prerequisites.
756
757     Transform the list of checks we're going to skip into a set and check that
758     all its members are valid.
759
760     """
761     self.skip_set = frozenset(self.op.skip_checks)
762     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
763       raise errors.OpPrereqError("Invalid checks to be skipped specified")
764
765   def BuildHooksEnv(self):
766     """Build hooks env.
767
768     Cluster-Verify hooks just rone in the post phase and their failure makes
769     the output be logged in the verify output and the verification to fail.
770
771     """
772     all_nodes = self.cfg.GetNodeList()
773     # TODO: populate the environment with useful information for verify hooks
774     env = {}
775     return env, [], all_nodes
776
777   def Exec(self, feedback_fn):
778     """Verify integrity of cluster, performing various test on nodes.
779
780     """
781     bad = False
782     feedback_fn("* Verifying global settings")
783     for msg in self.cfg.VerifyConfig():
784       feedback_fn("  - ERROR: %s" % msg)
785
786     vg_name = self.cfg.GetVGName()
787     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
788     nodelist = utils.NiceSort(self.cfg.GetNodeList())
789     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
790     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
791     i_non_redundant = [] # Non redundant instances
792     i_non_a_balanced = [] # Non auto-balanced instances
793     node_volume = {}
794     node_instance = {}
795     node_info = {}
796     instance_cfg = {}
797
798     # FIXME: verify OS list
799     # do local checksums
800     file_names = []
801     file_names.append(constants.SSL_CERT_FILE)
802     file_names.append(constants.CLUSTER_CONF_FILE)
803     local_checksums = utils.FingerprintFiles(file_names)
804
805     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
806     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
807     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
808     all_vglist = self.rpc.call_vg_list(nodelist)
809     node_verify_param = {
810       'filelist': file_names,
811       'nodelist': nodelist,
812       'hypervisor': hypervisors,
813       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
814                         for node in nodeinfo]
815       }
816     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
817                                            self.cfg.GetClusterName())
818     all_rversion = self.rpc.call_version(nodelist)
819     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
820                                         self.cfg.GetHypervisorType())
821
822     cluster = self.cfg.GetClusterInfo()
823     for node in nodelist:
824       feedback_fn("* Verifying node %s" % node)
825       result = self._VerifyNode(node, file_names, local_checksums,
826                                 all_vglist[node], all_nvinfo[node],
827                                 all_rversion[node], feedback_fn)
828       bad = bad or result
829
830       # node_volume
831       volumeinfo = all_volumeinfo[node]
832
833       if isinstance(volumeinfo, basestring):
834         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
835                     (node, volumeinfo[-400:].encode('string_escape')))
836         bad = True
837         node_volume[node] = {}
838       elif not isinstance(volumeinfo, dict):
839         feedback_fn("  - ERROR: connection to %s failed" % (node,))
840         bad = True
841         continue
842       else:
843         node_volume[node] = volumeinfo
844
845       # node_instance
846       nodeinstance = all_instanceinfo[node]
847       if type(nodeinstance) != list:
848         feedback_fn("  - ERROR: connection to %s failed" % (node,))
849         bad = True
850         continue
851
852       node_instance[node] = nodeinstance
853
854       # node_info
855       nodeinfo = all_ninfo[node]
856       if not isinstance(nodeinfo, dict):
857         feedback_fn("  - ERROR: connection to %s failed" % (node,))
858         bad = True
859         continue
860
861       try:
862         node_info[node] = {
863           "mfree": int(nodeinfo['memory_free']),
864           "dfree": int(nodeinfo['vg_free']),
865           "pinst": [],
866           "sinst": [],
867           # dictionary holding all instances this node is secondary for,
868           # grouped by their primary node. Each key is a cluster node, and each
869           # value is a list of instances which have the key as primary and the
870           # current node as secondary.  this is handy to calculate N+1 memory
871           # availability if you can only failover from a primary to its
872           # secondary.
873           "sinst-by-pnode": {},
874         }
875       except ValueError:
876         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
877         bad = True
878         continue
879
880     node_vol_should = {}
881
882     for instance in instancelist:
883       feedback_fn("* Verifying instance %s" % instance)
884       inst_config = self.cfg.GetInstanceInfo(instance)
885       result =  self._VerifyInstance(instance, inst_config, node_volume,
886                                      node_instance, feedback_fn)
887       bad = bad or result
888
889       inst_config.MapLVsByNode(node_vol_should)
890
891       instance_cfg[instance] = inst_config
892
893       pnode = inst_config.primary_node
894       if pnode in node_info:
895         node_info[pnode]['pinst'].append(instance)
896       else:
897         feedback_fn("  - ERROR: instance %s, connection to primary node"
898                     " %s failed" % (instance, pnode))
899         bad = True
900
901       # If the instance is non-redundant we cannot survive losing its primary
902       # node, so we are not N+1 compliant. On the other hand we have no disk
903       # templates with more than one secondary so that situation is not well
904       # supported either.
905       # FIXME: does not support file-backed instances
906       if len(inst_config.secondary_nodes) == 0:
907         i_non_redundant.append(instance)
908       elif len(inst_config.secondary_nodes) > 1:
909         feedback_fn("  - WARNING: multiple secondaries for instance %s"
910                     % instance)
911
912       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
913         i_non_a_balanced.append(instance)
914
915       for snode in inst_config.secondary_nodes:
916         if snode in node_info:
917           node_info[snode]['sinst'].append(instance)
918           if pnode not in node_info[snode]['sinst-by-pnode']:
919             node_info[snode]['sinst-by-pnode'][pnode] = []
920           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
921         else:
922           feedback_fn("  - ERROR: instance %s, connection to secondary node"
923                       " %s failed" % (instance, snode))
924
925     feedback_fn("* Verifying orphan volumes")
926     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
927                                        feedback_fn)
928     bad = bad or result
929
930     feedback_fn("* Verifying remaining instances")
931     result = self._VerifyOrphanInstances(instancelist, node_instance,
932                                          feedback_fn)
933     bad = bad or result
934
935     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
936       feedback_fn("* Verifying N+1 Memory redundancy")
937       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
938       bad = bad or result
939
940     feedback_fn("* Other Notes")
941     if i_non_redundant:
942       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
943                   % len(i_non_redundant))
944
945     if i_non_a_balanced:
946       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
947                   % len(i_non_a_balanced))
948
949     return not bad
950
951   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
952     """Analize the post-hooks' result
953
954     This method analyses the hook result, handles it, and sends some
955     nicely-formatted feedback back to the user.
956
957     @param phase: one of L{constants.HOOKS_PHASE_POST} or
958         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
959     @param hooks_results: the results of the multi-node hooks rpc call
960     @param feedback_fn: function used send feedback back to the caller
961     @param lu_result: previous Exec result
962     @return: the new Exec result, based on the previous result
963         and hook results
964
965     """
966     # We only really run POST phase hooks, and are only interested in
967     # their results
968     if phase == constants.HOOKS_PHASE_POST:
969       # Used to change hooks' output to proper indentation
970       indent_re = re.compile('^', re.M)
971       feedback_fn("* Hooks Results")
972       if not hooks_results:
973         feedback_fn("  - ERROR: general communication failure")
974         lu_result = 1
975       else:
976         for node_name in hooks_results:
977           show_node_header = True
978           res = hooks_results[node_name]
979           if res is False or not isinstance(res, list):
980             feedback_fn("    Communication failure")
981             lu_result = 1
982             continue
983           for script, hkr, output in res:
984             if hkr == constants.HKR_FAIL:
985               # The node header is only shown once, if there are
986               # failing hooks on that node
987               if show_node_header:
988                 feedback_fn("  Node %s:" % node_name)
989                 show_node_header = False
990               feedback_fn("    ERROR: Script %s failed, output:" % script)
991               output = indent_re.sub('      ', output)
992               feedback_fn("%s" % output)
993               lu_result = 1
994
995       return lu_result
996
997
998 class LUVerifyDisks(NoHooksLU):
999   """Verifies the cluster disks status.
1000
1001   """
1002   _OP_REQP = []
1003   REQ_BGL = False
1004
1005   def ExpandNames(self):
1006     self.needed_locks = {
1007       locking.LEVEL_NODE: locking.ALL_SET,
1008       locking.LEVEL_INSTANCE: locking.ALL_SET,
1009     }
1010     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1011
1012   def CheckPrereq(self):
1013     """Check prerequisites.
1014
1015     This has no prerequisites.
1016
1017     """
1018     pass
1019
1020   def Exec(self, feedback_fn):
1021     """Verify integrity of cluster disks.
1022
1023     """
1024     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1025
1026     vg_name = self.cfg.GetVGName()
1027     nodes = utils.NiceSort(self.cfg.GetNodeList())
1028     instances = [self.cfg.GetInstanceInfo(name)
1029                  for name in self.cfg.GetInstanceList()]
1030
1031     nv_dict = {}
1032     for inst in instances:
1033       inst_lvs = {}
1034       if (inst.status != "up" or
1035           inst.disk_template not in constants.DTS_NET_MIRROR):
1036         continue
1037       inst.MapLVsByNode(inst_lvs)
1038       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1039       for node, vol_list in inst_lvs.iteritems():
1040         for vol in vol_list:
1041           nv_dict[(node, vol)] = inst
1042
1043     if not nv_dict:
1044       return result
1045
1046     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1047
1048     to_act = set()
1049     for node in nodes:
1050       # node_volume
1051       lvs = node_lvs[node]
1052
1053       if isinstance(lvs, basestring):
1054         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1055         res_nlvm[node] = lvs
1056       elif not isinstance(lvs, dict):
1057         logging.warning("Connection to node %s failed or invalid data"
1058                         " returned", node)
1059         res_nodes.append(node)
1060         continue
1061
1062       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1063         inst = nv_dict.pop((node, lv_name), None)
1064         if (not lv_online and inst is not None
1065             and inst.name not in res_instances):
1066           res_instances.append(inst.name)
1067
1068     # any leftover items in nv_dict are missing LVs, let's arrange the
1069     # data better
1070     for key, inst in nv_dict.iteritems():
1071       if inst.name not in res_missing:
1072         res_missing[inst.name] = []
1073       res_missing[inst.name].append(key)
1074
1075     return result
1076
1077
1078 class LURenameCluster(LogicalUnit):
1079   """Rename the cluster.
1080
1081   """
1082   HPATH = "cluster-rename"
1083   HTYPE = constants.HTYPE_CLUSTER
1084   _OP_REQP = ["name"]
1085
1086   def BuildHooksEnv(self):
1087     """Build hooks env.
1088
1089     """
1090     env = {
1091       "OP_TARGET": self.cfg.GetClusterName(),
1092       "NEW_NAME": self.op.name,
1093       }
1094     mn = self.cfg.GetMasterNode()
1095     return env, [mn], [mn]
1096
1097   def CheckPrereq(self):
1098     """Verify that the passed name is a valid one.
1099
1100     """
1101     hostname = utils.HostInfo(self.op.name)
1102
1103     new_name = hostname.name
1104     self.ip = new_ip = hostname.ip
1105     old_name = self.cfg.GetClusterName()
1106     old_ip = self.cfg.GetMasterIP()
1107     if new_name == old_name and new_ip == old_ip:
1108       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1109                                  " cluster has changed")
1110     if new_ip != old_ip:
1111       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1112         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1113                                    " reachable on the network. Aborting." %
1114                                    new_ip)
1115
1116     self.op.name = new_name
1117
1118   def Exec(self, feedback_fn):
1119     """Rename the cluster.
1120
1121     """
1122     clustername = self.op.name
1123     ip = self.ip
1124
1125     # shutdown the master IP
1126     master = self.cfg.GetMasterNode()
1127     if not self.rpc.call_node_stop_master(master, False):
1128       raise errors.OpExecError("Could not disable the master role")
1129
1130     try:
1131       # modify the sstore
1132       # TODO: sstore
1133       ss.SetKey(ss.SS_MASTER_IP, ip)
1134       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1135
1136       # Distribute updated ss config to all nodes
1137       myself = self.cfg.GetNodeInfo(master)
1138       dist_nodes = self.cfg.GetNodeList()
1139       if myself.name in dist_nodes:
1140         dist_nodes.remove(myself.name)
1141
1142       logging.debug("Copying updated ssconf data to all nodes")
1143       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1144         fname = ss.KeyToFilename(keyname)
1145         result = self.rpc.call_upload_file(dist_nodes, fname)
1146         for to_node in dist_nodes:
1147           if not result[to_node]:
1148             self.LogWarning("Copy of file %s to node %s failed",
1149                             fname, to_node)
1150     finally:
1151       if not self.rpc.call_node_start_master(master, False):
1152         self.LogWarning("Could not re-enable the master role on"
1153                         " the master, please restart manually.")
1154
1155
1156 def _RecursiveCheckIfLVMBased(disk):
1157   """Check if the given disk or its children are lvm-based.
1158
1159   @type disk: L{objects.Disk}
1160   @param disk: the disk to check
1161   @rtype: booleean
1162   @return: boolean indicating whether a LD_LV dev_type was found or not
1163
1164   """
1165   if disk.children:
1166     for chdisk in disk.children:
1167       if _RecursiveCheckIfLVMBased(chdisk):
1168         return True
1169   return disk.dev_type == constants.LD_LV
1170
1171
1172 class LUSetClusterParams(LogicalUnit):
1173   """Change the parameters of the cluster.
1174
1175   """
1176   HPATH = "cluster-modify"
1177   HTYPE = constants.HTYPE_CLUSTER
1178   _OP_REQP = []
1179   REQ_BGL = False
1180
1181   def ExpandNames(self):
1182     # FIXME: in the future maybe other cluster params won't require checking on
1183     # all nodes to be modified.
1184     self.needed_locks = {
1185       locking.LEVEL_NODE: locking.ALL_SET,
1186     }
1187     self.share_locks[locking.LEVEL_NODE] = 1
1188
1189   def BuildHooksEnv(self):
1190     """Build hooks env.
1191
1192     """
1193     env = {
1194       "OP_TARGET": self.cfg.GetClusterName(),
1195       "NEW_VG_NAME": self.op.vg_name,
1196       }
1197     mn = self.cfg.GetMasterNode()
1198     return env, [mn], [mn]
1199
1200   def CheckPrereq(self):
1201     """Check prerequisites.
1202
1203     This checks whether the given params don't conflict and
1204     if the given volume group is valid.
1205
1206     """
1207     # FIXME: This only works because there is only one parameter that can be
1208     # changed or removed.
1209     if self.op.vg_name is not None and not self.op.vg_name:
1210       instances = self.cfg.GetAllInstancesInfo().values()
1211       for inst in instances:
1212         for disk in inst.disks:
1213           if _RecursiveCheckIfLVMBased(disk):
1214             raise errors.OpPrereqError("Cannot disable lvm storage while"
1215                                        " lvm-based instances exist")
1216
1217     node_list = self.acquired_locks[locking.LEVEL_NODE]
1218
1219     # if vg_name not None, checks given volume group on all nodes
1220     if self.op.vg_name:
1221       vglist = self.rpc.call_vg_list(node_list)
1222       for node in node_list:
1223         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1224                                               constants.MIN_VG_SIZE)
1225         if vgstatus:
1226           raise errors.OpPrereqError("Error on node '%s': %s" %
1227                                      (node, vgstatus))
1228
1229     self.cluster = cluster = self.cfg.GetClusterInfo()
1230     # beparams changes do not need validation (we can't validate?),
1231     # but we still process here
1232     if self.op.beparams:
1233       self.new_beparams = cluster.FillDict(
1234         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1235
1236     # hypervisor list/parameters
1237     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1238     if self.op.hvparams:
1239       if not isinstance(self.op.hvparams, dict):
1240         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1241       for hv_name, hv_dict in self.op.hvparams.items():
1242         if hv_name not in self.new_hvparams:
1243           self.new_hvparams[hv_name] = hv_dict
1244         else:
1245           self.new_hvparams[hv_name].update(hv_dict)
1246
1247     if self.op.enabled_hypervisors is not None:
1248       self.hv_list = self.op.enabled_hypervisors
1249     else:
1250       self.hv_list = cluster.enabled_hypervisors
1251
1252     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1253       # either the enabled list has changed, or the parameters have, validate
1254       for hv_name, hv_params in self.new_hvparams.items():
1255         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1256             (self.op.enabled_hypervisors and
1257              hv_name in self.op.enabled_hypervisors)):
1258           # either this is a new hypervisor, or its parameters have changed
1259           hv_class = hypervisor.GetHypervisor(hv_name)
1260           hv_class.CheckParameterSyntax(hv_params)
1261           _CheckHVParams(self, node_list, hv_name, hv_params)
1262
1263   def Exec(self, feedback_fn):
1264     """Change the parameters of the cluster.
1265
1266     """
1267     if self.op.vg_name is not None:
1268       if self.op.vg_name != self.cfg.GetVGName():
1269         self.cfg.SetVGName(self.op.vg_name)
1270       else:
1271         feedback_fn("Cluster LVM configuration already in desired"
1272                     " state, not changing")
1273     if self.op.hvparams:
1274       self.cluster.hvparams = self.new_hvparams
1275     if self.op.enabled_hypervisors is not None:
1276       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1277     if self.op.beparams:
1278       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1279     self.cfg.Update(self.cluster)
1280
1281
1282 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1283   """Sleep and poll for an instance's disk to sync.
1284
1285   """
1286   if not instance.disks:
1287     return True
1288
1289   if not oneshot:
1290     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1291
1292   node = instance.primary_node
1293
1294   for dev in instance.disks:
1295     lu.cfg.SetDiskID(dev, node)
1296
1297   retries = 0
1298   while True:
1299     max_time = 0
1300     done = True
1301     cumul_degraded = False
1302     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1303     if not rstats:
1304       lu.LogWarning("Can't get any data from node %s", node)
1305       retries += 1
1306       if retries >= 10:
1307         raise errors.RemoteError("Can't contact node %s for mirror data,"
1308                                  " aborting." % node)
1309       time.sleep(6)
1310       continue
1311     retries = 0
1312     for i in range(len(rstats)):
1313       mstat = rstats[i]
1314       if mstat is None:
1315         lu.LogWarning("Can't compute data for node %s/%s",
1316                            node, instance.disks[i].iv_name)
1317         continue
1318       # we ignore the ldisk parameter
1319       perc_done, est_time, is_degraded, _ = mstat
1320       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1321       if perc_done is not None:
1322         done = False
1323         if est_time is not None:
1324           rem_time = "%d estimated seconds remaining" % est_time
1325           max_time = est_time
1326         else:
1327           rem_time = "no time estimate"
1328         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1329                         (instance.disks[i].iv_name, perc_done, rem_time))
1330     if done or oneshot:
1331       break
1332
1333     time.sleep(min(60, max_time))
1334
1335   if done:
1336     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1337   return not cumul_degraded
1338
1339
1340 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1341   """Check that mirrors are not degraded.
1342
1343   The ldisk parameter, if True, will change the test from the
1344   is_degraded attribute (which represents overall non-ok status for
1345   the device(s)) to the ldisk (representing the local storage status).
1346
1347   """
1348   lu.cfg.SetDiskID(dev, node)
1349   if ldisk:
1350     idx = 6
1351   else:
1352     idx = 5
1353
1354   result = True
1355   if on_primary or dev.AssembleOnSecondary():
1356     rstats = lu.rpc.call_blockdev_find(node, dev)
1357     if not rstats:
1358       logging.warning("Node %s: disk degraded, not found or node down", node)
1359       result = False
1360     else:
1361       result = result and (not rstats[idx])
1362   if dev.children:
1363     for child in dev.children:
1364       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1365
1366   return result
1367
1368
1369 class LUDiagnoseOS(NoHooksLU):
1370   """Logical unit for OS diagnose/query.
1371
1372   """
1373   _OP_REQP = ["output_fields", "names"]
1374   REQ_BGL = False
1375   _FIELDS_STATIC = utils.FieldSet()
1376   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1377
1378   def ExpandNames(self):
1379     if self.op.names:
1380       raise errors.OpPrereqError("Selective OS query not supported")
1381
1382     _CheckOutputFields(static=self._FIELDS_STATIC,
1383                        dynamic=self._FIELDS_DYNAMIC,
1384                        selected=self.op.output_fields)
1385
1386     # Lock all nodes, in shared mode
1387     self.needed_locks = {}
1388     self.share_locks[locking.LEVEL_NODE] = 1
1389     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1390
1391   def CheckPrereq(self):
1392     """Check prerequisites.
1393
1394     """
1395
1396   @staticmethod
1397   def _DiagnoseByOS(node_list, rlist):
1398     """Remaps a per-node return list into an a per-os per-node dictionary
1399
1400     @param node_list: a list with the names of all nodes
1401     @param rlist: a map with node names as keys and OS objects as values
1402
1403     @rtype: dict
1404     @returns: a dictionary with osnames as keys and as value another map, with
1405         nodes as keys and list of OS objects as values, eg::
1406
1407           {"debian-etch": {"node1": [<object>,...],
1408                            "node2": [<object>,]}
1409           }
1410
1411     """
1412     all_os = {}
1413     for node_name, nr in rlist.iteritems():
1414       if not nr:
1415         continue
1416       for os_obj in nr:
1417         if os_obj.name not in all_os:
1418           # build a list of nodes for this os containing empty lists
1419           # for each node in node_list
1420           all_os[os_obj.name] = {}
1421           for nname in node_list:
1422             all_os[os_obj.name][nname] = []
1423         all_os[os_obj.name][node_name].append(os_obj)
1424     return all_os
1425
1426   def Exec(self, feedback_fn):
1427     """Compute the list of OSes.
1428
1429     """
1430     node_list = self.acquired_locks[locking.LEVEL_NODE]
1431     node_data = self.rpc.call_os_diagnose(node_list)
1432     if node_data == False:
1433       raise errors.OpExecError("Can't gather the list of OSes")
1434     pol = self._DiagnoseByOS(node_list, node_data)
1435     output = []
1436     for os_name, os_data in pol.iteritems():
1437       row = []
1438       for field in self.op.output_fields:
1439         if field == "name":
1440           val = os_name
1441         elif field == "valid":
1442           val = utils.all([osl and osl[0] for osl in os_data.values()])
1443         elif field == "node_status":
1444           val = {}
1445           for node_name, nos_list in os_data.iteritems():
1446             val[node_name] = [(v.status, v.path) for v in nos_list]
1447         else:
1448           raise errors.ParameterError(field)
1449         row.append(val)
1450       output.append(row)
1451
1452     return output
1453
1454
1455 class LURemoveNode(LogicalUnit):
1456   """Logical unit for removing a node.
1457
1458   """
1459   HPATH = "node-remove"
1460   HTYPE = constants.HTYPE_NODE
1461   _OP_REQP = ["node_name"]
1462
1463   def BuildHooksEnv(self):
1464     """Build hooks env.
1465
1466     This doesn't run on the target node in the pre phase as a failed
1467     node would then be impossible to remove.
1468
1469     """
1470     env = {
1471       "OP_TARGET": self.op.node_name,
1472       "NODE_NAME": self.op.node_name,
1473       }
1474     all_nodes = self.cfg.GetNodeList()
1475     all_nodes.remove(self.op.node_name)
1476     return env, all_nodes, all_nodes
1477
1478   def CheckPrereq(self):
1479     """Check prerequisites.
1480
1481     This checks:
1482      - the node exists in the configuration
1483      - it does not have primary or secondary instances
1484      - it's not the master
1485
1486     Any errors are signalled by raising errors.OpPrereqError.
1487
1488     """
1489     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1490     if node is None:
1491       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1492
1493     instance_list = self.cfg.GetInstanceList()
1494
1495     masternode = self.cfg.GetMasterNode()
1496     if node.name == masternode:
1497       raise errors.OpPrereqError("Node is the master node,"
1498                                  " you need to failover first.")
1499
1500     for instance_name in instance_list:
1501       instance = self.cfg.GetInstanceInfo(instance_name)
1502       if node.name == instance.primary_node:
1503         raise errors.OpPrereqError("Instance %s still running on the node,"
1504                                    " please remove first." % instance_name)
1505       if node.name in instance.secondary_nodes:
1506         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1507                                    " please remove first." % instance_name)
1508     self.op.node_name = node.name
1509     self.node = node
1510
1511   def Exec(self, feedback_fn):
1512     """Removes the node from the cluster.
1513
1514     """
1515     node = self.node
1516     logging.info("Stopping the node daemon and removing configs from node %s",
1517                  node.name)
1518
1519     self.context.RemoveNode(node.name)
1520
1521     self.rpc.call_node_leave_cluster(node.name)
1522
1523
1524 class LUQueryNodes(NoHooksLU):
1525   """Logical unit for querying nodes.
1526
1527   """
1528   _OP_REQP = ["output_fields", "names"]
1529   REQ_BGL = False
1530   _FIELDS_DYNAMIC = utils.FieldSet(
1531     "dtotal", "dfree",
1532     "mtotal", "mnode", "mfree",
1533     "bootid",
1534     "ctotal",
1535     )
1536
1537   _FIELDS_STATIC = utils.FieldSet(
1538     "name", "pinst_cnt", "sinst_cnt",
1539     "pinst_list", "sinst_list",
1540     "pip", "sip", "tags",
1541     "serial_no",
1542     )
1543
1544   def ExpandNames(self):
1545     _CheckOutputFields(static=self._FIELDS_STATIC,
1546                        dynamic=self._FIELDS_DYNAMIC,
1547                        selected=self.op.output_fields)
1548
1549     self.needed_locks = {}
1550     self.share_locks[locking.LEVEL_NODE] = 1
1551
1552     if self.op.names:
1553       self.wanted = _GetWantedNodes(self, self.op.names)
1554     else:
1555       self.wanted = locking.ALL_SET
1556
1557     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1558     if self.do_locking:
1559       # if we don't request only static fields, we need to lock the nodes
1560       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1561
1562
1563   def CheckPrereq(self):
1564     """Check prerequisites.
1565
1566     """
1567     # The validation of the node list is done in the _GetWantedNodes,
1568     # if non empty, and if empty, there's no validation to do
1569     pass
1570
1571   def Exec(self, feedback_fn):
1572     """Computes the list of nodes and their attributes.
1573
1574     """
1575     all_info = self.cfg.GetAllNodesInfo()
1576     if self.do_locking:
1577       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1578     elif self.wanted != locking.ALL_SET:
1579       nodenames = self.wanted
1580       missing = set(nodenames).difference(all_info.keys())
1581       if missing:
1582         raise errors.OpExecError(
1583           "Some nodes were removed before retrieving their data: %s" % missing)
1584     else:
1585       nodenames = all_info.keys()
1586
1587     nodenames = utils.NiceSort(nodenames)
1588     nodelist = [all_info[name] for name in nodenames]
1589
1590     # begin data gathering
1591
1592     if self.do_locking:
1593       live_data = {}
1594       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1595                                           self.cfg.GetHypervisorType())
1596       for name in nodenames:
1597         nodeinfo = node_data.get(name, None)
1598         if nodeinfo:
1599           live_data[name] = {
1600             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1601             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1602             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1603             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1604             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1605             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1606             "bootid": nodeinfo['bootid'],
1607             }
1608         else:
1609           live_data[name] = {}
1610     else:
1611       live_data = dict.fromkeys(nodenames, {})
1612
1613     node_to_primary = dict([(name, set()) for name in nodenames])
1614     node_to_secondary = dict([(name, set()) for name in nodenames])
1615
1616     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1617                              "sinst_cnt", "sinst_list"))
1618     if inst_fields & frozenset(self.op.output_fields):
1619       instancelist = self.cfg.GetInstanceList()
1620
1621       for instance_name in instancelist:
1622         inst = self.cfg.GetInstanceInfo(instance_name)
1623         if inst.primary_node in node_to_primary:
1624           node_to_primary[inst.primary_node].add(inst.name)
1625         for secnode in inst.secondary_nodes:
1626           if secnode in node_to_secondary:
1627             node_to_secondary[secnode].add(inst.name)
1628
1629     # end data gathering
1630
1631     output = []
1632     for node in nodelist:
1633       node_output = []
1634       for field in self.op.output_fields:
1635         if field == "name":
1636           val = node.name
1637         elif field == "pinst_list":
1638           val = list(node_to_primary[node.name])
1639         elif field == "sinst_list":
1640           val = list(node_to_secondary[node.name])
1641         elif field == "pinst_cnt":
1642           val = len(node_to_primary[node.name])
1643         elif field == "sinst_cnt":
1644           val = len(node_to_secondary[node.name])
1645         elif field == "pip":
1646           val = node.primary_ip
1647         elif field == "sip":
1648           val = node.secondary_ip
1649         elif field == "tags":
1650           val = list(node.GetTags())
1651         elif field == "serial_no":
1652           val = node.serial_no
1653         elif self._FIELDS_DYNAMIC.Matches(field):
1654           val = live_data[node.name].get(field, None)
1655         else:
1656           raise errors.ParameterError(field)
1657         node_output.append(val)
1658       output.append(node_output)
1659
1660     return output
1661
1662
1663 class LUQueryNodeVolumes(NoHooksLU):
1664   """Logical unit for getting volumes on node(s).
1665
1666   """
1667   _OP_REQP = ["nodes", "output_fields"]
1668   REQ_BGL = False
1669   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1670   _FIELDS_STATIC = utils.FieldSet("node")
1671
1672   def ExpandNames(self):
1673     _CheckOutputFields(static=self._FIELDS_STATIC,
1674                        dynamic=self._FIELDS_DYNAMIC,
1675                        selected=self.op.output_fields)
1676
1677     self.needed_locks = {}
1678     self.share_locks[locking.LEVEL_NODE] = 1
1679     if not self.op.nodes:
1680       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1681     else:
1682       self.needed_locks[locking.LEVEL_NODE] = \
1683         _GetWantedNodes(self, self.op.nodes)
1684
1685   def CheckPrereq(self):
1686     """Check prerequisites.
1687
1688     This checks that the fields required are valid output fields.
1689
1690     """
1691     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1692
1693   def Exec(self, feedback_fn):
1694     """Computes the list of nodes and their attributes.
1695
1696     """
1697     nodenames = self.nodes
1698     volumes = self.rpc.call_node_volumes(nodenames)
1699
1700     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1701              in self.cfg.GetInstanceList()]
1702
1703     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1704
1705     output = []
1706     for node in nodenames:
1707       if node not in volumes or not volumes[node]:
1708         continue
1709
1710       node_vols = volumes[node][:]
1711       node_vols.sort(key=lambda vol: vol['dev'])
1712
1713       for vol in node_vols:
1714         node_output = []
1715         for field in self.op.output_fields:
1716           if field == "node":
1717             val = node
1718           elif field == "phys":
1719             val = vol['dev']
1720           elif field == "vg":
1721             val = vol['vg']
1722           elif field == "name":
1723             val = vol['name']
1724           elif field == "size":
1725             val = int(float(vol['size']))
1726           elif field == "instance":
1727             for inst in ilist:
1728               if node not in lv_by_node[inst]:
1729                 continue
1730               if vol['name'] in lv_by_node[inst][node]:
1731                 val = inst.name
1732                 break
1733             else:
1734               val = '-'
1735           else:
1736             raise errors.ParameterError(field)
1737           node_output.append(str(val))
1738
1739         output.append(node_output)
1740
1741     return output
1742
1743
1744 class LUAddNode(LogicalUnit):
1745   """Logical unit for adding node to the cluster.
1746
1747   """
1748   HPATH = "node-add"
1749   HTYPE = constants.HTYPE_NODE
1750   _OP_REQP = ["node_name"]
1751
1752   def BuildHooksEnv(self):
1753     """Build hooks env.
1754
1755     This will run on all nodes before, and on all nodes + the new node after.
1756
1757     """
1758     env = {
1759       "OP_TARGET": self.op.node_name,
1760       "NODE_NAME": self.op.node_name,
1761       "NODE_PIP": self.op.primary_ip,
1762       "NODE_SIP": self.op.secondary_ip,
1763       }
1764     nodes_0 = self.cfg.GetNodeList()
1765     nodes_1 = nodes_0 + [self.op.node_name, ]
1766     return env, nodes_0, nodes_1
1767
1768   def CheckPrereq(self):
1769     """Check prerequisites.
1770
1771     This checks:
1772      - the new node is not already in the config
1773      - it is resolvable
1774      - its parameters (single/dual homed) matches the cluster
1775
1776     Any errors are signalled by raising errors.OpPrereqError.
1777
1778     """
1779     node_name = self.op.node_name
1780     cfg = self.cfg
1781
1782     dns_data = utils.HostInfo(node_name)
1783
1784     node = dns_data.name
1785     primary_ip = self.op.primary_ip = dns_data.ip
1786     secondary_ip = getattr(self.op, "secondary_ip", None)
1787     if secondary_ip is None:
1788       secondary_ip = primary_ip
1789     if not utils.IsValidIP(secondary_ip):
1790       raise errors.OpPrereqError("Invalid secondary IP given")
1791     self.op.secondary_ip = secondary_ip
1792
1793     node_list = cfg.GetNodeList()
1794     if not self.op.readd and node in node_list:
1795       raise errors.OpPrereqError("Node %s is already in the configuration" %
1796                                  node)
1797     elif self.op.readd and node not in node_list:
1798       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1799
1800     for existing_node_name in node_list:
1801       existing_node = cfg.GetNodeInfo(existing_node_name)
1802
1803       if self.op.readd and node == existing_node_name:
1804         if (existing_node.primary_ip != primary_ip or
1805             existing_node.secondary_ip != secondary_ip):
1806           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1807                                      " address configuration as before")
1808         continue
1809
1810       if (existing_node.primary_ip == primary_ip or
1811           existing_node.secondary_ip == primary_ip or
1812           existing_node.primary_ip == secondary_ip or
1813           existing_node.secondary_ip == secondary_ip):
1814         raise errors.OpPrereqError("New node ip address(es) conflict with"
1815                                    " existing node %s" % existing_node.name)
1816
1817     # check that the type of the node (single versus dual homed) is the
1818     # same as for the master
1819     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1820     master_singlehomed = myself.secondary_ip == myself.primary_ip
1821     newbie_singlehomed = secondary_ip == primary_ip
1822     if master_singlehomed != newbie_singlehomed:
1823       if master_singlehomed:
1824         raise errors.OpPrereqError("The master has no private ip but the"
1825                                    " new node has one")
1826       else:
1827         raise errors.OpPrereqError("The master has a private ip but the"
1828                                    " new node doesn't have one")
1829
1830     # checks reachablity
1831     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1832       raise errors.OpPrereqError("Node not reachable by ping")
1833
1834     if not newbie_singlehomed:
1835       # check reachability from my secondary ip to newbie's secondary ip
1836       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1837                            source=myself.secondary_ip):
1838         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1839                                    " based ping to noded port")
1840
1841     self.new_node = objects.Node(name=node,
1842                                  primary_ip=primary_ip,
1843                                  secondary_ip=secondary_ip)
1844
1845   def Exec(self, feedback_fn):
1846     """Adds the new node to the cluster.
1847
1848     """
1849     new_node = self.new_node
1850     node = new_node.name
1851
1852     # check connectivity
1853     result = self.rpc.call_version([node])[node]
1854     if result:
1855       if constants.PROTOCOL_VERSION == result:
1856         logging.info("Communication to node %s fine, sw version %s match",
1857                      node, result)
1858       else:
1859         raise errors.OpExecError("Version mismatch master version %s,"
1860                                  " node version %s" %
1861                                  (constants.PROTOCOL_VERSION, result))
1862     else:
1863       raise errors.OpExecError("Cannot get version from the new node")
1864
1865     # setup ssh on node
1866     logging.info("Copy ssh key to node %s", node)
1867     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1868     keyarray = []
1869     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1870                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1871                 priv_key, pub_key]
1872
1873     for i in keyfiles:
1874       f = open(i, 'r')
1875       try:
1876         keyarray.append(f.read())
1877       finally:
1878         f.close()
1879
1880     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1881                                     keyarray[2],
1882                                     keyarray[3], keyarray[4], keyarray[5])
1883
1884     if not result:
1885       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1886
1887     # Add node to our /etc/hosts, and add key to known_hosts
1888     utils.AddHostToEtcHosts(new_node.name)
1889
1890     if new_node.secondary_ip != new_node.primary_ip:
1891       if not self.rpc.call_node_has_ip_address(new_node.name,
1892                                                new_node.secondary_ip):
1893         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1894                                  " you gave (%s). Please fix and re-run this"
1895                                  " command." % new_node.secondary_ip)
1896
1897     node_verify_list = [self.cfg.GetMasterNode()]
1898     node_verify_param = {
1899       'nodelist': [node],
1900       # TODO: do a node-net-test as well?
1901     }
1902
1903     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1904                                        self.cfg.GetClusterName())
1905     for verifier in node_verify_list:
1906       if not result[verifier]:
1907         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1908                                  " for remote verification" % verifier)
1909       if result[verifier]['nodelist']:
1910         for failed in result[verifier]['nodelist']:
1911           feedback_fn("ssh/hostname verification failed %s -> %s" %
1912                       (verifier, result[verifier]['nodelist'][failed]))
1913         raise errors.OpExecError("ssh/hostname verification failed.")
1914
1915     # Distribute updated /etc/hosts and known_hosts to all nodes,
1916     # including the node just added
1917     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1918     dist_nodes = self.cfg.GetNodeList()
1919     if not self.op.readd:
1920       dist_nodes.append(node)
1921     if myself.name in dist_nodes:
1922       dist_nodes.remove(myself.name)
1923
1924     logging.debug("Copying hosts and known_hosts to all nodes")
1925     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1926       result = self.rpc.call_upload_file(dist_nodes, fname)
1927       for to_node in dist_nodes:
1928         if not result[to_node]:
1929           logging.error("Copy of file %s to node %s failed", fname, to_node)
1930
1931     to_copy = []
1932     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1933       to_copy.append(constants.VNC_PASSWORD_FILE)
1934     for fname in to_copy:
1935       result = self.rpc.call_upload_file([node], fname)
1936       if not result[node]:
1937         logging.error("Could not copy file %s to node %s", fname, node)
1938
1939     if self.op.readd:
1940       self.context.ReaddNode(new_node)
1941     else:
1942       self.context.AddNode(new_node)
1943
1944
1945 class LUQueryClusterInfo(NoHooksLU):
1946   """Query cluster configuration.
1947
1948   """
1949   _OP_REQP = []
1950   REQ_MASTER = False
1951   REQ_BGL = False
1952
1953   def ExpandNames(self):
1954     self.needed_locks = {}
1955
1956   def CheckPrereq(self):
1957     """No prerequsites needed for this LU.
1958
1959     """
1960     pass
1961
1962   def Exec(self, feedback_fn):
1963     """Return cluster config.
1964
1965     """
1966     cluster = self.cfg.GetClusterInfo()
1967     result = {
1968       "software_version": constants.RELEASE_VERSION,
1969       "protocol_version": constants.PROTOCOL_VERSION,
1970       "config_version": constants.CONFIG_VERSION,
1971       "os_api_version": constants.OS_API_VERSION,
1972       "export_version": constants.EXPORT_VERSION,
1973       "architecture": (platform.architecture()[0], platform.machine()),
1974       "name": cluster.cluster_name,
1975       "master": cluster.master_node,
1976       "default_hypervisor": cluster.default_hypervisor,
1977       "enabled_hypervisors": cluster.enabled_hypervisors,
1978       "hvparams": cluster.hvparams,
1979       "beparams": cluster.beparams,
1980       }
1981
1982     return result
1983
1984
1985 class LUQueryConfigValues(NoHooksLU):
1986   """Return configuration values.
1987
1988   """
1989   _OP_REQP = []
1990   REQ_BGL = False
1991   _FIELDS_DYNAMIC = utils.FieldSet()
1992   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
1993
1994   def ExpandNames(self):
1995     self.needed_locks = {}
1996
1997     _CheckOutputFields(static=self._FIELDS_STATIC,
1998                        dynamic=self._FIELDS_DYNAMIC,
1999                        selected=self.op.output_fields)
2000
2001   def CheckPrereq(self):
2002     """No prerequisites.
2003
2004     """
2005     pass
2006
2007   def Exec(self, feedback_fn):
2008     """Dump a representation of the cluster config to the standard output.
2009
2010     """
2011     values = []
2012     for field in self.op.output_fields:
2013       if field == "cluster_name":
2014         entry = self.cfg.GetClusterName()
2015       elif field == "master_node":
2016         entry = self.cfg.GetMasterNode()
2017       elif field == "drain_flag":
2018         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2019       else:
2020         raise errors.ParameterError(field)
2021       values.append(entry)
2022     return values
2023
2024
2025 class LUActivateInstanceDisks(NoHooksLU):
2026   """Bring up an instance's disks.
2027
2028   """
2029   _OP_REQP = ["instance_name"]
2030   REQ_BGL = False
2031
2032   def ExpandNames(self):
2033     self._ExpandAndLockInstance()
2034     self.needed_locks[locking.LEVEL_NODE] = []
2035     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2036
2037   def DeclareLocks(self, level):
2038     if level == locking.LEVEL_NODE:
2039       self._LockInstancesNodes()
2040
2041   def CheckPrereq(self):
2042     """Check prerequisites.
2043
2044     This checks that the instance is in the cluster.
2045
2046     """
2047     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2048     assert self.instance is not None, \
2049       "Cannot retrieve locked instance %s" % self.op.instance_name
2050
2051   def Exec(self, feedback_fn):
2052     """Activate the disks.
2053
2054     """
2055     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2056     if not disks_ok:
2057       raise errors.OpExecError("Cannot activate block devices")
2058
2059     return disks_info
2060
2061
2062 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2063   """Prepare the block devices for an instance.
2064
2065   This sets up the block devices on all nodes.
2066
2067   @type lu: L{LogicalUnit}
2068   @param lu: the logical unit on whose behalf we execute
2069   @type instance: L{objects.Instance}
2070   @param instance: the instance for whose disks we assemble
2071   @type ignore_secondaries: boolean
2072   @param ignore_secondaries: if true, errors on secondary nodes
2073       won't result in an error return from the function
2074   @return: False if the operation failed, otherwise a list of
2075       (host, instance_visible_name, node_visible_name)
2076       with the mapping from node devices to instance devices
2077
2078   """
2079   device_info = []
2080   disks_ok = True
2081   iname = instance.name
2082   # With the two passes mechanism we try to reduce the window of
2083   # opportunity for the race condition of switching DRBD to primary
2084   # before handshaking occured, but we do not eliminate it
2085
2086   # The proper fix would be to wait (with some limits) until the
2087   # connection has been made and drbd transitions from WFConnection
2088   # into any other network-connected state (Connected, SyncTarget,
2089   # SyncSource, etc.)
2090
2091   # 1st pass, assemble on all nodes in secondary mode
2092   for inst_disk in instance.disks:
2093     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2094       lu.cfg.SetDiskID(node_disk, node)
2095       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2096       if not result:
2097         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2098                            " (is_primary=False, pass=1)",
2099                            inst_disk.iv_name, node)
2100         if not ignore_secondaries:
2101           disks_ok = False
2102
2103   # FIXME: race condition on drbd migration to primary
2104
2105   # 2nd pass, do only the primary node
2106   for inst_disk in instance.disks:
2107     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2108       if node != instance.primary_node:
2109         continue
2110       lu.cfg.SetDiskID(node_disk, node)
2111       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2112       if not result:
2113         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2114                            " (is_primary=True, pass=2)",
2115                            inst_disk.iv_name, node)
2116         disks_ok = False
2117     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2118
2119   # leave the disks configured for the primary node
2120   # this is a workaround that would be fixed better by
2121   # improving the logical/physical id handling
2122   for disk in instance.disks:
2123     lu.cfg.SetDiskID(disk, instance.primary_node)
2124
2125   return disks_ok, device_info
2126
2127
2128 def _StartInstanceDisks(lu, instance, force):
2129   """Start the disks of an instance.
2130
2131   """
2132   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2133                                            ignore_secondaries=force)
2134   if not disks_ok:
2135     _ShutdownInstanceDisks(lu, instance)
2136     if force is not None and not force:
2137       lu.proc.LogWarning("", hint="If the message above refers to a"
2138                          " secondary node,"
2139                          " you can retry the operation using '--force'.")
2140     raise errors.OpExecError("Disk consistency error")
2141
2142
2143 class LUDeactivateInstanceDisks(NoHooksLU):
2144   """Shutdown an instance's disks.
2145
2146   """
2147   _OP_REQP = ["instance_name"]
2148   REQ_BGL = False
2149
2150   def ExpandNames(self):
2151     self._ExpandAndLockInstance()
2152     self.needed_locks[locking.LEVEL_NODE] = []
2153     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2154
2155   def DeclareLocks(self, level):
2156     if level == locking.LEVEL_NODE:
2157       self._LockInstancesNodes()
2158
2159   def CheckPrereq(self):
2160     """Check prerequisites.
2161
2162     This checks that the instance is in the cluster.
2163
2164     """
2165     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2166     assert self.instance is not None, \
2167       "Cannot retrieve locked instance %s" % self.op.instance_name
2168
2169   def Exec(self, feedback_fn):
2170     """Deactivate the disks
2171
2172     """
2173     instance = self.instance
2174     _SafeShutdownInstanceDisks(self, instance)
2175
2176
2177 def _SafeShutdownInstanceDisks(lu, instance):
2178   """Shutdown block devices of an instance.
2179
2180   This function checks if an instance is running, before calling
2181   _ShutdownInstanceDisks.
2182
2183   """
2184   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2185                                       [instance.hypervisor])
2186   ins_l = ins_l[instance.primary_node]
2187   if not type(ins_l) is list:
2188     raise errors.OpExecError("Can't contact node '%s'" %
2189                              instance.primary_node)
2190
2191   if instance.name in ins_l:
2192     raise errors.OpExecError("Instance is running, can't shutdown"
2193                              " block devices.")
2194
2195   _ShutdownInstanceDisks(lu, instance)
2196
2197
2198 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2199   """Shutdown block devices of an instance.
2200
2201   This does the shutdown on all nodes of the instance.
2202
2203   If the ignore_primary is false, errors on the primary node are
2204   ignored.
2205
2206   """
2207   result = True
2208   for disk in instance.disks:
2209     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2210       lu.cfg.SetDiskID(top_disk, node)
2211       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2212         logging.error("Could not shutdown block device %s on node %s",
2213                       disk.iv_name, node)
2214         if not ignore_primary or node != instance.primary_node:
2215           result = False
2216   return result
2217
2218
2219 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2220   """Checks if a node has enough free memory.
2221
2222   This function check if a given node has the needed amount of free
2223   memory. In case the node has less memory or we cannot get the
2224   information from the node, this function raise an OpPrereqError
2225   exception.
2226
2227   @type lu: C{LogicalUnit}
2228   @param lu: a logical unit from which we get configuration data
2229   @type node: C{str}
2230   @param node: the node to check
2231   @type reason: C{str}
2232   @param reason: string to use in the error message
2233   @type requested: C{int}
2234   @param requested: the amount of memory in MiB to check for
2235   @type hypervisor: C{str}
2236   @param hypervisor: the hypervisor to ask for memory stats
2237   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2238       we cannot check the node
2239
2240   """
2241   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2242   if not nodeinfo or not isinstance(nodeinfo, dict):
2243     raise errors.OpPrereqError("Could not contact node %s for resource"
2244                              " information" % (node,))
2245
2246   free_mem = nodeinfo[node].get('memory_free')
2247   if not isinstance(free_mem, int):
2248     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2249                              " was '%s'" % (node, free_mem))
2250   if requested > free_mem:
2251     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2252                              " needed %s MiB, available %s MiB" %
2253                              (node, reason, requested, free_mem))
2254
2255
2256 class LUStartupInstance(LogicalUnit):
2257   """Starts an instance.
2258
2259   """
2260   HPATH = "instance-start"
2261   HTYPE = constants.HTYPE_INSTANCE
2262   _OP_REQP = ["instance_name", "force"]
2263   REQ_BGL = False
2264
2265   def ExpandNames(self):
2266     self._ExpandAndLockInstance()
2267
2268   def BuildHooksEnv(self):
2269     """Build hooks env.
2270
2271     This runs on master, primary and secondary nodes of the instance.
2272
2273     """
2274     env = {
2275       "FORCE": self.op.force,
2276       }
2277     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2278     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2279           list(self.instance.secondary_nodes))
2280     return env, nl, nl
2281
2282   def CheckPrereq(self):
2283     """Check prerequisites.
2284
2285     This checks that the instance is in the cluster.
2286
2287     """
2288     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2289     assert self.instance is not None, \
2290       "Cannot retrieve locked instance %s" % self.op.instance_name
2291
2292     bep = self.cfg.GetClusterInfo().FillBE(instance)
2293     # check bridges existance
2294     _CheckInstanceBridgesExist(self, instance)
2295
2296     _CheckNodeFreeMemory(self, instance.primary_node,
2297                          "starting instance %s" % instance.name,
2298                          bep[constants.BE_MEMORY], instance.hypervisor)
2299
2300   def Exec(self, feedback_fn):
2301     """Start the instance.
2302
2303     """
2304     instance = self.instance
2305     force = self.op.force
2306     extra_args = getattr(self.op, "extra_args", "")
2307
2308     self.cfg.MarkInstanceUp(instance.name)
2309
2310     node_current = instance.primary_node
2311
2312     _StartInstanceDisks(self, instance, force)
2313
2314     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2315       _ShutdownInstanceDisks(self, instance)
2316       raise errors.OpExecError("Could not start instance")
2317
2318
2319 class LURebootInstance(LogicalUnit):
2320   """Reboot an instance.
2321
2322   """
2323   HPATH = "instance-reboot"
2324   HTYPE = constants.HTYPE_INSTANCE
2325   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2326   REQ_BGL = False
2327
2328   def ExpandNames(self):
2329     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2330                                    constants.INSTANCE_REBOOT_HARD,
2331                                    constants.INSTANCE_REBOOT_FULL]:
2332       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2333                                   (constants.INSTANCE_REBOOT_SOFT,
2334                                    constants.INSTANCE_REBOOT_HARD,
2335                                    constants.INSTANCE_REBOOT_FULL))
2336     self._ExpandAndLockInstance()
2337
2338   def BuildHooksEnv(self):
2339     """Build hooks env.
2340
2341     This runs on master, primary and secondary nodes of the instance.
2342
2343     """
2344     env = {
2345       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2346       }
2347     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2348     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2349           list(self.instance.secondary_nodes))
2350     return env, nl, nl
2351
2352   def CheckPrereq(self):
2353     """Check prerequisites.
2354
2355     This checks that the instance is in the cluster.
2356
2357     """
2358     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2359     assert self.instance is not None, \
2360       "Cannot retrieve locked instance %s" % self.op.instance_name
2361
2362     # check bridges existance
2363     _CheckInstanceBridgesExist(self, instance)
2364
2365   def Exec(self, feedback_fn):
2366     """Reboot the instance.
2367
2368     """
2369     instance = self.instance
2370     ignore_secondaries = self.op.ignore_secondaries
2371     reboot_type = self.op.reboot_type
2372     extra_args = getattr(self.op, "extra_args", "")
2373
2374     node_current = instance.primary_node
2375
2376     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2377                        constants.INSTANCE_REBOOT_HARD]:
2378       if not self.rpc.call_instance_reboot(node_current, instance,
2379                                            reboot_type, extra_args):
2380         raise errors.OpExecError("Could not reboot instance")
2381     else:
2382       if not self.rpc.call_instance_shutdown(node_current, instance):
2383         raise errors.OpExecError("could not shutdown instance for full reboot")
2384       _ShutdownInstanceDisks(self, instance)
2385       _StartInstanceDisks(self, instance, ignore_secondaries)
2386       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2387         _ShutdownInstanceDisks(self, instance)
2388         raise errors.OpExecError("Could not start instance for full reboot")
2389
2390     self.cfg.MarkInstanceUp(instance.name)
2391
2392
2393 class LUShutdownInstance(LogicalUnit):
2394   """Shutdown an instance.
2395
2396   """
2397   HPATH = "instance-stop"
2398   HTYPE = constants.HTYPE_INSTANCE
2399   _OP_REQP = ["instance_name"]
2400   REQ_BGL = False
2401
2402   def ExpandNames(self):
2403     self._ExpandAndLockInstance()
2404
2405   def BuildHooksEnv(self):
2406     """Build hooks env.
2407
2408     This runs on master, primary and secondary nodes of the instance.
2409
2410     """
2411     env = _BuildInstanceHookEnvByObject(self, self.instance)
2412     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2413           list(self.instance.secondary_nodes))
2414     return env, nl, nl
2415
2416   def CheckPrereq(self):
2417     """Check prerequisites.
2418
2419     This checks that the instance is in the cluster.
2420
2421     """
2422     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2423     assert self.instance is not None, \
2424       "Cannot retrieve locked instance %s" % self.op.instance_name
2425
2426   def Exec(self, feedback_fn):
2427     """Shutdown the instance.
2428
2429     """
2430     instance = self.instance
2431     node_current = instance.primary_node
2432     self.cfg.MarkInstanceDown(instance.name)
2433     if not self.rpc.call_instance_shutdown(node_current, instance):
2434       self.proc.LogWarning("Could not shutdown instance")
2435
2436     _ShutdownInstanceDisks(self, instance)
2437
2438
2439 class LUReinstallInstance(LogicalUnit):
2440   """Reinstall an instance.
2441
2442   """
2443   HPATH = "instance-reinstall"
2444   HTYPE = constants.HTYPE_INSTANCE
2445   _OP_REQP = ["instance_name"]
2446   REQ_BGL = False
2447
2448   def ExpandNames(self):
2449     self._ExpandAndLockInstance()
2450
2451   def BuildHooksEnv(self):
2452     """Build hooks env.
2453
2454     This runs on master, primary and secondary nodes of the instance.
2455
2456     """
2457     env = _BuildInstanceHookEnvByObject(self, self.instance)
2458     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2459           list(self.instance.secondary_nodes))
2460     return env, nl, nl
2461
2462   def CheckPrereq(self):
2463     """Check prerequisites.
2464
2465     This checks that the instance is in the cluster and is not running.
2466
2467     """
2468     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2469     assert instance is not None, \
2470       "Cannot retrieve locked instance %s" % self.op.instance_name
2471
2472     if instance.disk_template == constants.DT_DISKLESS:
2473       raise errors.OpPrereqError("Instance '%s' has no disks" %
2474                                  self.op.instance_name)
2475     if instance.status != "down":
2476       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2477                                  self.op.instance_name)
2478     remote_info = self.rpc.call_instance_info(instance.primary_node,
2479                                               instance.name,
2480                                               instance.hypervisor)
2481     if remote_info:
2482       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2483                                  (self.op.instance_name,
2484                                   instance.primary_node))
2485
2486     self.op.os_type = getattr(self.op, "os_type", None)
2487     if self.op.os_type is not None:
2488       # OS verification
2489       pnode = self.cfg.GetNodeInfo(
2490         self.cfg.ExpandNodeName(instance.primary_node))
2491       if pnode is None:
2492         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2493                                    self.op.pnode)
2494       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2495       if not os_obj:
2496         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2497                                    " primary node"  % self.op.os_type)
2498
2499     self.instance = instance
2500
2501   def Exec(self, feedback_fn):
2502     """Reinstall the instance.
2503
2504     """
2505     inst = self.instance
2506
2507     if self.op.os_type is not None:
2508       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2509       inst.os = self.op.os_type
2510       self.cfg.Update(inst)
2511
2512     _StartInstanceDisks(self, inst, None)
2513     try:
2514       feedback_fn("Running the instance OS create scripts...")
2515       if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2516         raise errors.OpExecError("Could not install OS for instance %s"
2517                                  " on node %s" %
2518                                  (inst.name, inst.primary_node))
2519     finally:
2520       _ShutdownInstanceDisks(self, inst)
2521
2522
2523 class LURenameInstance(LogicalUnit):
2524   """Rename an instance.
2525
2526   """
2527   HPATH = "instance-rename"
2528   HTYPE = constants.HTYPE_INSTANCE
2529   _OP_REQP = ["instance_name", "new_name"]
2530
2531   def BuildHooksEnv(self):
2532     """Build hooks env.
2533
2534     This runs on master, primary and secondary nodes of the instance.
2535
2536     """
2537     env = _BuildInstanceHookEnvByObject(self, self.instance)
2538     env["INSTANCE_NEW_NAME"] = self.op.new_name
2539     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2540           list(self.instance.secondary_nodes))
2541     return env, nl, nl
2542
2543   def CheckPrereq(self):
2544     """Check prerequisites.
2545
2546     This checks that the instance is in the cluster and is not running.
2547
2548     """
2549     instance = self.cfg.GetInstanceInfo(
2550       self.cfg.ExpandInstanceName(self.op.instance_name))
2551     if instance is None:
2552       raise errors.OpPrereqError("Instance '%s' not known" %
2553                                  self.op.instance_name)
2554     if instance.status != "down":
2555       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2556                                  self.op.instance_name)
2557     remote_info = self.rpc.call_instance_info(instance.primary_node,
2558                                               instance.name,
2559                                               instance.hypervisor)
2560     if remote_info:
2561       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2562                                  (self.op.instance_name,
2563                                   instance.primary_node))
2564     self.instance = instance
2565
2566     # new name verification
2567     name_info = utils.HostInfo(self.op.new_name)
2568
2569     self.op.new_name = new_name = name_info.name
2570     instance_list = self.cfg.GetInstanceList()
2571     if new_name in instance_list:
2572       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2573                                  new_name)
2574
2575     if not getattr(self.op, "ignore_ip", False):
2576       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2577         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2578                                    (name_info.ip, new_name))
2579
2580
2581   def Exec(self, feedback_fn):
2582     """Reinstall the instance.
2583
2584     """
2585     inst = self.instance
2586     old_name = inst.name
2587
2588     if inst.disk_template == constants.DT_FILE:
2589       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2590
2591     self.cfg.RenameInstance(inst.name, self.op.new_name)
2592     # Change the instance lock. This is definitely safe while we hold the BGL
2593     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2594     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2595
2596     # re-read the instance from the configuration after rename
2597     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2598
2599     if inst.disk_template == constants.DT_FILE:
2600       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2601       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2602                                                      old_file_storage_dir,
2603                                                      new_file_storage_dir)
2604
2605       if not result:
2606         raise errors.OpExecError("Could not connect to node '%s' to rename"
2607                                  " directory '%s' to '%s' (but the instance"
2608                                  " has been renamed in Ganeti)" % (
2609                                  inst.primary_node, old_file_storage_dir,
2610                                  new_file_storage_dir))
2611
2612       if not result[0]:
2613         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2614                                  " (but the instance has been renamed in"
2615                                  " Ganeti)" % (old_file_storage_dir,
2616                                                new_file_storage_dir))
2617
2618     _StartInstanceDisks(self, inst, None)
2619     try:
2620       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2621                                                old_name):
2622         msg = ("Could not run OS rename script for instance %s on node %s"
2623                " (but the instance has been renamed in Ganeti)" %
2624                (inst.name, inst.primary_node))
2625         self.proc.LogWarning(msg)
2626     finally:
2627       _ShutdownInstanceDisks(self, inst)
2628
2629
2630 class LURemoveInstance(LogicalUnit):
2631   """Remove an instance.
2632
2633   """
2634   HPATH = "instance-remove"
2635   HTYPE = constants.HTYPE_INSTANCE
2636   _OP_REQP = ["instance_name", "ignore_failures"]
2637   REQ_BGL = False
2638
2639   def ExpandNames(self):
2640     self._ExpandAndLockInstance()
2641     self.needed_locks[locking.LEVEL_NODE] = []
2642     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2643
2644   def DeclareLocks(self, level):
2645     if level == locking.LEVEL_NODE:
2646       self._LockInstancesNodes()
2647
2648   def BuildHooksEnv(self):
2649     """Build hooks env.
2650
2651     This runs on master, primary and secondary nodes of the instance.
2652
2653     """
2654     env = _BuildInstanceHookEnvByObject(self, self.instance)
2655     nl = [self.cfg.GetMasterNode()]
2656     return env, nl, nl
2657
2658   def CheckPrereq(self):
2659     """Check prerequisites.
2660
2661     This checks that the instance is in the cluster.
2662
2663     """
2664     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2665     assert self.instance is not None, \
2666       "Cannot retrieve locked instance %s" % self.op.instance_name
2667
2668   def Exec(self, feedback_fn):
2669     """Remove the instance.
2670
2671     """
2672     instance = self.instance
2673     logging.info("Shutting down instance %s on node %s",
2674                  instance.name, instance.primary_node)
2675
2676     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2677       if self.op.ignore_failures:
2678         feedback_fn("Warning: can't shutdown instance")
2679       else:
2680         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2681                                  (instance.name, instance.primary_node))
2682
2683     logging.info("Removing block devices for instance %s", instance.name)
2684
2685     if not _RemoveDisks(self, instance):
2686       if self.op.ignore_failures:
2687         feedback_fn("Warning: can't remove instance's disks")
2688       else:
2689         raise errors.OpExecError("Can't remove instance's disks")
2690
2691     logging.info("Removing instance %s out of cluster config", instance.name)
2692
2693     self.cfg.RemoveInstance(instance.name)
2694     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2695
2696
2697 class LUQueryInstances(NoHooksLU):
2698   """Logical unit for querying instances.
2699
2700   """
2701   _OP_REQP = ["output_fields", "names"]
2702   REQ_BGL = False
2703   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2704                                     "admin_state", "admin_ram",
2705                                     "disk_template", "ip", "mac", "bridge",
2706                                     "sda_size", "sdb_size", "vcpus", "tags",
2707                                     "network_port", "beparams",
2708                                     "(disk).(size)/([0-9]+)",
2709                                     "(disk).(sizes)",
2710                                     "(nic).(mac|ip|bridge)/([0-9]+)",
2711                                     "(nic).(macs|ips|bridges)",
2712                                     "(disk|nic).(count)",
2713                                     "serial_no", "hypervisor", "hvparams",] +
2714                                   ["hv/%s" % name
2715                                    for name in constants.HVS_PARAMETERS] +
2716                                   ["be/%s" % name
2717                                    for name in constants.BES_PARAMETERS])
2718   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2719
2720
2721   def ExpandNames(self):
2722     _CheckOutputFields(static=self._FIELDS_STATIC,
2723                        dynamic=self._FIELDS_DYNAMIC,
2724                        selected=self.op.output_fields)
2725
2726     self.needed_locks = {}
2727     self.share_locks[locking.LEVEL_INSTANCE] = 1
2728     self.share_locks[locking.LEVEL_NODE] = 1
2729
2730     if self.op.names:
2731       self.wanted = _GetWantedInstances(self, self.op.names)
2732     else:
2733       self.wanted = locking.ALL_SET
2734
2735     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2736     if self.do_locking:
2737       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2738       self.needed_locks[locking.LEVEL_NODE] = []
2739       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2740
2741   def DeclareLocks(self, level):
2742     if level == locking.LEVEL_NODE and self.do_locking:
2743       self._LockInstancesNodes()
2744
2745   def CheckPrereq(self):
2746     """Check prerequisites.
2747
2748     """
2749     pass
2750
2751   def Exec(self, feedback_fn):
2752     """Computes the list of nodes and their attributes.
2753
2754     """
2755     all_info = self.cfg.GetAllInstancesInfo()
2756     if self.do_locking:
2757       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2758     elif self.wanted != locking.ALL_SET:
2759       instance_names = self.wanted
2760       missing = set(instance_names).difference(all_info.keys())
2761       if missing:
2762         raise errors.OpExecError(
2763           "Some instances were removed before retrieving their data: %s"
2764           % missing)
2765     else:
2766       instance_names = all_info.keys()
2767
2768     instance_names = utils.NiceSort(instance_names)
2769     instance_list = [all_info[iname] for iname in instance_names]
2770
2771     # begin data gathering
2772
2773     nodes = frozenset([inst.primary_node for inst in instance_list])
2774     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2775
2776     bad_nodes = []
2777     if self.do_locking:
2778       live_data = {}
2779       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2780       for name in nodes:
2781         result = node_data[name]
2782         if result:
2783           live_data.update(result)
2784         elif result == False:
2785           bad_nodes.append(name)
2786         # else no instance is alive
2787     else:
2788       live_data = dict([(name, {}) for name in instance_names])
2789
2790     # end data gathering
2791
2792     HVPREFIX = "hv/"
2793     BEPREFIX = "be/"
2794     output = []
2795     for instance in instance_list:
2796       iout = []
2797       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2798       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2799       for field in self.op.output_fields:
2800         st_match = self._FIELDS_STATIC.Matches(field)
2801         if field == "name":
2802           val = instance.name
2803         elif field == "os":
2804           val = instance.os
2805         elif field == "pnode":
2806           val = instance.primary_node
2807         elif field == "snodes":
2808           val = list(instance.secondary_nodes)
2809         elif field == "admin_state":
2810           val = (instance.status != "down")
2811         elif field == "oper_state":
2812           if instance.primary_node in bad_nodes:
2813             val = None
2814           else:
2815             val = bool(live_data.get(instance.name))
2816         elif field == "status":
2817           if instance.primary_node in bad_nodes:
2818             val = "ERROR_nodedown"
2819           else:
2820             running = bool(live_data.get(instance.name))
2821             if running:
2822               if instance.status != "down":
2823                 val = "running"
2824               else:
2825                 val = "ERROR_up"
2826             else:
2827               if instance.status != "down":
2828                 val = "ERROR_down"
2829               else:
2830                 val = "ADMIN_down"
2831         elif field == "oper_ram":
2832           if instance.primary_node in bad_nodes:
2833             val = None
2834           elif instance.name in live_data:
2835             val = live_data[instance.name].get("memory", "?")
2836           else:
2837             val = "-"
2838         elif field == "disk_template":
2839           val = instance.disk_template
2840         elif field == "ip":
2841           val = instance.nics[0].ip
2842         elif field == "bridge":
2843           val = instance.nics[0].bridge
2844         elif field == "mac":
2845           val = instance.nics[0].mac
2846         elif field == "sda_size" or field == "sdb_size":
2847           idx = ord(field[2]) - ord('a')
2848           try:
2849             val = instance.FindDisk(idx).size
2850           except errors.OpPrereqError:
2851             val = None
2852         elif field == "tags":
2853           val = list(instance.GetTags())
2854         elif field == "serial_no":
2855           val = instance.serial_no
2856         elif field == "network_port":
2857           val = instance.network_port
2858         elif field == "hypervisor":
2859           val = instance.hypervisor
2860         elif field == "hvparams":
2861           val = i_hv
2862         elif (field.startswith(HVPREFIX) and
2863               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2864           val = i_hv.get(field[len(HVPREFIX):], None)
2865         elif field == "beparams":
2866           val = i_be
2867         elif (field.startswith(BEPREFIX) and
2868               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2869           val = i_be.get(field[len(BEPREFIX):], None)
2870         elif st_match and st_match.groups():
2871           # matches a variable list
2872           st_groups = st_match.groups()
2873           if st_groups and st_groups[0] == "disk":
2874             if st_groups[1] == "count":
2875               val = len(instance.disks)
2876             elif st_groups[1] == "sizes":
2877               val = [disk.size for disk in instance.disks]
2878             elif st_groups[1] == "size":
2879               try:
2880                 val = instance.FindDisk(st_groups[2]).size
2881               except errors.OpPrereqError:
2882                 val = None
2883             else:
2884               assert False, "Unhandled disk parameter"
2885           elif st_groups[0] == "nic":
2886             if st_groups[1] == "count":
2887               val = len(instance.nics)
2888             elif st_groups[1] == "macs":
2889               val = [nic.mac for nic in instance.nics]
2890             elif st_groups[1] == "ips":
2891               val = [nic.ip for nic in instance.nics]
2892             elif st_groups[1] == "bridges":
2893               val = [nic.bridge for nic in instance.nics]
2894             else:
2895               # index-based item
2896               nic_idx = int(st_groups[2])
2897               if nic_idx >= len(instance.nics):
2898                 val = None
2899               else:
2900                 if st_groups[1] == "mac":
2901                   val = instance.nics[nic_idx].mac
2902                 elif st_groups[1] == "ip":
2903                   val = instance.nics[nic_idx].ip
2904                 elif st_groups[1] == "bridge":
2905                   val = instance.nics[nic_idx].bridge
2906                 else:
2907                   assert False, "Unhandled NIC parameter"
2908           else:
2909             assert False, "Unhandled variable parameter"
2910         else:
2911           raise errors.ParameterError(field)
2912         iout.append(val)
2913       output.append(iout)
2914
2915     return output
2916
2917
2918 class LUFailoverInstance(LogicalUnit):
2919   """Failover an instance.
2920
2921   """
2922   HPATH = "instance-failover"
2923   HTYPE = constants.HTYPE_INSTANCE
2924   _OP_REQP = ["instance_name", "ignore_consistency"]
2925   REQ_BGL = False
2926
2927   def ExpandNames(self):
2928     self._ExpandAndLockInstance()
2929     self.needed_locks[locking.LEVEL_NODE] = []
2930     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2931
2932   def DeclareLocks(self, level):
2933     if level == locking.LEVEL_NODE:
2934       self._LockInstancesNodes()
2935
2936   def BuildHooksEnv(self):
2937     """Build hooks env.
2938
2939     This runs on master, primary and secondary nodes of the instance.
2940
2941     """
2942     env = {
2943       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2944       }
2945     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2946     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2947     return env, nl, nl
2948
2949   def CheckPrereq(self):
2950     """Check prerequisites.
2951
2952     This checks that the instance is in the cluster.
2953
2954     """
2955     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2956     assert self.instance is not None, \
2957       "Cannot retrieve locked instance %s" % self.op.instance_name
2958
2959     bep = self.cfg.GetClusterInfo().FillBE(instance)
2960     if instance.disk_template not in constants.DTS_NET_MIRROR:
2961       raise errors.OpPrereqError("Instance's disk layout is not"
2962                                  " network mirrored, cannot failover.")
2963
2964     secondary_nodes = instance.secondary_nodes
2965     if not secondary_nodes:
2966       raise errors.ProgrammerError("no secondary node but using "
2967                                    "a mirrored disk template")
2968
2969     target_node = secondary_nodes[0]
2970     # check memory requirements on the secondary node
2971     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2972                          instance.name, bep[constants.BE_MEMORY],
2973                          instance.hypervisor)
2974
2975     # check bridge existance
2976     brlist = [nic.bridge for nic in instance.nics]
2977     if not self.rpc.call_bridges_exist(target_node, brlist):
2978       raise errors.OpPrereqError("One or more target bridges %s does not"
2979                                  " exist on destination node '%s'" %
2980                                  (brlist, target_node))
2981
2982   def Exec(self, feedback_fn):
2983     """Failover an instance.
2984
2985     The failover is done by shutting it down on its present node and
2986     starting it on the secondary.
2987
2988     """
2989     instance = self.instance
2990
2991     source_node = instance.primary_node
2992     target_node = instance.secondary_nodes[0]
2993
2994     feedback_fn("* checking disk consistency between source and target")
2995     for dev in instance.disks:
2996       # for drbd, these are drbd over lvm
2997       if not _CheckDiskConsistency(self, dev, target_node, False):
2998         if instance.status == "up" and not self.op.ignore_consistency:
2999           raise errors.OpExecError("Disk %s is degraded on target node,"
3000                                    " aborting failover." % dev.iv_name)
3001
3002     feedback_fn("* shutting down instance on source node")
3003     logging.info("Shutting down instance %s on node %s",
3004                  instance.name, source_node)
3005
3006     if not self.rpc.call_instance_shutdown(source_node, instance):
3007       if self.op.ignore_consistency:
3008         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3009                              " Proceeding"
3010                              " anyway. Please make sure node %s is down",
3011                              instance.name, source_node, source_node)
3012       else:
3013         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3014                                  (instance.name, source_node))
3015
3016     feedback_fn("* deactivating the instance's disks on source node")
3017     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3018       raise errors.OpExecError("Can't shut down the instance's disks.")
3019
3020     instance.primary_node = target_node
3021     # distribute new instance config to the other nodes
3022     self.cfg.Update(instance)
3023
3024     # Only start the instance if it's marked as up
3025     if instance.status == "up":
3026       feedback_fn("* activating the instance's disks on target node")
3027       logging.info("Starting instance %s on node %s",
3028                    instance.name, target_node)
3029
3030       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3031                                                ignore_secondaries=True)
3032       if not disks_ok:
3033         _ShutdownInstanceDisks(self, instance)
3034         raise errors.OpExecError("Can't activate the instance's disks")
3035
3036       feedback_fn("* starting the instance on the target node")
3037       if not self.rpc.call_instance_start(target_node, instance, None):
3038         _ShutdownInstanceDisks(self, instance)
3039         raise errors.OpExecError("Could not start instance %s on node %s." %
3040                                  (instance.name, target_node))
3041
3042
3043 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3044   """Create a tree of block devices on the primary node.
3045
3046   This always creates all devices.
3047
3048   """
3049   if device.children:
3050     for child in device.children:
3051       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3052         return False
3053
3054   lu.cfg.SetDiskID(device, node)
3055   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3056                                        instance.name, True, info)
3057   if not new_id:
3058     return False
3059   if device.physical_id is None:
3060     device.physical_id = new_id
3061   return True
3062
3063
3064 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3065   """Create a tree of block devices on a secondary node.
3066
3067   If this device type has to be created on secondaries, create it and
3068   all its children.
3069
3070   If not, just recurse to children keeping the same 'force' value.
3071
3072   """
3073   if device.CreateOnSecondary():
3074     force = True
3075   if device.children:
3076     for child in device.children:
3077       if not _CreateBlockDevOnSecondary(lu, node, instance,
3078                                         child, force, info):
3079         return False
3080
3081   if not force:
3082     return True
3083   lu.cfg.SetDiskID(device, node)
3084   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3085                                        instance.name, False, info)
3086   if not new_id:
3087     return False
3088   if device.physical_id is None:
3089     device.physical_id = new_id
3090   return True
3091
3092
3093 def _GenerateUniqueNames(lu, exts):
3094   """Generate a suitable LV name.
3095
3096   This will generate a logical volume name for the given instance.
3097
3098   """
3099   results = []
3100   for val in exts:
3101     new_id = lu.cfg.GenerateUniqueID()
3102     results.append("%s%s" % (new_id, val))
3103   return results
3104
3105
3106 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3107                          p_minor, s_minor):
3108   """Generate a drbd8 device complete with its children.
3109
3110   """
3111   port = lu.cfg.AllocatePort()
3112   vgname = lu.cfg.GetVGName()
3113   shared_secret = lu.cfg.GenerateDRBDSecret()
3114   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3115                           logical_id=(vgname, names[0]))
3116   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3117                           logical_id=(vgname, names[1]))
3118   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3119                           logical_id=(primary, secondary, port,
3120                                       p_minor, s_minor,
3121                                       shared_secret),
3122                           children=[dev_data, dev_meta],
3123                           iv_name=iv_name)
3124   return drbd_dev
3125
3126
3127 def _GenerateDiskTemplate(lu, template_name,
3128                           instance_name, primary_node,
3129                           secondary_nodes, disk_info,
3130                           file_storage_dir, file_driver):
3131   """Generate the entire disk layout for a given template type.
3132
3133   """
3134   #TODO: compute space requirements
3135
3136   vgname = lu.cfg.GetVGName()
3137   disk_count = len(disk_info)
3138   disks = []
3139   if template_name == constants.DT_DISKLESS:
3140     pass
3141   elif template_name == constants.DT_PLAIN:
3142     if len(secondary_nodes) != 0:
3143       raise errors.ProgrammerError("Wrong template configuration")
3144
3145     names = _GenerateUniqueNames(lu, [".disk%d" % i
3146                                       for i in range(disk_count)])
3147     for idx, disk in enumerate(disk_info):
3148       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3149                               logical_id=(vgname, names[idx]),
3150                               iv_name = "disk/%d" % idx)
3151       disks.append(disk_dev)
3152   elif template_name == constants.DT_DRBD8:
3153     if len(secondary_nodes) != 1:
3154       raise errors.ProgrammerError("Wrong template configuration")
3155     remote_node = secondary_nodes[0]
3156     minors = lu.cfg.AllocateDRBDMinor(
3157       [primary_node, remote_node] * len(disk_info), instance_name)
3158
3159     names = _GenerateUniqueNames(lu,
3160                                  [".disk%d_%s" % (i, s)
3161                                   for i in range(disk_count)
3162                                   for s in ("data", "meta")
3163                                   ])
3164     for idx, disk in enumerate(disk_info):
3165       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3166                                       disk["size"], names[idx*2:idx*2+2],
3167                                       "disk/%d" % idx,
3168                                       minors[idx*2], minors[idx*2+1])
3169       disks.append(disk_dev)
3170   elif template_name == constants.DT_FILE:
3171     if len(secondary_nodes) != 0:
3172       raise errors.ProgrammerError("Wrong template configuration")
3173
3174     for idx, disk in enumerate(disk_info):
3175
3176       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3177                               iv_name="disk/%d" % idx,
3178                               logical_id=(file_driver,
3179                                           "%s/disk%d" % (file_storage_dir,
3180                                                          idx)))
3181       disks.append(disk_dev)
3182   else:
3183     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3184   return disks
3185
3186
3187 def _GetInstanceInfoText(instance):
3188   """Compute that text that should be added to the disk's metadata.
3189
3190   """
3191   return "originstname+%s" % instance.name
3192
3193
3194 def _CreateDisks(lu, instance):
3195   """Create all disks for an instance.
3196
3197   This abstracts away some work from AddInstance.
3198
3199   @type lu: L{LogicalUnit}
3200   @param lu: the logical unit on whose behalf we execute
3201   @type instance: L{objects.Instance}
3202   @param instance: the instance whose disks we should create
3203   @rtype: boolean
3204   @return: the success of the creation
3205
3206   """
3207   info = _GetInstanceInfoText(instance)
3208
3209   if instance.disk_template == constants.DT_FILE:
3210     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3211     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3212                                                  file_storage_dir)
3213
3214     if not result:
3215       logging.error("Could not connect to node '%s'", instance.primary_node)
3216       return False
3217
3218     if not result[0]:
3219       logging.error("Failed to create directory '%s'", file_storage_dir)
3220       return False
3221
3222   for device in instance.disks:
3223     logging.info("Creating volume %s for instance %s",
3224                  device.iv_name, instance.name)
3225     #HARDCODE
3226     for secondary_node in instance.secondary_nodes:
3227       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3228                                         device, False, info):
3229         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3230                       device.iv_name, device, secondary_node)
3231         return False
3232     #HARDCODE
3233     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3234                                     instance, device, info):
3235       logging.error("Failed to create volume %s on primary!", device.iv_name)
3236       return False
3237
3238   return True
3239
3240
3241 def _RemoveDisks(lu, instance):
3242   """Remove all disks for an instance.
3243
3244   This abstracts away some work from `AddInstance()` and
3245   `RemoveInstance()`. Note that in case some of the devices couldn't
3246   be removed, the removal will continue with the other ones (compare
3247   with `_CreateDisks()`).
3248
3249   @type lu: L{LogicalUnit}
3250   @param lu: the logical unit on whose behalf we execute
3251   @type instance: L{objects.Instance}
3252   @param instance: the instance whose disks we should remove
3253   @rtype: boolean
3254   @return: the success of the removal
3255
3256   """
3257   logging.info("Removing block devices for instance %s", instance.name)
3258
3259   result = True
3260   for device in instance.disks:
3261     for node, disk in device.ComputeNodeTree(instance.primary_node):
3262       lu.cfg.SetDiskID(disk, node)
3263       if not lu.rpc.call_blockdev_remove(node, disk):
3264         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3265                            " continuing anyway", device.iv_name, node)
3266         result = False
3267
3268   if instance.disk_template == constants.DT_FILE:
3269     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3270     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3271                                                file_storage_dir):
3272       logging.error("Could not remove directory '%s'", file_storage_dir)
3273       result = False
3274
3275   return result
3276
3277
3278 def _ComputeDiskSize(disk_template, disks):
3279   """Compute disk size requirements in the volume group
3280
3281   """
3282   # Required free disk space as a function of disk and swap space
3283   req_size_dict = {
3284     constants.DT_DISKLESS: None,
3285     constants.DT_PLAIN: sum(d["size"] for d in disks),
3286     # 128 MB are added for drbd metadata for each disk
3287     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3288     constants.DT_FILE: None,
3289   }
3290
3291   if disk_template not in req_size_dict:
3292     raise errors.ProgrammerError("Disk template '%s' size requirement"
3293                                  " is unknown" %  disk_template)
3294
3295   return req_size_dict[disk_template]
3296
3297
3298 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3299   """Hypervisor parameter validation.
3300
3301   This function abstract the hypervisor parameter validation to be
3302   used in both instance create and instance modify.
3303
3304   @type lu: L{LogicalUnit}
3305   @param lu: the logical unit for which we check
3306   @type nodenames: list
3307   @param nodenames: the list of nodes on which we should check
3308   @type hvname: string
3309   @param hvname: the name of the hypervisor we should use
3310   @type hvparams: dict
3311   @param hvparams: the parameters which we need to check
3312   @raise errors.OpPrereqError: if the parameters are not valid
3313
3314   """
3315   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3316                                                   hvname,
3317                                                   hvparams)
3318   for node in nodenames:
3319     info = hvinfo.get(node, None)
3320     if not info or not isinstance(info, (tuple, list)):
3321       raise errors.OpPrereqError("Cannot get current information"
3322                                  " from node '%s' (%s)" % (node, info))
3323     if not info[0]:
3324       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3325                                  " %s" % info[1])
3326
3327
3328 class LUCreateInstance(LogicalUnit):
3329   """Create an instance.
3330
3331   """
3332   HPATH = "instance-add"
3333   HTYPE = constants.HTYPE_INSTANCE
3334   _OP_REQP = ["instance_name", "disks", "disk_template",
3335               "mode", "start",
3336               "wait_for_sync", "ip_check", "nics",
3337               "hvparams", "beparams"]
3338   REQ_BGL = False
3339
3340   def _ExpandNode(self, node):
3341     """Expands and checks one node name.
3342
3343     """
3344     node_full = self.cfg.ExpandNodeName(node)
3345     if node_full is None:
3346       raise errors.OpPrereqError("Unknown node %s" % node)
3347     return node_full
3348
3349   def ExpandNames(self):
3350     """ExpandNames for CreateInstance.
3351
3352     Figure out the right locks for instance creation.
3353
3354     """
3355     self.needed_locks = {}
3356
3357     # set optional parameters to none if they don't exist
3358     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3359       if not hasattr(self.op, attr):
3360         setattr(self.op, attr, None)
3361
3362     # cheap checks, mostly valid constants given
3363
3364     # verify creation mode
3365     if self.op.mode not in (constants.INSTANCE_CREATE,
3366                             constants.INSTANCE_IMPORT):
3367       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3368                                  self.op.mode)
3369
3370     # disk template and mirror node verification
3371     if self.op.disk_template not in constants.DISK_TEMPLATES:
3372       raise errors.OpPrereqError("Invalid disk template name")
3373
3374     if self.op.hypervisor is None:
3375       self.op.hypervisor = self.cfg.GetHypervisorType()
3376
3377     cluster = self.cfg.GetClusterInfo()
3378     enabled_hvs = cluster.enabled_hypervisors
3379     if self.op.hypervisor not in enabled_hvs:
3380       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3381                                  " cluster (%s)" % (self.op.hypervisor,
3382                                   ",".join(enabled_hvs)))
3383
3384     # check hypervisor parameter syntax (locally)
3385
3386     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3387                                   self.op.hvparams)
3388     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3389     hv_type.CheckParameterSyntax(filled_hvp)
3390
3391     # fill and remember the beparams dict
3392     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3393                                     self.op.beparams)
3394
3395     #### instance parameters check
3396
3397     # instance name verification
3398     hostname1 = utils.HostInfo(self.op.instance_name)
3399     self.op.instance_name = instance_name = hostname1.name
3400
3401     # this is just a preventive check, but someone might still add this
3402     # instance in the meantime, and creation will fail at lock-add time
3403     if instance_name in self.cfg.GetInstanceList():
3404       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3405                                  instance_name)
3406
3407     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3408
3409     # NIC buildup
3410     self.nics = []
3411     for nic in self.op.nics:
3412       # ip validity checks
3413       ip = nic.get("ip", None)
3414       if ip is None or ip.lower() == "none":
3415         nic_ip = None
3416       elif ip.lower() == constants.VALUE_AUTO:
3417         nic_ip = hostname1.ip
3418       else:
3419         if not utils.IsValidIP(ip):
3420           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3421                                      " like a valid IP" % ip)
3422         nic_ip = ip
3423
3424       # MAC address verification
3425       mac = nic.get("mac", constants.VALUE_AUTO)
3426       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3427         if not utils.IsValidMac(mac.lower()):
3428           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3429                                      mac)
3430       # bridge verification
3431       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3432       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3433
3434     # disk checks/pre-build
3435     self.disks = []
3436     for disk in self.op.disks:
3437       mode = disk.get("mode", constants.DISK_RDWR)
3438       if mode not in constants.DISK_ACCESS_SET:
3439         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3440                                    mode)
3441       size = disk.get("size", None)
3442       if size is None:
3443         raise errors.OpPrereqError("Missing disk size")
3444       try:
3445         size = int(size)
3446       except ValueError:
3447         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3448       self.disks.append({"size": size, "mode": mode})
3449
3450     # used in CheckPrereq for ip ping check
3451     self.check_ip = hostname1.ip
3452
3453     # file storage checks
3454     if (self.op.file_driver and
3455         not self.op.file_driver in constants.FILE_DRIVER):
3456       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3457                                  self.op.file_driver)
3458
3459     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3460       raise errors.OpPrereqError("File storage directory path not absolute")
3461
3462     ### Node/iallocator related checks
3463     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3464       raise errors.OpPrereqError("One and only one of iallocator and primary"
3465                                  " node must be given")
3466
3467     if self.op.iallocator:
3468       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3469     else:
3470       self.op.pnode = self._ExpandNode(self.op.pnode)
3471       nodelist = [self.op.pnode]
3472       if self.op.snode is not None:
3473         self.op.snode = self._ExpandNode(self.op.snode)
3474         nodelist.append(self.op.snode)
3475       self.needed_locks[locking.LEVEL_NODE] = nodelist
3476
3477     # in case of import lock the source node too
3478     if self.op.mode == constants.INSTANCE_IMPORT:
3479       src_node = getattr(self.op, "src_node", None)
3480       src_path = getattr(self.op, "src_path", None)
3481
3482       if src_node is None or src_path is None:
3483         raise errors.OpPrereqError("Importing an instance requires source"
3484                                    " node and path options")
3485
3486       if not os.path.isabs(src_path):
3487         raise errors.OpPrereqError("The source path must be absolute")
3488
3489       self.op.src_node = src_node = self._ExpandNode(src_node)
3490       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3491         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3492
3493     else: # INSTANCE_CREATE
3494       if getattr(self.op, "os_type", None) is None:
3495         raise errors.OpPrereqError("No guest OS specified")
3496
3497   def _RunAllocator(self):
3498     """Run the allocator based on input opcode.
3499
3500     """
3501     nics = [n.ToDict() for n in self.nics]
3502     ial = IAllocator(self,
3503                      mode=constants.IALLOCATOR_MODE_ALLOC,
3504                      name=self.op.instance_name,
3505                      disk_template=self.op.disk_template,
3506                      tags=[],
3507                      os=self.op.os_type,
3508                      vcpus=self.be_full[constants.BE_VCPUS],
3509                      mem_size=self.be_full[constants.BE_MEMORY],
3510                      disks=self.disks,
3511                      nics=nics,
3512                      )
3513
3514     ial.Run(self.op.iallocator)
3515
3516     if not ial.success:
3517       raise errors.OpPrereqError("Can't compute nodes using"
3518                                  " iallocator '%s': %s" % (self.op.iallocator,
3519                                                            ial.info))
3520     if len(ial.nodes) != ial.required_nodes:
3521       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3522                                  " of nodes (%s), required %s" %
3523                                  (self.op.iallocator, len(ial.nodes),
3524                                   ial.required_nodes))
3525     self.op.pnode = ial.nodes[0]
3526     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3527                  self.op.instance_name, self.op.iallocator,
3528                  ", ".join(ial.nodes))
3529     if ial.required_nodes == 2:
3530       self.op.snode = ial.nodes[1]
3531
3532   def BuildHooksEnv(self):
3533     """Build hooks env.
3534
3535     This runs on master, primary and secondary nodes of the instance.
3536
3537     """
3538     env = {
3539       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3540       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3541       "INSTANCE_ADD_MODE": self.op.mode,
3542       }
3543     if self.op.mode == constants.INSTANCE_IMPORT:
3544       env["INSTANCE_SRC_NODE"] = self.op.src_node
3545       env["INSTANCE_SRC_PATH"] = self.op.src_path
3546       env["INSTANCE_SRC_IMAGES"] = self.src_images
3547
3548     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3549       primary_node=self.op.pnode,
3550       secondary_nodes=self.secondaries,
3551       status=self.instance_status,
3552       os_type=self.op.os_type,
3553       memory=self.be_full[constants.BE_MEMORY],
3554       vcpus=self.be_full[constants.BE_VCPUS],
3555       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3556     ))
3557
3558     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3559           self.secondaries)
3560     return env, nl, nl
3561
3562
3563   def CheckPrereq(self):
3564     """Check prerequisites.
3565
3566     """
3567     if (not self.cfg.GetVGName() and
3568         self.op.disk_template not in constants.DTS_NOT_LVM):
3569       raise errors.OpPrereqError("Cluster does not support lvm-based"
3570                                  " instances")
3571
3572
3573     if self.op.mode == constants.INSTANCE_IMPORT:
3574       src_node = self.op.src_node
3575       src_path = self.op.src_path
3576
3577       export_info = self.rpc.call_export_info(src_node, src_path)
3578
3579       if not export_info:
3580         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3581
3582       if not export_info.has_section(constants.INISECT_EXP):
3583         raise errors.ProgrammerError("Corrupted export config")
3584
3585       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3586       if (int(ei_version) != constants.EXPORT_VERSION):
3587         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3588                                    (ei_version, constants.EXPORT_VERSION))
3589
3590       # Check that the new instance doesn't have less disks than the export
3591       instance_disks = len(self.disks)
3592       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3593       if instance_disks < export_disks:
3594         raise errors.OpPrereqError("Not enough disks to import."
3595                                    " (instance: %d, export: %d)" %
3596                                    (2, export_disks))
3597
3598       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3599       disk_images = []
3600       for idx in range(export_disks):
3601         option = 'disk%d_dump' % idx
3602         if export_info.has_option(constants.INISECT_INS, option):
3603           # FIXME: are the old os-es, disk sizes, etc. useful?
3604           export_name = export_info.get(constants.INISECT_INS, option)
3605           image = os.path.join(src_path, export_name)
3606           disk_images.append(image)
3607         else:
3608           disk_images.append(False)
3609
3610       self.src_images = disk_images
3611
3612       if self.op.mac == constants.VALUE_AUTO:
3613         old_name = export_info.get(constants.INISECT_INS, 'name')
3614         if self.op.instance_name == old_name:
3615           # FIXME: adjust every nic, when we'll be able to create instances
3616           # with more than one
3617           if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3618             self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3619
3620     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3621
3622     if self.op.start and not self.op.ip_check:
3623       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3624                                  " adding an instance in start mode")
3625
3626     if self.op.ip_check:
3627       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3628         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3629                                    (self.check_ip, self.op.instance_name))
3630
3631     #### allocator run
3632
3633     if self.op.iallocator is not None:
3634       self._RunAllocator()
3635
3636     #### node related checks
3637
3638     # check primary node
3639     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3640     assert self.pnode is not None, \
3641       "Cannot retrieve locked node %s" % self.op.pnode
3642     self.secondaries = []
3643
3644     # mirror node verification
3645     if self.op.disk_template in constants.DTS_NET_MIRROR:
3646       if self.op.snode is None:
3647         raise errors.OpPrereqError("The networked disk templates need"
3648                                    " a mirror node")
3649       if self.op.snode == pnode.name:
3650         raise errors.OpPrereqError("The secondary node cannot be"
3651                                    " the primary node.")
3652       self.secondaries.append(self.op.snode)
3653
3654     nodenames = [pnode.name] + self.secondaries
3655
3656     req_size = _ComputeDiskSize(self.op.disk_template,
3657                                 self.disks)
3658
3659     # Check lv size requirements
3660     if req_size is not None:
3661       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3662                                          self.op.hypervisor)
3663       for node in nodenames:
3664         info = nodeinfo.get(node, None)
3665         if not info:
3666           raise errors.OpPrereqError("Cannot get current information"
3667                                      " from node '%s'" % node)
3668         vg_free = info.get('vg_free', None)
3669         if not isinstance(vg_free, int):
3670           raise errors.OpPrereqError("Can't compute free disk space on"
3671                                      " node %s" % node)
3672         if req_size > info['vg_free']:
3673           raise errors.OpPrereqError("Not enough disk space on target node %s."
3674                                      " %d MB available, %d MB required" %
3675                                      (node, info['vg_free'], req_size))
3676
3677     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3678
3679     # os verification
3680     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3681     if not os_obj:
3682       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3683                                  " primary node"  % self.op.os_type)
3684
3685     # bridge check on primary node
3686     bridges = [n.bridge for n in self.nics]
3687     if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
3688       raise errors.OpPrereqError("one of the target bridges '%s' does not"
3689                                  " exist on"
3690                                  " destination node '%s'" %
3691                                  (",".join(bridges), pnode.name))
3692
3693     # memory check on primary node
3694     if self.op.start:
3695       _CheckNodeFreeMemory(self, self.pnode.name,
3696                            "creating instance %s" % self.op.instance_name,
3697                            self.be_full[constants.BE_MEMORY],
3698                            self.op.hypervisor)
3699
3700     if self.op.start:
3701       self.instance_status = 'up'
3702     else:
3703       self.instance_status = 'down'
3704
3705   def Exec(self, feedback_fn):
3706     """Create and add the instance to the cluster.
3707
3708     """
3709     instance = self.op.instance_name
3710     pnode_name = self.pnode.name
3711
3712     for nic in self.nics:
3713       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3714         nic.mac = self.cfg.GenerateMAC()
3715
3716     ht_kind = self.op.hypervisor
3717     if ht_kind in constants.HTS_REQ_PORT:
3718       network_port = self.cfg.AllocatePort()
3719     else:
3720       network_port = None
3721
3722     ##if self.op.vnc_bind_address is None:
3723     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3724
3725     # this is needed because os.path.join does not accept None arguments
3726     if self.op.file_storage_dir is None:
3727       string_file_storage_dir = ""
3728     else:
3729       string_file_storage_dir = self.op.file_storage_dir
3730
3731     # build the full file storage dir path
3732     file_storage_dir = os.path.normpath(os.path.join(
3733                                         self.cfg.GetFileStorageDir(),
3734                                         string_file_storage_dir, instance))
3735
3736
3737     disks = _GenerateDiskTemplate(self,
3738                                   self.op.disk_template,
3739                                   instance, pnode_name,
3740                                   self.secondaries,
3741                                   self.disks,
3742                                   file_storage_dir,
3743                                   self.op.file_driver)
3744
3745     iobj = objects.Instance(name=instance, os=self.op.os_type,
3746                             primary_node=pnode_name,
3747                             nics=self.nics, disks=disks,
3748                             disk_template=self.op.disk_template,
3749                             status=self.instance_status,
3750                             network_port=network_port,
3751                             beparams=self.op.beparams,
3752                             hvparams=self.op.hvparams,
3753                             hypervisor=self.op.hypervisor,
3754                             )
3755
3756     feedback_fn("* creating instance disks...")
3757     if not _CreateDisks(self, iobj):
3758       _RemoveDisks(self, iobj)
3759       self.cfg.ReleaseDRBDMinors(instance)
3760       raise errors.OpExecError("Device creation failed, reverting...")
3761
3762     feedback_fn("adding instance %s to cluster config" % instance)
3763
3764     self.cfg.AddInstance(iobj)
3765     # Declare that we don't want to remove the instance lock anymore, as we've
3766     # added the instance to the config
3767     del self.remove_locks[locking.LEVEL_INSTANCE]
3768     # Remove the temp. assignements for the instance's drbds
3769     self.cfg.ReleaseDRBDMinors(instance)
3770
3771     if self.op.wait_for_sync:
3772       disk_abort = not _WaitForSync(self, iobj)
3773     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3774       # make sure the disks are not degraded (still sync-ing is ok)
3775       time.sleep(15)
3776       feedback_fn("* checking mirrors status")
3777       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3778     else:
3779       disk_abort = False
3780
3781     if disk_abort:
3782       _RemoveDisks(self, iobj)
3783       self.cfg.RemoveInstance(iobj.name)
3784       # Make sure the instance lock gets removed
3785       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3786       raise errors.OpExecError("There are some degraded disks for"
3787                                " this instance")
3788
3789     feedback_fn("creating os for instance %s on node %s" %
3790                 (instance, pnode_name))
3791
3792     if iobj.disk_template != constants.DT_DISKLESS:
3793       if self.op.mode == constants.INSTANCE_CREATE:
3794         feedback_fn("* running the instance OS create scripts...")
3795         if not self.rpc.call_instance_os_add(pnode_name, iobj):
3796           raise errors.OpExecError("could not add os for instance %s"
3797                                    " on node %s" %
3798                                    (instance, pnode_name))
3799
3800       elif self.op.mode == constants.INSTANCE_IMPORT:
3801         feedback_fn("* running the instance OS import scripts...")
3802         src_node = self.op.src_node
3803         src_images = self.src_images
3804         cluster_name = self.cfg.GetClusterName()
3805         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3806                                                          src_node, src_images,
3807                                                          cluster_name)
3808         for idx, result in enumerate(import_result):
3809           if not result:
3810             self.LogWarning("Could not image %s for on instance %s, disk %d,"
3811                             " on node %s" % (src_images[idx], instance, idx,
3812                                              pnode_name))
3813       else:
3814         # also checked in the prereq part
3815         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3816                                      % self.op.mode)
3817
3818     if self.op.start:
3819       logging.info("Starting instance %s on node %s", instance, pnode_name)
3820       feedback_fn("* starting instance...")
3821       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3822         raise errors.OpExecError("Could not start instance")
3823
3824
3825 class LUConnectConsole(NoHooksLU):
3826   """Connect to an instance's console.
3827
3828   This is somewhat special in that it returns the command line that
3829   you need to run on the master node in order to connect to the
3830   console.
3831
3832   """
3833   _OP_REQP = ["instance_name"]
3834   REQ_BGL = False
3835
3836   def ExpandNames(self):
3837     self._ExpandAndLockInstance()
3838
3839   def CheckPrereq(self):
3840     """Check prerequisites.
3841
3842     This checks that the instance is in the cluster.
3843
3844     """
3845     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3846     assert self.instance is not None, \
3847       "Cannot retrieve locked instance %s" % self.op.instance_name
3848
3849   def Exec(self, feedback_fn):
3850     """Connect to the console of an instance
3851
3852     """
3853     instance = self.instance
3854     node = instance.primary_node
3855
3856     node_insts = self.rpc.call_instance_list([node],
3857                                              [instance.hypervisor])[node]
3858     if node_insts is False:
3859       raise errors.OpExecError("Can't connect to node %s." % node)
3860
3861     if instance.name not in node_insts:
3862       raise errors.OpExecError("Instance %s is not running." % instance.name)
3863
3864     logging.debug("Connecting to console of %s on %s", instance.name, node)
3865
3866     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3867     console_cmd = hyper.GetShellCommandForConsole(instance)
3868
3869     # build ssh cmdline
3870     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3871
3872
3873 class LUReplaceDisks(LogicalUnit):
3874   """Replace the disks of an instance.
3875
3876   """
3877   HPATH = "mirrors-replace"
3878   HTYPE = constants.HTYPE_INSTANCE
3879   _OP_REQP = ["instance_name", "mode", "disks"]
3880   REQ_BGL = False
3881
3882   def ExpandNames(self):
3883     self._ExpandAndLockInstance()
3884
3885     if not hasattr(self.op, "remote_node"):
3886       self.op.remote_node = None
3887
3888     ia_name = getattr(self.op, "iallocator", None)
3889     if ia_name is not None:
3890       if self.op.remote_node is not None:
3891         raise errors.OpPrereqError("Give either the iallocator or the new"
3892                                    " secondary, not both")
3893       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3894     elif self.op.remote_node is not None:
3895       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3896       if remote_node is None:
3897         raise errors.OpPrereqError("Node '%s' not known" %
3898                                    self.op.remote_node)
3899       self.op.remote_node = remote_node
3900       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3901       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3902     else:
3903       self.needed_locks[locking.LEVEL_NODE] = []
3904       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3905
3906   def DeclareLocks(self, level):
3907     # If we're not already locking all nodes in the set we have to declare the
3908     # instance's primary/secondary nodes.
3909     if (level == locking.LEVEL_NODE and
3910         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3911       self._LockInstancesNodes()
3912
3913   def _RunAllocator(self):
3914     """Compute a new secondary node using an IAllocator.
3915
3916     """
3917     ial = IAllocator(self,
3918                      mode=constants.IALLOCATOR_MODE_RELOC,
3919                      name=self.op.instance_name,
3920                      relocate_from=[self.sec_node])
3921
3922     ial.Run(self.op.iallocator)
3923
3924     if not ial.success:
3925       raise errors.OpPrereqError("Can't compute nodes using"
3926                                  " iallocator '%s': %s" % (self.op.iallocator,
3927                                                            ial.info))
3928     if len(ial.nodes) != ial.required_nodes:
3929       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3930                                  " of nodes (%s), required %s" %
3931                                  (len(ial.nodes), ial.required_nodes))
3932     self.op.remote_node = ial.nodes[0]
3933     self.LogInfo("Selected new secondary for the instance: %s",
3934                  self.op.remote_node)
3935
3936   def BuildHooksEnv(self):
3937     """Build hooks env.
3938
3939     This runs on the master, the primary and all the secondaries.
3940
3941     """
3942     env = {
3943       "MODE": self.op.mode,
3944       "NEW_SECONDARY": self.op.remote_node,
3945       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3946       }
3947     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3948     nl = [
3949       self.cfg.GetMasterNode(),
3950       self.instance.primary_node,
3951       ]
3952     if self.op.remote_node is not None:
3953       nl.append(self.op.remote_node)
3954     return env, nl, nl
3955
3956   def CheckPrereq(self):
3957     """Check prerequisites.
3958
3959     This checks that the instance is in the cluster.
3960
3961     """
3962     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3963     assert instance is not None, \
3964       "Cannot retrieve locked instance %s" % self.op.instance_name
3965     self.instance = instance
3966
3967     if instance.disk_template not in constants.DTS_NET_MIRROR:
3968       raise errors.OpPrereqError("Instance's disk layout is not"
3969                                  " network mirrored.")
3970
3971     if len(instance.secondary_nodes) != 1:
3972       raise errors.OpPrereqError("The instance has a strange layout,"
3973                                  " expected one secondary but found %d" %
3974                                  len(instance.secondary_nodes))
3975
3976     self.sec_node = instance.secondary_nodes[0]
3977
3978     ia_name = getattr(self.op, "iallocator", None)
3979     if ia_name is not None:
3980       self._RunAllocator()
3981
3982     remote_node = self.op.remote_node
3983     if remote_node is not None:
3984       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3985       assert self.remote_node_info is not None, \
3986         "Cannot retrieve locked node %s" % remote_node
3987     else:
3988       self.remote_node_info = None
3989     if remote_node == instance.primary_node:
3990       raise errors.OpPrereqError("The specified node is the primary node of"
3991                                  " the instance.")
3992     elif remote_node == self.sec_node:
3993       if self.op.mode == constants.REPLACE_DISK_SEC:
3994         # this is for DRBD8, where we can't execute the same mode of
3995         # replacement as for drbd7 (no different port allocated)
3996         raise errors.OpPrereqError("Same secondary given, cannot execute"
3997                                    " replacement")
3998     if instance.disk_template == constants.DT_DRBD8:
3999       if (self.op.mode == constants.REPLACE_DISK_ALL and
4000           remote_node is not None):
4001         # switch to replace secondary mode
4002         self.op.mode = constants.REPLACE_DISK_SEC
4003
4004       if self.op.mode == constants.REPLACE_DISK_ALL:
4005         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4006                                    " secondary disk replacement, not"
4007                                    " both at once")
4008       elif self.op.mode == constants.REPLACE_DISK_PRI:
4009         if remote_node is not None:
4010           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4011                                      " the secondary while doing a primary"
4012                                      " node disk replacement")
4013         self.tgt_node = instance.primary_node
4014         self.oth_node = instance.secondary_nodes[0]
4015       elif self.op.mode == constants.REPLACE_DISK_SEC:
4016         self.new_node = remote_node # this can be None, in which case
4017                                     # we don't change the secondary
4018         self.tgt_node = instance.secondary_nodes[0]
4019         self.oth_node = instance.primary_node
4020       else:
4021         raise errors.ProgrammerError("Unhandled disk replace mode")
4022
4023     if not self.op.disks:
4024       self.op.disks = range(len(instance.disks))
4025
4026     for disk_idx in self.op.disks:
4027       instance.FindDisk(disk_idx)
4028
4029   def _ExecD8DiskOnly(self, feedback_fn):
4030     """Replace a disk on the primary or secondary for dbrd8.
4031
4032     The algorithm for replace is quite complicated:
4033
4034       1. for each disk to be replaced:
4035
4036         1. create new LVs on the target node with unique names
4037         1. detach old LVs from the drbd device
4038         1. rename old LVs to name_replaced.<time_t>
4039         1. rename new LVs to old LVs
4040         1. attach the new LVs (with the old names now) to the drbd device
4041
4042       1. wait for sync across all devices
4043
4044       1. for each modified disk:
4045
4046         1. remove old LVs (which have the name name_replaces.<time_t>)
4047
4048     Failures are not very well handled.
4049
4050     """
4051     steps_total = 6
4052     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4053     instance = self.instance
4054     iv_names = {}
4055     vgname = self.cfg.GetVGName()
4056     # start of work
4057     cfg = self.cfg
4058     tgt_node = self.tgt_node
4059     oth_node = self.oth_node
4060
4061     # Step: check device activation
4062     self.proc.LogStep(1, steps_total, "check device existence")
4063     info("checking volume groups")
4064     my_vg = cfg.GetVGName()
4065     results = self.rpc.call_vg_list([oth_node, tgt_node])
4066     if not results:
4067       raise errors.OpExecError("Can't list volume groups on the nodes")
4068     for node in oth_node, tgt_node:
4069       res = results.get(node, False)
4070       if not res or my_vg not in res:
4071         raise errors.OpExecError("Volume group '%s' not found on %s" %
4072                                  (my_vg, node))
4073     for idx, dev in enumerate(instance.disks):
4074       if idx not in self.op.disks:
4075         continue
4076       for node in tgt_node, oth_node:
4077         info("checking disk/%d on %s" % (idx, node))
4078         cfg.SetDiskID(dev, node)
4079         if not self.rpc.call_blockdev_find(node, dev):
4080           raise errors.OpExecError("Can't find disk/%d on node %s" %
4081                                    (idx, node))
4082
4083     # Step: check other node consistency
4084     self.proc.LogStep(2, steps_total, "check peer consistency")
4085     for idx, dev in enumerate(instance.disks):
4086       if idx not in self.op.disks:
4087         continue
4088       info("checking disk/%d consistency on %s" % (idx, oth_node))
4089       if not _CheckDiskConsistency(self, dev, oth_node,
4090                                    oth_node==instance.primary_node):
4091         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4092                                  " to replace disks on this node (%s)" %
4093                                  (oth_node, tgt_node))
4094
4095     # Step: create new storage
4096     self.proc.LogStep(3, steps_total, "allocate new storage")
4097     for idx, dev in enumerate(instance.disks):
4098       if idx not in self.op.disks:
4099         continue
4100       size = dev.size
4101       cfg.SetDiskID(dev, tgt_node)
4102       lv_names = [".disk%d_%s" % (idx, suf)
4103                   for suf in ["data", "meta"]]
4104       names = _GenerateUniqueNames(self, lv_names)
4105       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4106                              logical_id=(vgname, names[0]))
4107       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4108                              logical_id=(vgname, names[1]))
4109       new_lvs = [lv_data, lv_meta]
4110       old_lvs = dev.children
4111       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4112       info("creating new local storage on %s for %s" %
4113            (tgt_node, dev.iv_name))
4114       # since we *always* want to create this LV, we use the
4115       # _Create...OnPrimary (which forces the creation), even if we
4116       # are talking about the secondary node
4117       for new_lv in new_lvs:
4118         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4119                                         _GetInstanceInfoText(instance)):
4120           raise errors.OpExecError("Failed to create new LV named '%s' on"
4121                                    " node '%s'" %
4122                                    (new_lv.logical_id[1], tgt_node))
4123
4124     # Step: for each lv, detach+rename*2+attach
4125     self.proc.LogStep(4, steps_total, "change drbd configuration")
4126     for dev, old_lvs, new_lvs in iv_names.itervalues():
4127       info("detaching %s drbd from local storage" % dev.iv_name)
4128       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4129         raise errors.OpExecError("Can't detach drbd from local storage on node"
4130                                  " %s for device %s" % (tgt_node, dev.iv_name))
4131       #dev.children = []
4132       #cfg.Update(instance)
4133
4134       # ok, we created the new LVs, so now we know we have the needed
4135       # storage; as such, we proceed on the target node to rename
4136       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4137       # using the assumption that logical_id == physical_id (which in
4138       # turn is the unique_id on that node)
4139
4140       # FIXME(iustin): use a better name for the replaced LVs
4141       temp_suffix = int(time.time())
4142       ren_fn = lambda d, suff: (d.physical_id[0],
4143                                 d.physical_id[1] + "_replaced-%s" % suff)
4144       # build the rename list based on what LVs exist on the node
4145       rlist = []
4146       for to_ren in old_lvs:
4147         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4148         if find_res is not None: # device exists
4149           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4150
4151       info("renaming the old LVs on the target node")
4152       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4153         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4154       # now we rename the new LVs to the old LVs
4155       info("renaming the new LVs on the target node")
4156       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4157       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4158         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4159
4160       for old, new in zip(old_lvs, new_lvs):
4161         new.logical_id = old.logical_id
4162         cfg.SetDiskID(new, tgt_node)
4163
4164       for disk in old_lvs:
4165         disk.logical_id = ren_fn(disk, temp_suffix)
4166         cfg.SetDiskID(disk, tgt_node)
4167
4168       # now that the new lvs have the old name, we can add them to the device
4169       info("adding new mirror component on %s" % tgt_node)
4170       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4171         for new_lv in new_lvs:
4172           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4173             warning("Can't rollback device %s", hint="manually cleanup unused"
4174                     " logical volumes")
4175         raise errors.OpExecError("Can't add local storage to drbd")
4176
4177       dev.children = new_lvs
4178       cfg.Update(instance)
4179
4180     # Step: wait for sync
4181
4182     # this can fail as the old devices are degraded and _WaitForSync
4183     # does a combined result over all disks, so we don't check its
4184     # return value
4185     self.proc.LogStep(5, steps_total, "sync devices")
4186     _WaitForSync(self, instance, unlock=True)
4187
4188     # so check manually all the devices
4189     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4190       cfg.SetDiskID(dev, instance.primary_node)
4191       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4192       if is_degr:
4193         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4194
4195     # Step: remove old storage
4196     self.proc.LogStep(6, steps_total, "removing old storage")
4197     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4198       info("remove logical volumes for %s" % name)
4199       for lv in old_lvs:
4200         cfg.SetDiskID(lv, tgt_node)
4201         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4202           warning("Can't remove old LV", hint="manually remove unused LVs")
4203           continue
4204
4205   def _ExecD8Secondary(self, feedback_fn):
4206     """Replace the secondary node for drbd8.
4207
4208     The algorithm for replace is quite complicated:
4209       - for all disks of the instance:
4210         - create new LVs on the new node with same names
4211         - shutdown the drbd device on the old secondary
4212         - disconnect the drbd network on the primary
4213         - create the drbd device on the new secondary
4214         - network attach the drbd on the primary, using an artifice:
4215           the drbd code for Attach() will connect to the network if it
4216           finds a device which is connected to the good local disks but
4217           not network enabled
4218       - wait for sync across all devices
4219       - remove all disks from the old secondary
4220
4221     Failures are not very well handled.
4222
4223     """
4224     steps_total = 6
4225     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4226     instance = self.instance
4227     iv_names = {}
4228     vgname = self.cfg.GetVGName()
4229     # start of work
4230     cfg = self.cfg
4231     old_node = self.tgt_node
4232     new_node = self.new_node
4233     pri_node = instance.primary_node
4234
4235     # Step: check device activation
4236     self.proc.LogStep(1, steps_total, "check device existence")
4237     info("checking volume groups")
4238     my_vg = cfg.GetVGName()
4239     results = self.rpc.call_vg_list([pri_node, new_node])
4240     if not results:
4241       raise errors.OpExecError("Can't list volume groups on the nodes")
4242     for node in pri_node, new_node:
4243       res = results.get(node, False)
4244       if not res or my_vg not in res:
4245         raise errors.OpExecError("Volume group '%s' not found on %s" %
4246                                  (my_vg, node))
4247     for idx, dev in enumerate(instance.disks):
4248       if idx not in self.op.disks:
4249         continue
4250       info("checking disk/%d on %s" % (idx, pri_node))
4251       cfg.SetDiskID(dev, pri_node)
4252       if not self.rpc.call_blockdev_find(pri_node, dev):
4253         raise errors.OpExecError("Can't find disk/%d on node %s" %
4254                                  (idx, pri_node))
4255
4256     # Step: check other node consistency
4257     self.proc.LogStep(2, steps_total, "check peer consistency")
4258     for idx, dev in enumerate(instance.disks):
4259       if idx not in self.op.disks:
4260         continue
4261       info("checking disk/%d consistency on %s" % (idx, pri_node))
4262       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4263         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4264                                  " unsafe to replace the secondary" %
4265                                  pri_node)
4266
4267     # Step: create new storage
4268     self.proc.LogStep(3, steps_total, "allocate new storage")
4269     for idx, dev in enumerate(instance.disks):
4270       size = dev.size
4271       info("adding new local storage on %s for disk/%d" %
4272            (new_node, idx))
4273       # since we *always* want to create this LV, we use the
4274       # _Create...OnPrimary (which forces the creation), even if we
4275       # are talking about the secondary node
4276       for new_lv in dev.children:
4277         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4278                                         _GetInstanceInfoText(instance)):
4279           raise errors.OpExecError("Failed to create new LV named '%s' on"
4280                                    " node '%s'" %
4281                                    (new_lv.logical_id[1], new_node))
4282
4283     # Step 4: dbrd minors and drbd setups changes
4284     # after this, we must manually remove the drbd minors on both the
4285     # error and the success paths
4286     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4287                                    instance.name)
4288     logging.debug("Allocated minors %s" % (minors,))
4289     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4290     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4291       size = dev.size
4292       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4293       # create new devices on new_node
4294       if pri_node == dev.logical_id[0]:
4295         new_logical_id = (pri_node, new_node,
4296                           dev.logical_id[2], dev.logical_id[3], new_minor,
4297                           dev.logical_id[5])
4298       else:
4299         new_logical_id = (new_node, pri_node,
4300                           dev.logical_id[2], new_minor, dev.logical_id[4],
4301                           dev.logical_id[5])
4302       iv_names[idx] = (dev, dev.children, new_logical_id)
4303       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4304                     new_logical_id)
4305       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4306                               logical_id=new_logical_id,
4307                               children=dev.children)
4308       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4309                                         new_drbd, False,
4310                                         _GetInstanceInfoText(instance)):
4311         self.cfg.ReleaseDRBDMinors(instance.name)
4312         raise errors.OpExecError("Failed to create new DRBD on"
4313                                  " node '%s'" % new_node)
4314
4315     for idx, dev in enumerate(instance.disks):
4316       # we have new devices, shutdown the drbd on the old secondary
4317       info("shutting down drbd for disk/%d on old node" % idx)
4318       cfg.SetDiskID(dev, old_node)
4319       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4320         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4321                 hint="Please cleanup this device manually as soon as possible")
4322
4323     info("detaching primary drbds from the network (=> standalone)")
4324     done = 0
4325     for idx, dev in enumerate(instance.disks):
4326       cfg.SetDiskID(dev, pri_node)
4327       # set the network part of the physical (unique in bdev terms) id
4328       # to None, meaning detach from network
4329       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4330       # and 'find' the device, which will 'fix' it to match the
4331       # standalone state
4332       if self.rpc.call_blockdev_find(pri_node, dev):
4333         done += 1
4334       else:
4335         warning("Failed to detach drbd disk/%d from network, unusual case" %
4336                 idx)
4337
4338     if not done:
4339       # no detaches succeeded (very unlikely)
4340       self.cfg.ReleaseDRBDMinors(instance.name)
4341       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4342
4343     # if we managed to detach at least one, we update all the disks of
4344     # the instance to point to the new secondary
4345     info("updating instance configuration")
4346     for dev, _, new_logical_id in iv_names.itervalues():
4347       dev.logical_id = new_logical_id
4348       cfg.SetDiskID(dev, pri_node)
4349     cfg.Update(instance)
4350     # we can remove now the temp minors as now the new values are
4351     # written to the config file (and therefore stable)
4352     self.cfg.ReleaseDRBDMinors(instance.name)
4353
4354     # and now perform the drbd attach
4355     info("attaching primary drbds to new secondary (standalone => connected)")
4356     failures = []
4357     for idx, dev in enumerate(instance.disks):
4358       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4359       # since the attach is smart, it's enough to 'find' the device,
4360       # it will automatically activate the network, if the physical_id
4361       # is correct
4362       cfg.SetDiskID(dev, pri_node)
4363       logging.debug("Disk to attach: %s", dev)
4364       if not self.rpc.call_blockdev_find(pri_node, dev):
4365         warning("can't attach drbd disk/%d to new secondary!" % idx,
4366                 "please do a gnt-instance info to see the status of disks")
4367
4368     # this can fail as the old devices are degraded and _WaitForSync
4369     # does a combined result over all disks, so we don't check its
4370     # return value
4371     self.proc.LogStep(5, steps_total, "sync devices")
4372     _WaitForSync(self, instance, unlock=True)
4373
4374     # so check manually all the devices
4375     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4376       cfg.SetDiskID(dev, pri_node)
4377       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4378       if is_degr:
4379         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4380
4381     self.proc.LogStep(6, steps_total, "removing old storage")
4382     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4383       info("remove logical volumes for disk/%d" % idx)
4384       for lv in old_lvs:
4385         cfg.SetDiskID(lv, old_node)
4386         if not self.rpc.call_blockdev_remove(old_node, lv):
4387           warning("Can't remove LV on old secondary",
4388                   hint="Cleanup stale volumes by hand")
4389
4390   def Exec(self, feedback_fn):
4391     """Execute disk replacement.
4392
4393     This dispatches the disk replacement to the appropriate handler.
4394
4395     """
4396     instance = self.instance
4397
4398     # Activate the instance disks if we're replacing them on a down instance
4399     if instance.status == "down":
4400       _StartInstanceDisks(self, instance, True)
4401
4402     if instance.disk_template == constants.DT_DRBD8:
4403       if self.op.remote_node is None:
4404         fn = self._ExecD8DiskOnly
4405       else:
4406         fn = self._ExecD8Secondary
4407     else:
4408       raise errors.ProgrammerError("Unhandled disk replacement case")
4409
4410     ret = fn(feedback_fn)
4411
4412     # Deactivate the instance disks if we're replacing them on a down instance
4413     if instance.status == "down":
4414       _SafeShutdownInstanceDisks(self, instance)
4415
4416     return ret
4417
4418
4419 class LUGrowDisk(LogicalUnit):
4420   """Grow a disk of an instance.
4421
4422   """
4423   HPATH = "disk-grow"
4424   HTYPE = constants.HTYPE_INSTANCE
4425   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4426   REQ_BGL = False
4427
4428   def ExpandNames(self):
4429     self._ExpandAndLockInstance()
4430     self.needed_locks[locking.LEVEL_NODE] = []
4431     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4432
4433   def DeclareLocks(self, level):
4434     if level == locking.LEVEL_NODE:
4435       self._LockInstancesNodes()
4436
4437   def BuildHooksEnv(self):
4438     """Build hooks env.
4439
4440     This runs on the master, the primary and all the secondaries.
4441
4442     """
4443     env = {
4444       "DISK": self.op.disk,
4445       "AMOUNT": self.op.amount,
4446       }
4447     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4448     nl = [
4449       self.cfg.GetMasterNode(),
4450       self.instance.primary_node,
4451       ]
4452     return env, nl, nl
4453
4454   def CheckPrereq(self):
4455     """Check prerequisites.
4456
4457     This checks that the instance is in the cluster.
4458
4459     """
4460     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4461     assert instance is not None, \
4462       "Cannot retrieve locked instance %s" % self.op.instance_name
4463
4464     self.instance = instance
4465
4466     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4467       raise errors.OpPrereqError("Instance's disk layout does not support"
4468                                  " growing.")
4469
4470     self.disk = instance.FindDisk(self.op.disk)
4471
4472     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4473     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4474                                        instance.hypervisor)
4475     for node in nodenames:
4476       info = nodeinfo.get(node, None)
4477       if not info:
4478         raise errors.OpPrereqError("Cannot get current information"
4479                                    " from node '%s'" % node)
4480       vg_free = info.get('vg_free', None)
4481       if not isinstance(vg_free, int):
4482         raise errors.OpPrereqError("Can't compute free disk space on"
4483                                    " node %s" % node)
4484       if self.op.amount > info['vg_free']:
4485         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4486                                    " %d MiB available, %d MiB required" %
4487                                    (node, info['vg_free'], self.op.amount))
4488
4489   def Exec(self, feedback_fn):
4490     """Execute disk grow.
4491
4492     """
4493     instance = self.instance
4494     disk = self.disk
4495     for node in (instance.secondary_nodes + (instance.primary_node,)):
4496       self.cfg.SetDiskID(disk, node)
4497       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4498       if (not result or not isinstance(result, (list, tuple)) or
4499           len(result) != 2):
4500         raise errors.OpExecError("grow request failed to node %s" % node)
4501       elif not result[0]:
4502         raise errors.OpExecError("grow request failed to node %s: %s" %
4503                                  (node, result[1]))
4504     disk.RecordGrow(self.op.amount)
4505     self.cfg.Update(instance)
4506     if self.op.wait_for_sync:
4507       disk_abort = not _WaitForSync(self, instance)
4508       if disk_abort:
4509         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4510                              " status.\nPlease check the instance.")
4511
4512
4513 class LUQueryInstanceData(NoHooksLU):
4514   """Query runtime instance data.
4515
4516   """
4517   _OP_REQP = ["instances", "static"]
4518   REQ_BGL = False
4519
4520   def ExpandNames(self):
4521     self.needed_locks = {}
4522     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4523
4524     if not isinstance(self.op.instances, list):
4525       raise errors.OpPrereqError("Invalid argument type 'instances'")
4526
4527     if self.op.instances:
4528       self.wanted_names = []
4529       for name in self.op.instances:
4530         full_name = self.cfg.ExpandInstanceName(name)
4531         if full_name is None:
4532           raise errors.OpPrereqError("Instance '%s' not known" %
4533                                      self.op.instance_name)
4534         self.wanted_names.append(full_name)
4535       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4536     else:
4537       self.wanted_names = None
4538       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4539
4540     self.needed_locks[locking.LEVEL_NODE] = []
4541     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4542
4543   def DeclareLocks(self, level):
4544     if level == locking.LEVEL_NODE:
4545       self._LockInstancesNodes()
4546
4547   def CheckPrereq(self):
4548     """Check prerequisites.
4549
4550     This only checks the optional instance list against the existing names.
4551
4552     """
4553     if self.wanted_names is None:
4554       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4555
4556     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4557                              in self.wanted_names]
4558     return
4559
4560   def _ComputeDiskStatus(self, instance, snode, dev):
4561     """Compute block device status.
4562
4563     """
4564     static = self.op.static
4565     if not static:
4566       self.cfg.SetDiskID(dev, instance.primary_node)
4567       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4568     else:
4569       dev_pstatus = None
4570
4571     if dev.dev_type in constants.LDS_DRBD:
4572       # we change the snode then (otherwise we use the one passed in)
4573       if dev.logical_id[0] == instance.primary_node:
4574         snode = dev.logical_id[1]
4575       else:
4576         snode = dev.logical_id[0]
4577
4578     if snode and not static:
4579       self.cfg.SetDiskID(dev, snode)
4580       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4581     else:
4582       dev_sstatus = None
4583
4584     if dev.children:
4585       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4586                       for child in dev.children]
4587     else:
4588       dev_children = []
4589
4590     data = {
4591       "iv_name": dev.iv_name,
4592       "dev_type": dev.dev_type,
4593       "logical_id": dev.logical_id,
4594       "physical_id": dev.physical_id,
4595       "pstatus": dev_pstatus,
4596       "sstatus": dev_sstatus,
4597       "children": dev_children,
4598       }
4599
4600     return data
4601
4602   def Exec(self, feedback_fn):
4603     """Gather and return data"""
4604     result = {}
4605
4606     cluster = self.cfg.GetClusterInfo()
4607
4608     for instance in self.wanted_instances:
4609       if not self.op.static:
4610         remote_info = self.rpc.call_instance_info(instance.primary_node,
4611                                                   instance.name,
4612                                                   instance.hypervisor)
4613         if remote_info and "state" in remote_info:
4614           remote_state = "up"
4615         else:
4616           remote_state = "down"
4617       else:
4618         remote_state = None
4619       if instance.status == "down":
4620         config_state = "down"
4621       else:
4622         config_state = "up"
4623
4624       disks = [self._ComputeDiskStatus(instance, None, device)
4625                for device in instance.disks]
4626
4627       idict = {
4628         "name": instance.name,
4629         "config_state": config_state,
4630         "run_state": remote_state,
4631         "pnode": instance.primary_node,
4632         "snodes": instance.secondary_nodes,
4633         "os": instance.os,
4634         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4635         "disks": disks,
4636         "hypervisor": instance.hypervisor,
4637         "network_port": instance.network_port,
4638         "hv_instance": instance.hvparams,
4639         "hv_actual": cluster.FillHV(instance),
4640         "be_instance": instance.beparams,
4641         "be_actual": cluster.FillBE(instance),
4642         }
4643
4644       result[instance.name] = idict
4645
4646     return result
4647
4648
4649 class LUSetInstanceParams(LogicalUnit):
4650   """Modifies an instances's parameters.
4651
4652   """
4653   HPATH = "instance-modify"
4654   HTYPE = constants.HTYPE_INSTANCE
4655   _OP_REQP = ["instance_name", "hvparams"]
4656   REQ_BGL = False
4657
4658   def ExpandNames(self):
4659     self._ExpandAndLockInstance()
4660     self.needed_locks[locking.LEVEL_NODE] = []
4661     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4662
4663
4664   def DeclareLocks(self, level):
4665     if level == locking.LEVEL_NODE:
4666       self._LockInstancesNodes()
4667
4668   def BuildHooksEnv(self):
4669     """Build hooks env.
4670
4671     This runs on the master, primary and secondaries.
4672
4673     """
4674     args = dict()
4675     if constants.BE_MEMORY in self.be_new:
4676       args['memory'] = self.be_new[constants.BE_MEMORY]
4677     if constants.BE_VCPUS in self.be_new:
4678       args['vcpus'] = self.be_new[constants.BE_VCPUS]
4679     if self.do_ip or self.do_bridge or self.mac:
4680       if self.do_ip:
4681         ip = self.ip
4682       else:
4683         ip = self.instance.nics[0].ip
4684       if self.bridge:
4685         bridge = self.bridge
4686       else:
4687         bridge = self.instance.nics[0].bridge
4688       if self.mac:
4689         mac = self.mac
4690       else:
4691         mac = self.instance.nics[0].mac
4692       args['nics'] = [(ip, bridge, mac)]
4693     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4694     nl = [self.cfg.GetMasterNode(),
4695           self.instance.primary_node] + list(self.instance.secondary_nodes)
4696     return env, nl, nl
4697
4698   def CheckPrereq(self):
4699     """Check prerequisites.
4700
4701     This only checks the instance list against the existing names.
4702
4703     """
4704     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4705     # a separate CheckArguments function, if we implement one, so the operation
4706     # can be aborted without waiting for any lock, should it have an error...
4707     self.ip = getattr(self.op, "ip", None)
4708     self.mac = getattr(self.op, "mac", None)
4709     self.bridge = getattr(self.op, "bridge", None)
4710     self.kernel_path = getattr(self.op, "kernel_path", None)
4711     self.initrd_path = getattr(self.op, "initrd_path", None)
4712     self.force = getattr(self.op, "force", None)
4713     all_parms = [self.ip, self.bridge, self.mac]
4714     if (all_parms.count(None) == len(all_parms) and
4715         not self.op.hvparams and
4716         not self.op.beparams):
4717       raise errors.OpPrereqError("No changes submitted")
4718     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4719       val = self.op.beparams.get(item, None)
4720       if val is not None:
4721         try:
4722           val = int(val)
4723         except ValueError, err:
4724           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4725         self.op.beparams[item] = val
4726     if self.ip is not None:
4727       self.do_ip = True
4728       if self.ip.lower() == "none":
4729         self.ip = None
4730       else:
4731         if not utils.IsValidIP(self.ip):
4732           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4733     else:
4734       self.do_ip = False
4735     self.do_bridge = (self.bridge is not None)
4736     if self.mac is not None:
4737       if self.cfg.IsMacInUse(self.mac):
4738         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4739                                    self.mac)
4740       if not utils.IsValidMac(self.mac):
4741         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4742
4743     # checking the new params on the primary/secondary nodes
4744
4745     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4746     assert self.instance is not None, \
4747       "Cannot retrieve locked instance %s" % self.op.instance_name
4748     pnode = self.instance.primary_node
4749     nodelist = [pnode]
4750     nodelist.extend(instance.secondary_nodes)
4751
4752     # hvparams processing
4753     if self.op.hvparams:
4754       i_hvdict = copy.deepcopy(instance.hvparams)
4755       for key, val in self.op.hvparams.iteritems():
4756         if val is None:
4757           try:
4758             del i_hvdict[key]
4759           except KeyError:
4760             pass
4761         else:
4762           i_hvdict[key] = val
4763       cluster = self.cfg.GetClusterInfo()
4764       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4765                                 i_hvdict)
4766       # local check
4767       hypervisor.GetHypervisor(
4768         instance.hypervisor).CheckParameterSyntax(hv_new)
4769       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4770       self.hv_new = hv_new # the new actual values
4771       self.hv_inst = i_hvdict # the new dict (without defaults)
4772     else:
4773       self.hv_new = self.hv_inst = {}
4774
4775     # beparams processing
4776     if self.op.beparams:
4777       i_bedict = copy.deepcopy(instance.beparams)
4778       for key, val in self.op.beparams.iteritems():
4779         if val is None:
4780           try:
4781             del i_bedict[key]
4782           except KeyError:
4783             pass
4784         else:
4785           i_bedict[key] = val
4786       cluster = self.cfg.GetClusterInfo()
4787       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4788                                 i_bedict)
4789       self.be_new = be_new # the new actual values
4790       self.be_inst = i_bedict # the new dict (without defaults)
4791     else:
4792       self.hv_new = self.hv_inst = {}
4793
4794     self.warn = []
4795
4796     if constants.BE_MEMORY in self.op.beparams and not self.force:
4797       mem_check_list = [pnode]
4798       if be_new[constants.BE_AUTO_BALANCE]:
4799         # either we changed auto_balance to yes or it was from before
4800         mem_check_list.extend(instance.secondary_nodes)
4801       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4802                                                   instance.hypervisor)
4803       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4804                                          instance.hypervisor)
4805
4806       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4807         # Assume the primary node is unreachable and go ahead
4808         self.warn.append("Can't get info from primary node %s" % pnode)
4809       else:
4810         if instance_info:
4811           current_mem = instance_info['memory']
4812         else:
4813           # Assume instance not running
4814           # (there is a slight race condition here, but it's not very probable,
4815           # and we have no other way to check)
4816           current_mem = 0
4817         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4818                     nodeinfo[pnode]['memory_free'])
4819         if miss_mem > 0:
4820           raise errors.OpPrereqError("This change will prevent the instance"
4821                                      " from starting, due to %d MB of memory"
4822                                      " missing on its primary node" % miss_mem)
4823
4824       if be_new[constants.BE_AUTO_BALANCE]:
4825         for node in instance.secondary_nodes:
4826           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4827             self.warn.append("Can't get info from secondary node %s" % node)
4828           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4829             self.warn.append("Not enough memory to failover instance to"
4830                              " secondary node %s" % node)
4831
4832     return
4833
4834   def Exec(self, feedback_fn):
4835     """Modifies an instance.
4836
4837     All parameters take effect only at the next restart of the instance.
4838     """
4839     # Process here the warnings from CheckPrereq, as we don't have a
4840     # feedback_fn there.
4841     for warn in self.warn:
4842       feedback_fn("WARNING: %s" % warn)
4843
4844     result = []
4845     instance = self.instance
4846     if self.do_ip:
4847       instance.nics[0].ip = self.ip
4848       result.append(("ip", self.ip))
4849     if self.bridge:
4850       instance.nics[0].bridge = self.bridge
4851       result.append(("bridge", self.bridge))
4852     if self.mac:
4853       instance.nics[0].mac = self.mac
4854       result.append(("mac", self.mac))
4855     if self.op.hvparams:
4856       instance.hvparams = self.hv_new
4857       for key, val in self.op.hvparams.iteritems():
4858         result.append(("hv/%s" % key, val))
4859     if self.op.beparams:
4860       instance.beparams = self.be_inst
4861       for key, val in self.op.beparams.iteritems():
4862         result.append(("be/%s" % key, val))
4863
4864     self.cfg.Update(instance)
4865
4866     return result
4867
4868
4869 class LUQueryExports(NoHooksLU):
4870   """Query the exports list
4871
4872   """
4873   _OP_REQP = ['nodes']
4874   REQ_BGL = False
4875
4876   def ExpandNames(self):
4877     self.needed_locks = {}
4878     self.share_locks[locking.LEVEL_NODE] = 1
4879     if not self.op.nodes:
4880       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4881     else:
4882       self.needed_locks[locking.LEVEL_NODE] = \
4883         _GetWantedNodes(self, self.op.nodes)
4884
4885   def CheckPrereq(self):
4886     """Check prerequisites.
4887
4888     """
4889     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4890
4891   def Exec(self, feedback_fn):
4892     """Compute the list of all the exported system images.
4893
4894     @rtype: dict
4895     @return: a dictionary with the structure node->(export-list)
4896         where export-list is a list of the instances exported on
4897         that node.
4898
4899     """
4900     return self.rpc.call_export_list(self.nodes)
4901
4902
4903 class LUExportInstance(LogicalUnit):
4904   """Export an instance to an image in the cluster.
4905
4906   """
4907   HPATH = "instance-export"
4908   HTYPE = constants.HTYPE_INSTANCE
4909   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4910   REQ_BGL = False
4911
4912   def ExpandNames(self):
4913     self._ExpandAndLockInstance()
4914     # FIXME: lock only instance primary and destination node
4915     #
4916     # Sad but true, for now we have do lock all nodes, as we don't know where
4917     # the previous export might be, and and in this LU we search for it and
4918     # remove it from its current node. In the future we could fix this by:
4919     #  - making a tasklet to search (share-lock all), then create the new one,
4920     #    then one to remove, after
4921     #  - removing the removal operation altoghether
4922     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4923
4924   def DeclareLocks(self, level):
4925     """Last minute lock declaration."""
4926     # All nodes are locked anyway, so nothing to do here.
4927
4928   def BuildHooksEnv(self):
4929     """Build hooks env.
4930
4931     This will run on the master, primary node and target node.
4932
4933     """
4934     env = {
4935       "EXPORT_NODE": self.op.target_node,
4936       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4937       }
4938     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4939     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4940           self.op.target_node]
4941     return env, nl, nl
4942
4943   def CheckPrereq(self):
4944     """Check prerequisites.
4945
4946     This checks that the instance and node names are valid.
4947
4948     """
4949     instance_name = self.op.instance_name
4950     self.instance = self.cfg.GetInstanceInfo(instance_name)
4951     assert self.instance is not None, \
4952           "Cannot retrieve locked instance %s" % self.op.instance_name
4953
4954     self.dst_node = self.cfg.GetNodeInfo(
4955       self.cfg.ExpandNodeName(self.op.target_node))
4956
4957     assert self.dst_node is not None, \
4958           "Cannot retrieve locked node %s" % self.op.target_node
4959
4960     # instance disk type verification
4961     for disk in self.instance.disks:
4962       if disk.dev_type == constants.LD_FILE:
4963         raise errors.OpPrereqError("Export not supported for instances with"
4964                                    " file-based disks")
4965
4966   def Exec(self, feedback_fn):
4967     """Export an instance to an image in the cluster.
4968
4969     """
4970     instance = self.instance
4971     dst_node = self.dst_node
4972     src_node = instance.primary_node
4973     if self.op.shutdown:
4974       # shutdown the instance, but not the disks
4975       if not self.rpc.call_instance_shutdown(src_node, instance):
4976         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4977                                  (instance.name, src_node))
4978
4979     vgname = self.cfg.GetVGName()
4980
4981     snap_disks = []
4982
4983     try:
4984       for disk in instance.disks:
4985         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4986         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4987
4988         if not new_dev_name:
4989           self.LogWarning("Could not snapshot block device %s on node %s",
4990                           disk.logical_id[1], src_node)
4991           snap_disks.append(False)
4992         else:
4993           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4994                                  logical_id=(vgname, new_dev_name),
4995                                  physical_id=(vgname, new_dev_name),
4996                                  iv_name=disk.iv_name)
4997           snap_disks.append(new_dev)
4998
4999     finally:
5000       if self.op.shutdown and instance.status == "up":
5001         if not self.rpc.call_instance_start(src_node, instance, None):
5002           _ShutdownInstanceDisks(self, instance)
5003           raise errors.OpExecError("Could not start instance")
5004
5005     # TODO: check for size
5006
5007     cluster_name = self.cfg.GetClusterName()
5008     for idx, dev in enumerate(snap_disks):
5009       if dev:
5010         if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5011                                              instance, cluster_name, idx):
5012           self.LogWarning("Could not export block device %s from node %s to"
5013                           " node %s", dev.logical_id[1], src_node,
5014                           dst_node.name)
5015         if not self.rpc.call_blockdev_remove(src_node, dev):
5016           self.LogWarning("Could not remove snapshot block device %s from node"
5017                           " %s", dev.logical_id[1], src_node)
5018
5019     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5020       self.LogWarning("Could not finalize export for instance %s on node %s",
5021                       instance.name, dst_node.name)
5022
5023     nodelist = self.cfg.GetNodeList()
5024     nodelist.remove(dst_node.name)
5025
5026     # on one-node clusters nodelist will be empty after the removal
5027     # if we proceed the backup would be removed because OpQueryExports
5028     # substitutes an empty list with the full cluster node list.
5029     if nodelist:
5030       exportlist = self.rpc.call_export_list(nodelist)
5031       for node in exportlist:
5032         if instance.name in exportlist[node]:
5033           if not self.rpc.call_export_remove(node, instance.name):
5034             self.LogWarning("Could not remove older export for instance %s"
5035                             " on node %s", instance.name, node)
5036
5037
5038 class LURemoveExport(NoHooksLU):
5039   """Remove exports related to the named instance.
5040
5041   """
5042   _OP_REQP = ["instance_name"]
5043   REQ_BGL = False
5044
5045   def ExpandNames(self):
5046     self.needed_locks = {}
5047     # We need all nodes to be locked in order for RemoveExport to work, but we
5048     # don't need to lock the instance itself, as nothing will happen to it (and
5049     # we can remove exports also for a removed instance)
5050     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5051
5052   def CheckPrereq(self):
5053     """Check prerequisites.
5054     """
5055     pass
5056
5057   def Exec(self, feedback_fn):
5058     """Remove any export.
5059
5060     """
5061     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5062     # If the instance was not found we'll try with the name that was passed in.
5063     # This will only work if it was an FQDN, though.
5064     fqdn_warn = False
5065     if not instance_name:
5066       fqdn_warn = True
5067       instance_name = self.op.instance_name
5068
5069     exportlist = self.rpc.call_export_list(self.acquired_locks[
5070       locking.LEVEL_NODE])
5071     found = False
5072     for node in exportlist:
5073       if instance_name in exportlist[node]:
5074         found = True
5075         if not self.rpc.call_export_remove(node, instance_name):
5076           logging.error("Could not remove export for instance %s"
5077                         " on node %s", instance_name, node)
5078
5079     if fqdn_warn and not found:
5080       feedback_fn("Export not found. If trying to remove an export belonging"
5081                   " to a deleted instance please use its Fully Qualified"
5082                   " Domain Name.")
5083
5084
5085 class TagsLU(NoHooksLU):
5086   """Generic tags LU.
5087
5088   This is an abstract class which is the parent of all the other tags LUs.
5089
5090   """
5091
5092   def ExpandNames(self):
5093     self.needed_locks = {}
5094     if self.op.kind == constants.TAG_NODE:
5095       name = self.cfg.ExpandNodeName(self.op.name)
5096       if name is None:
5097         raise errors.OpPrereqError("Invalid node name (%s)" %
5098                                    (self.op.name,))
5099       self.op.name = name
5100       self.needed_locks[locking.LEVEL_NODE] = name
5101     elif self.op.kind == constants.TAG_INSTANCE:
5102       name = self.cfg.ExpandInstanceName(self.op.name)
5103       if name is None:
5104         raise errors.OpPrereqError("Invalid instance name (%s)" %
5105                                    (self.op.name,))
5106       self.op.name = name
5107       self.needed_locks[locking.LEVEL_INSTANCE] = name
5108
5109   def CheckPrereq(self):
5110     """Check prerequisites.
5111
5112     """
5113     if self.op.kind == constants.TAG_CLUSTER:
5114       self.target = self.cfg.GetClusterInfo()
5115     elif self.op.kind == constants.TAG_NODE:
5116       self.target = self.cfg.GetNodeInfo(self.op.name)
5117     elif self.op.kind == constants.TAG_INSTANCE:
5118       self.target = self.cfg.GetInstanceInfo(self.op.name)
5119     else:
5120       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5121                                  str(self.op.kind))
5122
5123
5124 class LUGetTags(TagsLU):
5125   """Returns the tags of a given object.
5126
5127   """
5128   _OP_REQP = ["kind", "name"]
5129   REQ_BGL = False
5130
5131   def Exec(self, feedback_fn):
5132     """Returns the tag list.
5133
5134     """
5135     return list(self.target.GetTags())
5136
5137
5138 class LUSearchTags(NoHooksLU):
5139   """Searches the tags for a given pattern.
5140
5141   """
5142   _OP_REQP = ["pattern"]
5143   REQ_BGL = False
5144
5145   def ExpandNames(self):
5146     self.needed_locks = {}
5147
5148   def CheckPrereq(self):
5149     """Check prerequisites.
5150
5151     This checks the pattern passed for validity by compiling it.
5152
5153     """
5154     try:
5155       self.re = re.compile(self.op.pattern)
5156     except re.error, err:
5157       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5158                                  (self.op.pattern, err))
5159
5160   def Exec(self, feedback_fn):
5161     """Returns the tag list.
5162
5163     """
5164     cfg = self.cfg
5165     tgts = [("/cluster", cfg.GetClusterInfo())]
5166     ilist = cfg.GetAllInstancesInfo().values()
5167     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5168     nlist = cfg.GetAllNodesInfo().values()
5169     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5170     results = []
5171     for path, target in tgts:
5172       for tag in target.GetTags():
5173         if self.re.search(tag):
5174           results.append((path, tag))
5175     return results
5176
5177
5178 class LUAddTags(TagsLU):
5179   """Sets a tag on a given object.
5180
5181   """
5182   _OP_REQP = ["kind", "name", "tags"]
5183   REQ_BGL = False
5184
5185   def CheckPrereq(self):
5186     """Check prerequisites.
5187
5188     This checks the type and length of the tag name and value.
5189
5190     """
5191     TagsLU.CheckPrereq(self)
5192     for tag in self.op.tags:
5193       objects.TaggableObject.ValidateTag(tag)
5194
5195   def Exec(self, feedback_fn):
5196     """Sets the tag.
5197
5198     """
5199     try:
5200       for tag in self.op.tags:
5201         self.target.AddTag(tag)
5202     except errors.TagError, err:
5203       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5204     try:
5205       self.cfg.Update(self.target)
5206     except errors.ConfigurationError:
5207       raise errors.OpRetryError("There has been a modification to the"
5208                                 " config file and the operation has been"
5209                                 " aborted. Please retry.")
5210
5211
5212 class LUDelTags(TagsLU):
5213   """Delete a list of tags from a given object.
5214
5215   """
5216   _OP_REQP = ["kind", "name", "tags"]
5217   REQ_BGL = False
5218
5219   def CheckPrereq(self):
5220     """Check prerequisites.
5221
5222     This checks that we have the given tag.
5223
5224     """
5225     TagsLU.CheckPrereq(self)
5226     for tag in self.op.tags:
5227       objects.TaggableObject.ValidateTag(tag)
5228     del_tags = frozenset(self.op.tags)
5229     cur_tags = self.target.GetTags()
5230     if not del_tags <= cur_tags:
5231       diff_tags = del_tags - cur_tags
5232       diff_names = ["'%s'" % tag for tag in diff_tags]
5233       diff_names.sort()
5234       raise errors.OpPrereqError("Tag(s) %s not found" %
5235                                  (",".join(diff_names)))
5236
5237   def Exec(self, feedback_fn):
5238     """Remove the tag from the object.
5239
5240     """
5241     for tag in self.op.tags:
5242       self.target.RemoveTag(tag)
5243     try:
5244       self.cfg.Update(self.target)
5245     except errors.ConfigurationError:
5246       raise errors.OpRetryError("There has been a modification to the"
5247                                 " config file and the operation has been"
5248                                 " aborted. Please retry.")
5249
5250
5251 class LUTestDelay(NoHooksLU):
5252   """Sleep for a specified amount of time.
5253
5254   This LU sleeps on the master and/or nodes for a specified amount of
5255   time.
5256
5257   """
5258   _OP_REQP = ["duration", "on_master", "on_nodes"]
5259   REQ_BGL = False
5260
5261   def ExpandNames(self):
5262     """Expand names and set required locks.
5263
5264     This expands the node list, if any.
5265
5266     """
5267     self.needed_locks = {}
5268     if self.op.on_nodes:
5269       # _GetWantedNodes can be used here, but is not always appropriate to use
5270       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5271       # more information.
5272       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5273       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5274
5275   def CheckPrereq(self):
5276     """Check prerequisites.
5277
5278     """
5279
5280   def Exec(self, feedback_fn):
5281     """Do the actual sleep.
5282
5283     """
5284     if self.op.on_master:
5285       if not utils.TestDelay(self.op.duration):
5286         raise errors.OpExecError("Error during master delay test")
5287     if self.op.on_nodes:
5288       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5289       if not result:
5290         raise errors.OpExecError("Complete failure from rpc call")
5291       for node, node_result in result.items():
5292         if not node_result:
5293           raise errors.OpExecError("Failure during rpc call to node %s,"
5294                                    " result: %s" % (node, node_result))
5295
5296
5297 class IAllocator(object):
5298   """IAllocator framework.
5299
5300   An IAllocator instance has three sets of attributes:
5301     - cfg that is needed to query the cluster
5302     - input data (all members of the _KEYS class attribute are required)
5303     - four buffer attributes (in|out_data|text), that represent the
5304       input (to the external script) in text and data structure format,
5305       and the output from it, again in two formats
5306     - the result variables from the script (success, info, nodes) for
5307       easy usage
5308
5309   """
5310   _ALLO_KEYS = [
5311     "mem_size", "disks", "disk_template",
5312     "os", "tags", "nics", "vcpus",
5313     ]
5314   _RELO_KEYS = [
5315     "relocate_from",
5316     ]
5317
5318   def __init__(self, lu, mode, name, **kwargs):
5319     self.lu = lu
5320     # init buffer variables
5321     self.in_text = self.out_text = self.in_data = self.out_data = None
5322     # init all input fields so that pylint is happy
5323     self.mode = mode
5324     self.name = name
5325     self.mem_size = self.disks = self.disk_template = None
5326     self.os = self.tags = self.nics = self.vcpus = None
5327     self.relocate_from = None
5328     # computed fields
5329     self.required_nodes = None
5330     # init result fields
5331     self.success = self.info = self.nodes = None
5332     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5333       keyset = self._ALLO_KEYS
5334     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5335       keyset = self._RELO_KEYS
5336     else:
5337       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5338                                    " IAllocator" % self.mode)
5339     for key in kwargs:
5340       if key not in keyset:
5341         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5342                                      " IAllocator" % key)
5343       setattr(self, key, kwargs[key])
5344     for key in keyset:
5345       if key not in kwargs:
5346         raise errors.ProgrammerError("Missing input parameter '%s' to"
5347                                      " IAllocator" % key)
5348     self._BuildInputData()
5349
5350   def _ComputeClusterData(self):
5351     """Compute the generic allocator input data.
5352
5353     This is the data that is independent of the actual operation.
5354
5355     """
5356     cfg = self.lu.cfg
5357     cluster_info = cfg.GetClusterInfo()
5358     # cluster data
5359     data = {
5360       "version": 1,
5361       "cluster_name": cfg.GetClusterName(),
5362       "cluster_tags": list(cluster_info.GetTags()),
5363       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5364       # we don't have job IDs
5365       }
5366
5367     i_list = []
5368     cluster = self.cfg.GetClusterInfo()
5369     for iname in cfg.GetInstanceList():
5370       i_obj = cfg.GetInstanceInfo(iname)
5371       i_list.append((i_obj, cluster.FillBE(i_obj)))
5372
5373     # node data
5374     node_results = {}
5375     node_list = cfg.GetNodeList()
5376     # FIXME: here we have only one hypervisor information, but
5377     # instance can belong to different hypervisors
5378     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5379                                            cfg.GetHypervisorType())
5380     for nname in node_list:
5381       ninfo = cfg.GetNodeInfo(nname)
5382       if nname not in node_data or not isinstance(node_data[nname], dict):
5383         raise errors.OpExecError("Can't get data for node %s" % nname)
5384       remote_info = node_data[nname]
5385       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5386                    'vg_size', 'vg_free', 'cpu_total']:
5387         if attr not in remote_info:
5388           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5389                                    (nname, attr))
5390         try:
5391           remote_info[attr] = int(remote_info[attr])
5392         except ValueError, err:
5393           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5394                                    " %s" % (nname, attr, str(err)))
5395       # compute memory used by primary instances
5396       i_p_mem = i_p_up_mem = 0
5397       for iinfo, beinfo in i_list:
5398         if iinfo.primary_node == nname:
5399           i_p_mem += beinfo[constants.BE_MEMORY]
5400           if iinfo.status == "up":
5401             i_p_up_mem += beinfo[constants.BE_MEMORY]
5402
5403       # compute memory used by instances
5404       pnr = {
5405         "tags": list(ninfo.GetTags()),
5406         "total_memory": remote_info['memory_total'],
5407         "reserved_memory": remote_info['memory_dom0'],
5408         "free_memory": remote_info['memory_free'],
5409         "i_pri_memory": i_p_mem,
5410         "i_pri_up_memory": i_p_up_mem,
5411         "total_disk": remote_info['vg_size'],
5412         "free_disk": remote_info['vg_free'],
5413         "primary_ip": ninfo.primary_ip,
5414         "secondary_ip": ninfo.secondary_ip,
5415         "total_cpus": remote_info['cpu_total'],
5416         }
5417       node_results[nname] = pnr
5418     data["nodes"] = node_results
5419
5420     # instance data
5421     instance_data = {}
5422     for iinfo, beinfo in i_list:
5423       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5424                   for n in iinfo.nics]
5425       pir = {
5426         "tags": list(iinfo.GetTags()),
5427         "should_run": iinfo.status == "up",
5428         "vcpus": beinfo[constants.BE_VCPUS],
5429         "memory": beinfo[constants.BE_MEMORY],
5430         "os": iinfo.os,
5431         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5432         "nics": nic_data,
5433         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5434         "disk_template": iinfo.disk_template,
5435         "hypervisor": iinfo.hypervisor,
5436         }
5437       instance_data[iinfo.name] = pir
5438
5439     data["instances"] = instance_data
5440
5441     self.in_data = data
5442
5443   def _AddNewInstance(self):
5444     """Add new instance data to allocator structure.
5445
5446     This in combination with _AllocatorGetClusterData will create the
5447     correct structure needed as input for the allocator.
5448
5449     The checks for the completeness of the opcode must have already been
5450     done.
5451
5452     """
5453     data = self.in_data
5454     if len(self.disks) != 2:
5455       raise errors.OpExecError("Only two-disk configurations supported")
5456
5457     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5458
5459     if self.disk_template in constants.DTS_NET_MIRROR:
5460       self.required_nodes = 2
5461     else:
5462       self.required_nodes = 1
5463     request = {
5464       "type": "allocate",
5465       "name": self.name,
5466       "disk_template": self.disk_template,
5467       "tags": self.tags,
5468       "os": self.os,
5469       "vcpus": self.vcpus,
5470       "memory": self.mem_size,
5471       "disks": self.disks,
5472       "disk_space_total": disk_space,
5473       "nics": self.nics,
5474       "required_nodes": self.required_nodes,
5475       }
5476     data["request"] = request
5477
5478   def _AddRelocateInstance(self):
5479     """Add relocate instance data to allocator structure.
5480
5481     This in combination with _IAllocatorGetClusterData will create the
5482     correct structure needed as input for the allocator.
5483
5484     The checks for the completeness of the opcode must have already been
5485     done.
5486
5487     """
5488     instance = self.lu.cfg.GetInstanceInfo(self.name)
5489     if instance is None:
5490       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5491                                    " IAllocator" % self.name)
5492
5493     if instance.disk_template not in constants.DTS_NET_MIRROR:
5494       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5495
5496     if len(instance.secondary_nodes) != 1:
5497       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5498
5499     self.required_nodes = 1
5500     disk_sizes = [{'size': disk.size} for disk in instance.disks]
5501     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
5502
5503     request = {
5504       "type": "relocate",
5505       "name": self.name,
5506       "disk_space_total": disk_space,
5507       "required_nodes": self.required_nodes,
5508       "relocate_from": self.relocate_from,
5509       }
5510     self.in_data["request"] = request
5511
5512   def _BuildInputData(self):
5513     """Build input data structures.
5514
5515     """
5516     self._ComputeClusterData()
5517
5518     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5519       self._AddNewInstance()
5520     else:
5521       self._AddRelocateInstance()
5522
5523     self.in_text = serializer.Dump(self.in_data)
5524
5525   def Run(self, name, validate=True, call_fn=None):
5526     """Run an instance allocator and return the results.
5527
5528     """
5529     if call_fn is None:
5530       call_fn = self.lu.rpc.call_iallocator_runner
5531     data = self.in_text
5532
5533     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5534
5535     if not isinstance(result, (list, tuple)) or len(result) != 4:
5536       raise errors.OpExecError("Invalid result from master iallocator runner")
5537
5538     rcode, stdout, stderr, fail = result
5539
5540     if rcode == constants.IARUN_NOTFOUND:
5541       raise errors.OpExecError("Can't find allocator '%s'" % name)
5542     elif rcode == constants.IARUN_FAILURE:
5543       raise errors.OpExecError("Instance allocator call failed: %s,"
5544                                " output: %s" % (fail, stdout+stderr))
5545     self.out_text = stdout
5546     if validate:
5547       self._ValidateResult()
5548
5549   def _ValidateResult(self):
5550     """Process the allocator results.
5551
5552     This will process and if successful save the result in
5553     self.out_data and the other parameters.
5554
5555     """
5556     try:
5557       rdict = serializer.Load(self.out_text)
5558     except Exception, err:
5559       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5560
5561     if not isinstance(rdict, dict):
5562       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5563
5564     for key in "success", "info", "nodes":
5565       if key not in rdict:
5566         raise errors.OpExecError("Can't parse iallocator results:"
5567                                  " missing key '%s'" % key)
5568       setattr(self, key, rdict[key])
5569
5570     if not isinstance(rdict["nodes"], list):
5571       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5572                                " is not a list")
5573     self.out_data = rdict
5574
5575
5576 class LUTestAllocator(NoHooksLU):
5577   """Run allocator tests.
5578
5579   This LU runs the allocator tests
5580
5581   """
5582   _OP_REQP = ["direction", "mode", "name"]
5583
5584   def CheckPrereq(self):
5585     """Check prerequisites.
5586
5587     This checks the opcode parameters depending on the director and mode test.
5588
5589     """
5590     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5591       for attr in ["name", "mem_size", "disks", "disk_template",
5592                    "os", "tags", "nics", "vcpus"]:
5593         if not hasattr(self.op, attr):
5594           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5595                                      attr)
5596       iname = self.cfg.ExpandInstanceName(self.op.name)
5597       if iname is not None:
5598         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5599                                    iname)
5600       if not isinstance(self.op.nics, list):
5601         raise errors.OpPrereqError("Invalid parameter 'nics'")
5602       for row in self.op.nics:
5603         if (not isinstance(row, dict) or
5604             "mac" not in row or
5605             "ip" not in row or
5606             "bridge" not in row):
5607           raise errors.OpPrereqError("Invalid contents of the"
5608                                      " 'nics' parameter")
5609       if not isinstance(self.op.disks, list):
5610         raise errors.OpPrereqError("Invalid parameter 'disks'")
5611       if len(self.op.disks) != 2:
5612         raise errors.OpPrereqError("Only two-disk configurations supported")
5613       for row in self.op.disks:
5614         if (not isinstance(row, dict) or
5615             "size" not in row or
5616             not isinstance(row["size"], int) or
5617             "mode" not in row or
5618             row["mode"] not in ['r', 'w']):
5619           raise errors.OpPrereqError("Invalid contents of the"
5620                                      " 'disks' parameter")
5621     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5622       if not hasattr(self.op, "name"):
5623         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5624       fname = self.cfg.ExpandInstanceName(self.op.name)
5625       if fname is None:
5626         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5627                                    self.op.name)
5628       self.op.name = fname
5629       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5630     else:
5631       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5632                                  self.op.mode)
5633
5634     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5635       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5636         raise errors.OpPrereqError("Missing allocator name")
5637     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5638       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5639                                  self.op.direction)
5640
5641   def Exec(self, feedback_fn):
5642     """Run the allocator test.
5643
5644     """
5645     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5646       ial = IAllocator(self,
5647                        mode=self.op.mode,
5648                        name=self.op.name,
5649                        mem_size=self.op.mem_size,
5650                        disks=self.op.disks,
5651                        disk_template=self.op.disk_template,
5652                        os=self.op.os,
5653                        tags=self.op.tags,
5654                        nics=self.op.nics,
5655                        vcpus=self.op.vcpus,
5656                        )
5657     else:
5658       ial = IAllocator(self,
5659                        mode=self.op.mode,
5660                        name=self.op.name,
5661                        relocate_from=list(self.relocate_from),
5662                        )
5663
5664     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5665       result = ial.in_text
5666     else:
5667       ial.Run(self.op.allocator, validate=False)
5668       result = ial.out_text
5669     return result