Add custom logging setup for daemons
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_WSSTORE: the LU needs a writable SimpleStore
60         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61
62   Note that all commands require root permissions.
63
64   """
65   HPATH = None
66   HTYPE = None
67   _OP_REQP = []
68   REQ_MASTER = True
69   REQ_WSSTORE = False
70   REQ_BGL = True
71
72   def __init__(self, processor, op, cfg, sstore):
73     """Constructor for LogicalUnit.
74
75     This needs to be overriden in derived classes in order to check op
76     validity.
77
78     """
79     self.proc = processor
80     self.op = op
81     self.cfg = cfg
82     self.sstore = sstore
83     self.__ssh = None
84
85     for attr_name in self._OP_REQP:
86       attr_val = getattr(op, attr_name, None)
87       if attr_val is None:
88         raise errors.OpPrereqError("Required parameter '%s' missing" %
89                                    attr_name)
90
91     if not cfg.IsCluster():
92       raise errors.OpPrereqError("Cluster not initialized yet,"
93                                  " use 'gnt-cluster init' first.")
94     if self.REQ_MASTER:
95       master = sstore.GetMasterNode()
96       if master != utils.HostInfo().name:
97         raise errors.OpPrereqError("Commands must be run on the master"
98                                    " node %s" % master)
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.sstore)
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckPrereq(self):
111     """Check prerequisites for this LU.
112
113     This method should check that the prerequisites for the execution
114     of this LU are fulfilled. It can do internode communication, but
115     it should be idempotent - no cluster or system changes are
116     allowed.
117
118     The method should raise errors.OpPrereqError in case something is
119     not fulfilled. Its return value is ignored.
120
121     This method should also update all the parameters of the opcode to
122     their canonical form; e.g. a short node name must be fully
123     expanded after this method has successfully completed (so that
124     hooks, logging, etc. work correctly).
125
126     """
127     raise NotImplementedError
128
129   def Exec(self, feedback_fn):
130     """Execute the LU.
131
132     This method should implement the actual work. It should raise
133     errors.OpExecError for failures that are somewhat dealt with in
134     code, or expected.
135
136     """
137     raise NotImplementedError
138
139   def BuildHooksEnv(self):
140     """Build hooks environment for this LU.
141
142     This method should return a three-node tuple consisting of: a dict
143     containing the environment that will be used for running the
144     specific hook for this LU, a list of node names on which the hook
145     should run before the execution, and a list of node names on which
146     the hook should run after the execution.
147
148     The keys of the dict must not have 'GANETI_' prefixed as this will
149     be handled in the hooks runner. Also note additional keys will be
150     added by the hooks runner. If the LU doesn't define any
151     environment, an empty dict (and not None) should be returned.
152
153     No nodes should be returned as an empty list (and not None).
154
155     Note that if the HPATH for a LU class is None, this function will
156     not be called.
157
158     """
159     raise NotImplementedError
160
161   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
162     """Notify the LU about the results of its hooks.
163
164     This method is called every time a hooks phase is executed, and notifies
165     the Logical Unit about the hooks' result. The LU can then use it to alter
166     its result based on the hooks.  By default the method does nothing and the
167     previous result is passed back unchanged but any LU can define it if it
168     wants to use the local cluster hook-scripts somehow.
169
170     Args:
171       phase: the hooks phase that has just been run
172       hooks_results: the results of the multi-node hooks rpc call
173       feedback_fn: function to send feedback back to the caller
174       lu_result: the previous result this LU had, or None in the PRE phase.
175
176     """
177     return lu_result
178
179
180 class NoHooksLU(LogicalUnit):
181   """Simple LU which runs no hooks.
182
183   This LU is intended as a parent for other LogicalUnits which will
184   run no hooks, in order to reduce duplicate code.
185
186   """
187   HPATH = None
188   HTYPE = None
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _CheckInstanceBridgesExist(instance):
315   """Check that the brigdes needed by an instance exist.
316
317   """
318   # check bridges existance
319   brlist = [nic.bridge for nic in instance.nics]
320   if not rpc.call_bridges_exist(instance.primary_node, brlist):
321     raise errors.OpPrereqError("one or more target bridges %s does not"
322                                " exist on destination node '%s'" %
323                                (brlist, instance.primary_node))
324
325
326 class LUDestroyCluster(NoHooksLU):
327   """Logical unit for destroying the cluster.
328
329   """
330   _OP_REQP = []
331
332   def CheckPrereq(self):
333     """Check prerequisites.
334
335     This checks whether the cluster is empty.
336
337     Any errors are signalled by raising errors.OpPrereqError.
338
339     """
340     master = self.sstore.GetMasterNode()
341
342     nodelist = self.cfg.GetNodeList()
343     if len(nodelist) != 1 or nodelist[0] != master:
344       raise errors.OpPrereqError("There are still %d node(s) in"
345                                  " this cluster." % (len(nodelist) - 1))
346     instancelist = self.cfg.GetInstanceList()
347     if instancelist:
348       raise errors.OpPrereqError("There are still %d instance(s) in"
349                                  " this cluster." % len(instancelist))
350
351   def Exec(self, feedback_fn):
352     """Destroys the cluster.
353
354     """
355     master = self.sstore.GetMasterNode()
356     if not rpc.call_node_stop_master(master):
357       raise errors.OpExecError("Could not disable the master role")
358     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
359     utils.CreateBackup(priv_key)
360     utils.CreateBackup(pub_key)
361     rpc.call_node_leave_cluster(master)
362
363
364 class LUVerifyCluster(LogicalUnit):
365   """Verifies the cluster status.
366
367   """
368   HPATH = "cluster-verify"
369   HTYPE = constants.HTYPE_CLUSTER
370   _OP_REQP = ["skip_checks"]
371
372   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
373                   remote_version, feedback_fn):
374     """Run multiple tests against a node.
375
376     Test list:
377       - compares ganeti version
378       - checks vg existance and size > 20G
379       - checks config file checksum
380       - checks ssh to other nodes
381
382     Args:
383       node: name of the node to check
384       file_list: required list of files
385       local_cksum: dictionary of local files and their checksums
386
387     """
388     # compares ganeti version
389     local_version = constants.PROTOCOL_VERSION
390     if not remote_version:
391       feedback_fn("  - ERROR: connection to %s failed" % (node))
392       return True
393
394     if local_version != remote_version:
395       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
396                       (local_version, node, remote_version))
397       return True
398
399     # checks vg existance and size > 20G
400
401     bad = False
402     if not vglist:
403       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
404                       (node,))
405       bad = True
406     else:
407       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
408                                             constants.MIN_VG_SIZE)
409       if vgstatus:
410         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
411         bad = True
412
413     # checks config file checksum
414     # checks ssh to any
415
416     if 'filelist' not in node_result:
417       bad = True
418       feedback_fn("  - ERROR: node hasn't returned file checksum data")
419     else:
420       remote_cksum = node_result['filelist']
421       for file_name in file_list:
422         if file_name not in remote_cksum:
423           bad = True
424           feedback_fn("  - ERROR: file '%s' missing" % file_name)
425         elif remote_cksum[file_name] != local_cksum[file_name]:
426           bad = True
427           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
428
429     if 'nodelist' not in node_result:
430       bad = True
431       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
432     else:
433       if node_result['nodelist']:
434         bad = True
435         for node in node_result['nodelist']:
436           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
437                           (node, node_result['nodelist'][node]))
438     if 'node-net-test' not in node_result:
439       bad = True
440       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
441     else:
442       if node_result['node-net-test']:
443         bad = True
444         nlist = utils.NiceSort(node_result['node-net-test'].keys())
445         for node in nlist:
446           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
447                           (node, node_result['node-net-test'][node]))
448
449     hyp_result = node_result.get('hypervisor', None)
450     if hyp_result is not None:
451       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
452     return bad
453
454   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
455                       node_instance, feedback_fn):
456     """Verify an instance.
457
458     This function checks to see if the required block devices are
459     available on the instance's node.
460
461     """
462     bad = False
463
464     node_current = instanceconfig.primary_node
465
466     node_vol_should = {}
467     instanceconfig.MapLVsByNode(node_vol_should)
468
469     for node in node_vol_should:
470       for volume in node_vol_should[node]:
471         if node not in node_vol_is or volume not in node_vol_is[node]:
472           feedback_fn("  - ERROR: volume %s missing on node %s" %
473                           (volume, node))
474           bad = True
475
476     if not instanceconfig.status == 'down':
477       if (node_current not in node_instance or
478           not instance in node_instance[node_current]):
479         feedback_fn("  - ERROR: instance %s not running on node %s" %
480                         (instance, node_current))
481         bad = True
482
483     for node in node_instance:
484       if (not node == node_current):
485         if instance in node_instance[node]:
486           feedback_fn("  - ERROR: instance %s should not run on node %s" %
487                           (instance, node))
488           bad = True
489
490     return bad
491
492   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
493     """Verify if there are any unknown volumes in the cluster.
494
495     The .os, .swap and backup volumes are ignored. All other volumes are
496     reported as unknown.
497
498     """
499     bad = False
500
501     for node in node_vol_is:
502       for volume in node_vol_is[node]:
503         if node not in node_vol_should or volume not in node_vol_should[node]:
504           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
505                       (volume, node))
506           bad = True
507     return bad
508
509   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
510     """Verify the list of running instances.
511
512     This checks what instances are running but unknown to the cluster.
513
514     """
515     bad = False
516     for node in node_instance:
517       for runninginstance in node_instance[node]:
518         if runninginstance not in instancelist:
519           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
520                           (runninginstance, node))
521           bad = True
522     return bad
523
524   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
525     """Verify N+1 Memory Resilience.
526
527     Check that if one single node dies we can still start all the instances it
528     was primary for.
529
530     """
531     bad = False
532
533     for node, nodeinfo in node_info.iteritems():
534       # This code checks that every node which is now listed as secondary has
535       # enough memory to host all instances it is supposed to should a single
536       # other node in the cluster fail.
537       # FIXME: not ready for failover to an arbitrary node
538       # FIXME: does not support file-backed instances
539       # WARNING: we currently take into account down instances as well as up
540       # ones, considering that even if they're down someone might want to start
541       # them even in the event of a node failure.
542       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
543         needed_mem = 0
544         for instance in instances:
545           needed_mem += instance_cfg[instance].memory
546         if nodeinfo['mfree'] < needed_mem:
547           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
548                       " failovers should node %s fail" % (node, prinode))
549           bad = True
550     return bad
551
552   def CheckPrereq(self):
553     """Check prerequisites.
554
555     Transform the list of checks we're going to skip into a set and check that
556     all its members are valid.
557
558     """
559     self.skip_set = frozenset(self.op.skip_checks)
560     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
561       raise errors.OpPrereqError("Invalid checks to be skipped specified")
562
563   def BuildHooksEnv(self):
564     """Build hooks env.
565
566     Cluster-Verify hooks just rone in the post phase and their failure makes
567     the output be logged in the verify output and the verification to fail.
568
569     """
570     all_nodes = self.cfg.GetNodeList()
571     # TODO: populate the environment with useful information for verify hooks
572     env = {}
573     return env, [], all_nodes
574
575   def Exec(self, feedback_fn):
576     """Verify integrity of cluster, performing various test on nodes.
577
578     """
579     bad = False
580     feedback_fn("* Verifying global settings")
581     for msg in self.cfg.VerifyConfig():
582       feedback_fn("  - ERROR: %s" % msg)
583
584     vg_name = self.cfg.GetVGName()
585     nodelist = utils.NiceSort(self.cfg.GetNodeList())
586     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
587     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
588     i_non_redundant = [] # Non redundant instances
589     node_volume = {}
590     node_instance = {}
591     node_info = {}
592     instance_cfg = {}
593
594     # FIXME: verify OS list
595     # do local checksums
596     file_names = list(self.sstore.GetFileList())
597     file_names.append(constants.SSL_CERT_FILE)
598     file_names.append(constants.CLUSTER_CONF_FILE)
599     local_checksums = utils.FingerprintFiles(file_names)
600
601     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
602     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
603     all_instanceinfo = rpc.call_instance_list(nodelist)
604     all_vglist = rpc.call_vg_list(nodelist)
605     node_verify_param = {
606       'filelist': file_names,
607       'nodelist': nodelist,
608       'hypervisor': None,
609       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
610                         for node in nodeinfo]
611       }
612     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
613     all_rversion = rpc.call_version(nodelist)
614     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
615
616     for node in nodelist:
617       feedback_fn("* Verifying node %s" % node)
618       result = self._VerifyNode(node, file_names, local_checksums,
619                                 all_vglist[node], all_nvinfo[node],
620                                 all_rversion[node], feedback_fn)
621       bad = bad or result
622
623       # node_volume
624       volumeinfo = all_volumeinfo[node]
625
626       if isinstance(volumeinfo, basestring):
627         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
628                     (node, volumeinfo[-400:].encode('string_escape')))
629         bad = True
630         node_volume[node] = {}
631       elif not isinstance(volumeinfo, dict):
632         feedback_fn("  - ERROR: connection to %s failed" % (node,))
633         bad = True
634         continue
635       else:
636         node_volume[node] = volumeinfo
637
638       # node_instance
639       nodeinstance = all_instanceinfo[node]
640       if type(nodeinstance) != list:
641         feedback_fn("  - ERROR: connection to %s failed" % (node,))
642         bad = True
643         continue
644
645       node_instance[node] = nodeinstance
646
647       # node_info
648       nodeinfo = all_ninfo[node]
649       if not isinstance(nodeinfo, dict):
650         feedback_fn("  - ERROR: connection to %s failed" % (node,))
651         bad = True
652         continue
653
654       try:
655         node_info[node] = {
656           "mfree": int(nodeinfo['memory_free']),
657           "dfree": int(nodeinfo['vg_free']),
658           "pinst": [],
659           "sinst": [],
660           # dictionary holding all instances this node is secondary for,
661           # grouped by their primary node. Each key is a cluster node, and each
662           # value is a list of instances which have the key as primary and the
663           # current node as secondary.  this is handy to calculate N+1 memory
664           # availability if you can only failover from a primary to its
665           # secondary.
666           "sinst-by-pnode": {},
667         }
668       except ValueError:
669         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
670         bad = True
671         continue
672
673     node_vol_should = {}
674
675     for instance in instancelist:
676       feedback_fn("* Verifying instance %s" % instance)
677       inst_config = self.cfg.GetInstanceInfo(instance)
678       result =  self._VerifyInstance(instance, inst_config, node_volume,
679                                      node_instance, feedback_fn)
680       bad = bad or result
681
682       inst_config.MapLVsByNode(node_vol_should)
683
684       instance_cfg[instance] = inst_config
685
686       pnode = inst_config.primary_node
687       if pnode in node_info:
688         node_info[pnode]['pinst'].append(instance)
689       else:
690         feedback_fn("  - ERROR: instance %s, connection to primary node"
691                     " %s failed" % (instance, pnode))
692         bad = True
693
694       # If the instance is non-redundant we cannot survive losing its primary
695       # node, so we are not N+1 compliant. On the other hand we have no disk
696       # templates with more than one secondary so that situation is not well
697       # supported either.
698       # FIXME: does not support file-backed instances
699       if len(inst_config.secondary_nodes) == 0:
700         i_non_redundant.append(instance)
701       elif len(inst_config.secondary_nodes) > 1:
702         feedback_fn("  - WARNING: multiple secondaries for instance %s"
703                     % instance)
704
705       for snode in inst_config.secondary_nodes:
706         if snode in node_info:
707           node_info[snode]['sinst'].append(instance)
708           if pnode not in node_info[snode]['sinst-by-pnode']:
709             node_info[snode]['sinst-by-pnode'][pnode] = []
710           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
711         else:
712           feedback_fn("  - ERROR: instance %s, connection to secondary node"
713                       " %s failed" % (instance, snode))
714
715     feedback_fn("* Verifying orphan volumes")
716     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
717                                        feedback_fn)
718     bad = bad or result
719
720     feedback_fn("* Verifying remaining instances")
721     result = self._VerifyOrphanInstances(instancelist, node_instance,
722                                          feedback_fn)
723     bad = bad or result
724
725     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
726       feedback_fn("* Verifying N+1 Memory redundancy")
727       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
728       bad = bad or result
729
730     feedback_fn("* Other Notes")
731     if i_non_redundant:
732       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
733                   % len(i_non_redundant))
734
735     return int(bad)
736
737   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
738     """Analize the post-hooks' result, handle it, and send some
739     nicely-formatted feedback back to the user.
740
741     Args:
742       phase: the hooks phase that has just been run
743       hooks_results: the results of the multi-node hooks rpc call
744       feedback_fn: function to send feedback back to the caller
745       lu_result: previous Exec result
746
747     """
748     # We only really run POST phase hooks, and are only interested in their results
749     if phase == constants.HOOKS_PHASE_POST:
750       # Used to change hooks' output to proper indentation
751       indent_re = re.compile('^', re.M)
752       feedback_fn("* Hooks Results")
753       if not hooks_results:
754         feedback_fn("  - ERROR: general communication failure")
755         lu_result = 1
756       else:
757         for node_name in hooks_results:
758           show_node_header = True
759           res = hooks_results[node_name]
760           if res is False or not isinstance(res, list):
761             feedback_fn("    Communication failure")
762             lu_result = 1
763             continue
764           for script, hkr, output in res:
765             if hkr == constants.HKR_FAIL:
766               # The node header is only shown once, if there are
767               # failing hooks on that node
768               if show_node_header:
769                 feedback_fn("  Node %s:" % node_name)
770                 show_node_header = False
771               feedback_fn("    ERROR: Script %s failed, output:" % script)
772               output = indent_re.sub('      ', output)
773               feedback_fn("%s" % output)
774               lu_result = 1
775
776       return lu_result
777
778
779 class LUVerifyDisks(NoHooksLU):
780   """Verifies the cluster disks status.
781
782   """
783   _OP_REQP = []
784
785   def CheckPrereq(self):
786     """Check prerequisites.
787
788     This has no prerequisites.
789
790     """
791     pass
792
793   def Exec(self, feedback_fn):
794     """Verify integrity of cluster disks.
795
796     """
797     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
798
799     vg_name = self.cfg.GetVGName()
800     nodes = utils.NiceSort(self.cfg.GetNodeList())
801     instances = [self.cfg.GetInstanceInfo(name)
802                  for name in self.cfg.GetInstanceList()]
803
804     nv_dict = {}
805     for inst in instances:
806       inst_lvs = {}
807       if (inst.status != "up" or
808           inst.disk_template not in constants.DTS_NET_MIRROR):
809         continue
810       inst.MapLVsByNode(inst_lvs)
811       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
812       for node, vol_list in inst_lvs.iteritems():
813         for vol in vol_list:
814           nv_dict[(node, vol)] = inst
815
816     if not nv_dict:
817       return result
818
819     node_lvs = rpc.call_volume_list(nodes, vg_name)
820
821     to_act = set()
822     for node in nodes:
823       # node_volume
824       lvs = node_lvs[node]
825
826       if isinstance(lvs, basestring):
827         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
828         res_nlvm[node] = lvs
829       elif not isinstance(lvs, dict):
830         logger.Info("connection to node %s failed or invalid data returned" %
831                     (node,))
832         res_nodes.append(node)
833         continue
834
835       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
836         inst = nv_dict.pop((node, lv_name), None)
837         if (not lv_online and inst is not None
838             and inst.name not in res_instances):
839           res_instances.append(inst.name)
840
841     # any leftover items in nv_dict are missing LVs, let's arrange the
842     # data better
843     for key, inst in nv_dict.iteritems():
844       if inst.name not in res_missing:
845         res_missing[inst.name] = []
846       res_missing[inst.name].append(key)
847
848     return result
849
850
851 class LURenameCluster(LogicalUnit):
852   """Rename the cluster.
853
854   """
855   HPATH = "cluster-rename"
856   HTYPE = constants.HTYPE_CLUSTER
857   _OP_REQP = ["name"]
858   REQ_WSSTORE = True
859
860   def BuildHooksEnv(self):
861     """Build hooks env.
862
863     """
864     env = {
865       "OP_TARGET": self.sstore.GetClusterName(),
866       "NEW_NAME": self.op.name,
867       }
868     mn = self.sstore.GetMasterNode()
869     return env, [mn], [mn]
870
871   def CheckPrereq(self):
872     """Verify that the passed name is a valid one.
873
874     """
875     hostname = utils.HostInfo(self.op.name)
876
877     new_name = hostname.name
878     self.ip = new_ip = hostname.ip
879     old_name = self.sstore.GetClusterName()
880     old_ip = self.sstore.GetMasterIP()
881     if new_name == old_name and new_ip == old_ip:
882       raise errors.OpPrereqError("Neither the name nor the IP address of the"
883                                  " cluster has changed")
884     if new_ip != old_ip:
885       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
886         raise errors.OpPrereqError("The given cluster IP address (%s) is"
887                                    " reachable on the network. Aborting." %
888                                    new_ip)
889
890     self.op.name = new_name
891
892   def Exec(self, feedback_fn):
893     """Rename the cluster.
894
895     """
896     clustername = self.op.name
897     ip = self.ip
898     ss = self.sstore
899
900     # shutdown the master IP
901     master = ss.GetMasterNode()
902     if not rpc.call_node_stop_master(master):
903       raise errors.OpExecError("Could not disable the master role")
904
905     try:
906       # modify the sstore
907       ss.SetKey(ss.SS_MASTER_IP, ip)
908       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
909
910       # Distribute updated ss config to all nodes
911       myself = self.cfg.GetNodeInfo(master)
912       dist_nodes = self.cfg.GetNodeList()
913       if myself.name in dist_nodes:
914         dist_nodes.remove(myself.name)
915
916       logger.Debug("Copying updated ssconf data to all nodes")
917       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
918         fname = ss.KeyToFilename(keyname)
919         result = rpc.call_upload_file(dist_nodes, fname)
920         for to_node in dist_nodes:
921           if not result[to_node]:
922             logger.Error("copy of file %s to node %s failed" %
923                          (fname, to_node))
924     finally:
925       if not rpc.call_node_start_master(master):
926         logger.Error("Could not re-enable the master role on the master,"
927                      " please restart manually.")
928
929
930 def _RecursiveCheckIfLVMBased(disk):
931   """Check if the given disk or its children are lvm-based.
932
933   Args:
934     disk: ganeti.objects.Disk object
935
936   Returns:
937     boolean indicating whether a LD_LV dev_type was found or not
938
939   """
940   if disk.children:
941     for chdisk in disk.children:
942       if _RecursiveCheckIfLVMBased(chdisk):
943         return True
944   return disk.dev_type == constants.LD_LV
945
946
947 class LUSetClusterParams(LogicalUnit):
948   """Change the parameters of the cluster.
949
950   """
951   HPATH = "cluster-modify"
952   HTYPE = constants.HTYPE_CLUSTER
953   _OP_REQP = []
954
955   def BuildHooksEnv(self):
956     """Build hooks env.
957
958     """
959     env = {
960       "OP_TARGET": self.sstore.GetClusterName(),
961       "NEW_VG_NAME": self.op.vg_name,
962       }
963     mn = self.sstore.GetMasterNode()
964     return env, [mn], [mn]
965
966   def CheckPrereq(self):
967     """Check prerequisites.
968
969     This checks whether the given params don't conflict and
970     if the given volume group is valid.
971
972     """
973     if not self.op.vg_name:
974       instances = [self.cfg.GetInstanceInfo(name)
975                    for name in self.cfg.GetInstanceList()]
976       for inst in instances:
977         for disk in inst.disks:
978           if _RecursiveCheckIfLVMBased(disk):
979             raise errors.OpPrereqError("Cannot disable lvm storage while"
980                                        " lvm-based instances exist")
981
982     # if vg_name not None, checks given volume group on all nodes
983     if self.op.vg_name:
984       node_list = self.cfg.GetNodeList()
985       vglist = rpc.call_vg_list(node_list)
986       for node in node_list:
987         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
988                                               constants.MIN_VG_SIZE)
989         if vgstatus:
990           raise errors.OpPrereqError("Error on node '%s': %s" %
991                                      (node, vgstatus))
992
993   def Exec(self, feedback_fn):
994     """Change the parameters of the cluster.
995
996     """
997     if self.op.vg_name != self.cfg.GetVGName():
998       self.cfg.SetVGName(self.op.vg_name)
999     else:
1000       feedback_fn("Cluster LVM configuration already in desired"
1001                   " state, not changing")
1002
1003
1004 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1005   """Sleep and poll for an instance's disk to sync.
1006
1007   """
1008   if not instance.disks:
1009     return True
1010
1011   if not oneshot:
1012     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1013
1014   node = instance.primary_node
1015
1016   for dev in instance.disks:
1017     cfgw.SetDiskID(dev, node)
1018
1019   retries = 0
1020   while True:
1021     max_time = 0
1022     done = True
1023     cumul_degraded = False
1024     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1025     if not rstats:
1026       proc.LogWarning("Can't get any data from node %s" % node)
1027       retries += 1
1028       if retries >= 10:
1029         raise errors.RemoteError("Can't contact node %s for mirror data,"
1030                                  " aborting." % node)
1031       time.sleep(6)
1032       continue
1033     retries = 0
1034     for i in range(len(rstats)):
1035       mstat = rstats[i]
1036       if mstat is None:
1037         proc.LogWarning("Can't compute data for node %s/%s" %
1038                         (node, instance.disks[i].iv_name))
1039         continue
1040       # we ignore the ldisk parameter
1041       perc_done, est_time, is_degraded, _ = mstat
1042       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1043       if perc_done is not None:
1044         done = False
1045         if est_time is not None:
1046           rem_time = "%d estimated seconds remaining" % est_time
1047           max_time = est_time
1048         else:
1049           rem_time = "no time estimate"
1050         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1051                      (instance.disks[i].iv_name, perc_done, rem_time))
1052     if done or oneshot:
1053       break
1054
1055     if unlock:
1056       #utils.Unlock('cmd')
1057       pass
1058     try:
1059       time.sleep(min(60, max_time))
1060     finally:
1061       if unlock:
1062         #utils.Lock('cmd')
1063         pass
1064
1065   if done:
1066     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1067   return not cumul_degraded
1068
1069
1070 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1071   """Check that mirrors are not degraded.
1072
1073   The ldisk parameter, if True, will change the test from the
1074   is_degraded attribute (which represents overall non-ok status for
1075   the device(s)) to the ldisk (representing the local storage status).
1076
1077   """
1078   cfgw.SetDiskID(dev, node)
1079   if ldisk:
1080     idx = 6
1081   else:
1082     idx = 5
1083
1084   result = True
1085   if on_primary or dev.AssembleOnSecondary():
1086     rstats = rpc.call_blockdev_find(node, dev)
1087     if not rstats:
1088       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1089       result = False
1090     else:
1091       result = result and (not rstats[idx])
1092   if dev.children:
1093     for child in dev.children:
1094       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1095
1096   return result
1097
1098
1099 class LUDiagnoseOS(NoHooksLU):
1100   """Logical unit for OS diagnose/query.
1101
1102   """
1103   _OP_REQP = ["output_fields", "names"]
1104
1105   def CheckPrereq(self):
1106     """Check prerequisites.
1107
1108     This always succeeds, since this is a pure query LU.
1109
1110     """
1111     if self.op.names:
1112       raise errors.OpPrereqError("Selective OS query not supported")
1113
1114     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1115     _CheckOutputFields(static=[],
1116                        dynamic=self.dynamic_fields,
1117                        selected=self.op.output_fields)
1118
1119   @staticmethod
1120   def _DiagnoseByOS(node_list, rlist):
1121     """Remaps a per-node return list into an a per-os per-node dictionary
1122
1123       Args:
1124         node_list: a list with the names of all nodes
1125         rlist: a map with node names as keys and OS objects as values
1126
1127       Returns:
1128         map: a map with osnames as keys and as value another map, with
1129              nodes as
1130              keys and list of OS objects as values
1131              e.g. {"debian-etch": {"node1": [<object>,...],
1132                                    "node2": [<object>,]}
1133                   }
1134
1135     """
1136     all_os = {}
1137     for node_name, nr in rlist.iteritems():
1138       if not nr:
1139         continue
1140       for os_obj in nr:
1141         if os_obj.name not in all_os:
1142           # build a list of nodes for this os containing empty lists
1143           # for each node in node_list
1144           all_os[os_obj.name] = {}
1145           for nname in node_list:
1146             all_os[os_obj.name][nname] = []
1147         all_os[os_obj.name][node_name].append(os_obj)
1148     return all_os
1149
1150   def Exec(self, feedback_fn):
1151     """Compute the list of OSes.
1152
1153     """
1154     node_list = self.cfg.GetNodeList()
1155     node_data = rpc.call_os_diagnose(node_list)
1156     if node_data == False:
1157       raise errors.OpExecError("Can't gather the list of OSes")
1158     pol = self._DiagnoseByOS(node_list, node_data)
1159     output = []
1160     for os_name, os_data in pol.iteritems():
1161       row = []
1162       for field in self.op.output_fields:
1163         if field == "name":
1164           val = os_name
1165         elif field == "valid":
1166           val = utils.all([osl and osl[0] for osl in os_data.values()])
1167         elif field == "node_status":
1168           val = {}
1169           for node_name, nos_list in os_data.iteritems():
1170             val[node_name] = [(v.status, v.path) for v in nos_list]
1171         else:
1172           raise errors.ParameterError(field)
1173         row.append(val)
1174       output.append(row)
1175
1176     return output
1177
1178
1179 class LURemoveNode(LogicalUnit):
1180   """Logical unit for removing a node.
1181
1182   """
1183   HPATH = "node-remove"
1184   HTYPE = constants.HTYPE_NODE
1185   _OP_REQP = ["node_name"]
1186
1187   def BuildHooksEnv(self):
1188     """Build hooks env.
1189
1190     This doesn't run on the target node in the pre phase as a failed
1191     node would then be impossible to remove.
1192
1193     """
1194     env = {
1195       "OP_TARGET": self.op.node_name,
1196       "NODE_NAME": self.op.node_name,
1197       }
1198     all_nodes = self.cfg.GetNodeList()
1199     all_nodes.remove(self.op.node_name)
1200     return env, all_nodes, all_nodes
1201
1202   def CheckPrereq(self):
1203     """Check prerequisites.
1204
1205     This checks:
1206      - the node exists in the configuration
1207      - it does not have primary or secondary instances
1208      - it's not the master
1209
1210     Any errors are signalled by raising errors.OpPrereqError.
1211
1212     """
1213     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1214     if node is None:
1215       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1216
1217     instance_list = self.cfg.GetInstanceList()
1218
1219     masternode = self.sstore.GetMasterNode()
1220     if node.name == masternode:
1221       raise errors.OpPrereqError("Node is the master node,"
1222                                  " you need to failover first.")
1223
1224     for instance_name in instance_list:
1225       instance = self.cfg.GetInstanceInfo(instance_name)
1226       if node.name == instance.primary_node:
1227         raise errors.OpPrereqError("Instance %s still running on the node,"
1228                                    " please remove first." % instance_name)
1229       if node.name in instance.secondary_nodes:
1230         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1231                                    " please remove first." % instance_name)
1232     self.op.node_name = node.name
1233     self.node = node
1234
1235   def Exec(self, feedback_fn):
1236     """Removes the node from the cluster.
1237
1238     """
1239     node = self.node
1240     logger.Info("stopping the node daemon and removing configs from node %s" %
1241                 node.name)
1242
1243     rpc.call_node_leave_cluster(node.name)
1244
1245     logger.Info("Removing node %s from config" % node.name)
1246
1247     self.cfg.RemoveNode(node.name)
1248
1249     utils.RemoveHostFromEtcHosts(node.name)
1250
1251
1252 class LUQueryNodes(NoHooksLU):
1253   """Logical unit for querying nodes.
1254
1255   """
1256   _OP_REQP = ["output_fields", "names"]
1257
1258   def CheckPrereq(self):
1259     """Check prerequisites.
1260
1261     This checks that the fields required are valid output fields.
1262
1263     """
1264     self.dynamic_fields = frozenset([
1265       "dtotal", "dfree",
1266       "mtotal", "mnode", "mfree",
1267       "bootid",
1268       "ctotal",
1269       ])
1270
1271     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1272                                "pinst_list", "sinst_list",
1273                                "pip", "sip", "tags"],
1274                        dynamic=self.dynamic_fields,
1275                        selected=self.op.output_fields)
1276
1277     self.wanted = _GetWantedNodes(self, self.op.names)
1278
1279   def Exec(self, feedback_fn):
1280     """Computes the list of nodes and their attributes.
1281
1282     """
1283     nodenames = self.wanted
1284     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1285
1286     # begin data gathering
1287
1288     if self.dynamic_fields.intersection(self.op.output_fields):
1289       live_data = {}
1290       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1291       for name in nodenames:
1292         nodeinfo = node_data.get(name, None)
1293         if nodeinfo:
1294           live_data[name] = {
1295             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1296             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1297             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1298             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1299             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1300             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1301             "bootid": nodeinfo['bootid'],
1302             }
1303         else:
1304           live_data[name] = {}
1305     else:
1306       live_data = dict.fromkeys(nodenames, {})
1307
1308     node_to_primary = dict([(name, set()) for name in nodenames])
1309     node_to_secondary = dict([(name, set()) for name in nodenames])
1310
1311     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1312                              "sinst_cnt", "sinst_list"))
1313     if inst_fields & frozenset(self.op.output_fields):
1314       instancelist = self.cfg.GetInstanceList()
1315
1316       for instance_name in instancelist:
1317         inst = self.cfg.GetInstanceInfo(instance_name)
1318         if inst.primary_node in node_to_primary:
1319           node_to_primary[inst.primary_node].add(inst.name)
1320         for secnode in inst.secondary_nodes:
1321           if secnode in node_to_secondary:
1322             node_to_secondary[secnode].add(inst.name)
1323
1324     # end data gathering
1325
1326     output = []
1327     for node in nodelist:
1328       node_output = []
1329       for field in self.op.output_fields:
1330         if field == "name":
1331           val = node.name
1332         elif field == "pinst_list":
1333           val = list(node_to_primary[node.name])
1334         elif field == "sinst_list":
1335           val = list(node_to_secondary[node.name])
1336         elif field == "pinst_cnt":
1337           val = len(node_to_primary[node.name])
1338         elif field == "sinst_cnt":
1339           val = len(node_to_secondary[node.name])
1340         elif field == "pip":
1341           val = node.primary_ip
1342         elif field == "sip":
1343           val = node.secondary_ip
1344         elif field == "tags":
1345           val = list(node.GetTags())
1346         elif field in self.dynamic_fields:
1347           val = live_data[node.name].get(field, None)
1348         else:
1349           raise errors.ParameterError(field)
1350         node_output.append(val)
1351       output.append(node_output)
1352
1353     return output
1354
1355
1356 class LUQueryNodeVolumes(NoHooksLU):
1357   """Logical unit for getting volumes on node(s).
1358
1359   """
1360   _OP_REQP = ["nodes", "output_fields"]
1361
1362   def CheckPrereq(self):
1363     """Check prerequisites.
1364
1365     This checks that the fields required are valid output fields.
1366
1367     """
1368     self.nodes = _GetWantedNodes(self, self.op.nodes)
1369
1370     _CheckOutputFields(static=["node"],
1371                        dynamic=["phys", "vg", "name", "size", "instance"],
1372                        selected=self.op.output_fields)
1373
1374
1375   def Exec(self, feedback_fn):
1376     """Computes the list of nodes and their attributes.
1377
1378     """
1379     nodenames = self.nodes
1380     volumes = rpc.call_node_volumes(nodenames)
1381
1382     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1383              in self.cfg.GetInstanceList()]
1384
1385     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1386
1387     output = []
1388     for node in nodenames:
1389       if node not in volumes or not volumes[node]:
1390         continue
1391
1392       node_vols = volumes[node][:]
1393       node_vols.sort(key=lambda vol: vol['dev'])
1394
1395       for vol in node_vols:
1396         node_output = []
1397         for field in self.op.output_fields:
1398           if field == "node":
1399             val = node
1400           elif field == "phys":
1401             val = vol['dev']
1402           elif field == "vg":
1403             val = vol['vg']
1404           elif field == "name":
1405             val = vol['name']
1406           elif field == "size":
1407             val = int(float(vol['size']))
1408           elif field == "instance":
1409             for inst in ilist:
1410               if node not in lv_by_node[inst]:
1411                 continue
1412               if vol['name'] in lv_by_node[inst][node]:
1413                 val = inst.name
1414                 break
1415             else:
1416               val = '-'
1417           else:
1418             raise errors.ParameterError(field)
1419           node_output.append(str(val))
1420
1421         output.append(node_output)
1422
1423     return output
1424
1425
1426 class LUAddNode(LogicalUnit):
1427   """Logical unit for adding node to the cluster.
1428
1429   """
1430   HPATH = "node-add"
1431   HTYPE = constants.HTYPE_NODE
1432   _OP_REQP = ["node_name"]
1433
1434   def BuildHooksEnv(self):
1435     """Build hooks env.
1436
1437     This will run on all nodes before, and on all nodes + the new node after.
1438
1439     """
1440     env = {
1441       "OP_TARGET": self.op.node_name,
1442       "NODE_NAME": self.op.node_name,
1443       "NODE_PIP": self.op.primary_ip,
1444       "NODE_SIP": self.op.secondary_ip,
1445       }
1446     nodes_0 = self.cfg.GetNodeList()
1447     nodes_1 = nodes_0 + [self.op.node_name, ]
1448     return env, nodes_0, nodes_1
1449
1450   def CheckPrereq(self):
1451     """Check prerequisites.
1452
1453     This checks:
1454      - the new node is not already in the config
1455      - it is resolvable
1456      - its parameters (single/dual homed) matches the cluster
1457
1458     Any errors are signalled by raising errors.OpPrereqError.
1459
1460     """
1461     node_name = self.op.node_name
1462     cfg = self.cfg
1463
1464     dns_data = utils.HostInfo(node_name)
1465
1466     node = dns_data.name
1467     primary_ip = self.op.primary_ip = dns_data.ip
1468     secondary_ip = getattr(self.op, "secondary_ip", None)
1469     if secondary_ip is None:
1470       secondary_ip = primary_ip
1471     if not utils.IsValidIP(secondary_ip):
1472       raise errors.OpPrereqError("Invalid secondary IP given")
1473     self.op.secondary_ip = secondary_ip
1474
1475     node_list = cfg.GetNodeList()
1476     if not self.op.readd and node in node_list:
1477       raise errors.OpPrereqError("Node %s is already in the configuration" %
1478                                  node)
1479     elif self.op.readd and node not in node_list:
1480       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1481
1482     for existing_node_name in node_list:
1483       existing_node = cfg.GetNodeInfo(existing_node_name)
1484
1485       if self.op.readd and node == existing_node_name:
1486         if (existing_node.primary_ip != primary_ip or
1487             existing_node.secondary_ip != secondary_ip):
1488           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1489                                      " address configuration as before")
1490         continue
1491
1492       if (existing_node.primary_ip == primary_ip or
1493           existing_node.secondary_ip == primary_ip or
1494           existing_node.primary_ip == secondary_ip or
1495           existing_node.secondary_ip == secondary_ip):
1496         raise errors.OpPrereqError("New node ip address(es) conflict with"
1497                                    " existing node %s" % existing_node.name)
1498
1499     # check that the type of the node (single versus dual homed) is the
1500     # same as for the master
1501     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1502     master_singlehomed = myself.secondary_ip == myself.primary_ip
1503     newbie_singlehomed = secondary_ip == primary_ip
1504     if master_singlehomed != newbie_singlehomed:
1505       if master_singlehomed:
1506         raise errors.OpPrereqError("The master has no private ip but the"
1507                                    " new node has one")
1508       else:
1509         raise errors.OpPrereqError("The master has a private ip but the"
1510                                    " new node doesn't have one")
1511
1512     # checks reachablity
1513     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1514       raise errors.OpPrereqError("Node not reachable by ping")
1515
1516     if not newbie_singlehomed:
1517       # check reachability from my secondary ip to newbie's secondary ip
1518       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1519                            source=myself.secondary_ip):
1520         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1521                                    " based ping to noded port")
1522
1523     self.new_node = objects.Node(name=node,
1524                                  primary_ip=primary_ip,
1525                                  secondary_ip=secondary_ip)
1526
1527   def Exec(self, feedback_fn):
1528     """Adds the new node to the cluster.
1529
1530     """
1531     new_node = self.new_node
1532     node = new_node.name
1533
1534     # check connectivity
1535     result = rpc.call_version([node])[node]
1536     if result:
1537       if constants.PROTOCOL_VERSION == result:
1538         logger.Info("communication to node %s fine, sw version %s match" %
1539                     (node, result))
1540       else:
1541         raise errors.OpExecError("Version mismatch master version %s,"
1542                                  " node version %s" %
1543                                  (constants.PROTOCOL_VERSION, result))
1544     else:
1545       raise errors.OpExecError("Cannot get version from the new node")
1546
1547     # setup ssh on node
1548     logger.Info("copy ssh key to node %s" % node)
1549     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1550     keyarray = []
1551     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1552                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1553                 priv_key, pub_key]
1554
1555     for i in keyfiles:
1556       f = open(i, 'r')
1557       try:
1558         keyarray.append(f.read())
1559       finally:
1560         f.close()
1561
1562     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1563                                keyarray[3], keyarray[4], keyarray[5])
1564
1565     if not result:
1566       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1567
1568     # Add node to our /etc/hosts, and add key to known_hosts
1569     utils.AddHostToEtcHosts(new_node.name)
1570
1571     if new_node.secondary_ip != new_node.primary_ip:
1572       if not rpc.call_node_tcp_ping(new_node.name,
1573                                     constants.LOCALHOST_IP_ADDRESS,
1574                                     new_node.secondary_ip,
1575                                     constants.DEFAULT_NODED_PORT,
1576                                     10, False):
1577         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1578                                  " you gave (%s). Please fix and re-run this"
1579                                  " command." % new_node.secondary_ip)
1580
1581     node_verify_list = [self.sstore.GetMasterNode()]
1582     node_verify_param = {
1583       'nodelist': [node],
1584       # TODO: do a node-net-test as well?
1585     }
1586
1587     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1588     for verifier in node_verify_list:
1589       if not result[verifier]:
1590         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1591                                  " for remote verification" % verifier)
1592       if result[verifier]['nodelist']:
1593         for failed in result[verifier]['nodelist']:
1594           feedback_fn("ssh/hostname verification failed %s -> %s" %
1595                       (verifier, result[verifier]['nodelist'][failed]))
1596         raise errors.OpExecError("ssh/hostname verification failed.")
1597
1598     # Distribute updated /etc/hosts and known_hosts to all nodes,
1599     # including the node just added
1600     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1601     dist_nodes = self.cfg.GetNodeList()
1602     if not self.op.readd:
1603       dist_nodes.append(node)
1604     if myself.name in dist_nodes:
1605       dist_nodes.remove(myself.name)
1606
1607     logger.Debug("Copying hosts and known_hosts to all nodes")
1608     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1609       result = rpc.call_upload_file(dist_nodes, fname)
1610       for to_node in dist_nodes:
1611         if not result[to_node]:
1612           logger.Error("copy of file %s to node %s failed" %
1613                        (fname, to_node))
1614
1615     to_copy = self.sstore.GetFileList()
1616     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1617       to_copy.append(constants.VNC_PASSWORD_FILE)
1618     for fname in to_copy:
1619       result = rpc.call_upload_file([node], fname)
1620       if not result[node]:
1621         logger.Error("could not copy file %s to node %s" % (fname, node))
1622
1623     if not self.op.readd:
1624       logger.Info("adding node %s to cluster.conf" % node)
1625       self.cfg.AddNode(new_node)
1626
1627
1628 class LUMasterFailover(LogicalUnit):
1629   """Failover the master node to the current node.
1630
1631   This is a special LU in that it must run on a non-master node.
1632
1633   """
1634   HPATH = "master-failover"
1635   HTYPE = constants.HTYPE_CLUSTER
1636   REQ_MASTER = False
1637   REQ_WSSTORE = True
1638   _OP_REQP = []
1639
1640   def BuildHooksEnv(self):
1641     """Build hooks env.
1642
1643     This will run on the new master only in the pre phase, and on all
1644     the nodes in the post phase.
1645
1646     """
1647     env = {
1648       "OP_TARGET": self.new_master,
1649       "NEW_MASTER": self.new_master,
1650       "OLD_MASTER": self.old_master,
1651       }
1652     return env, [self.new_master], self.cfg.GetNodeList()
1653
1654   def CheckPrereq(self):
1655     """Check prerequisites.
1656
1657     This checks that we are not already the master.
1658
1659     """
1660     self.new_master = utils.HostInfo().name
1661     self.old_master = self.sstore.GetMasterNode()
1662
1663     if self.old_master == self.new_master:
1664       raise errors.OpPrereqError("This commands must be run on the node"
1665                                  " where you want the new master to be."
1666                                  " %s is already the master" %
1667                                  self.old_master)
1668
1669   def Exec(self, feedback_fn):
1670     """Failover the master node.
1671
1672     This command, when run on a non-master node, will cause the current
1673     master to cease being master, and the non-master to become new
1674     master.
1675
1676     """
1677     #TODO: do not rely on gethostname returning the FQDN
1678     logger.Info("setting master to %s, old master: %s" %
1679                 (self.new_master, self.old_master))
1680
1681     if not rpc.call_node_stop_master(self.old_master):
1682       logger.Error("could disable the master role on the old master"
1683                    " %s, please disable manually" % self.old_master)
1684
1685     ss = self.sstore
1686     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1687     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1688                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1689       logger.Error("could not distribute the new simple store master file"
1690                    " to the other nodes, please check.")
1691
1692     if not rpc.call_node_start_master(self.new_master):
1693       logger.Error("could not start the master role on the new master"
1694                    " %s, please check" % self.new_master)
1695       feedback_fn("Error in activating the master IP on the new master,"
1696                   " please fix manually.")
1697
1698
1699
1700 class LUQueryClusterInfo(NoHooksLU):
1701   """Query cluster configuration.
1702
1703   """
1704   _OP_REQP = []
1705   REQ_MASTER = False
1706
1707   def CheckPrereq(self):
1708     """No prerequsites needed for this LU.
1709
1710     """
1711     pass
1712
1713   def Exec(self, feedback_fn):
1714     """Return cluster config.
1715
1716     """
1717     result = {
1718       "name": self.sstore.GetClusterName(),
1719       "software_version": constants.RELEASE_VERSION,
1720       "protocol_version": constants.PROTOCOL_VERSION,
1721       "config_version": constants.CONFIG_VERSION,
1722       "os_api_version": constants.OS_API_VERSION,
1723       "export_version": constants.EXPORT_VERSION,
1724       "master": self.sstore.GetMasterNode(),
1725       "architecture": (platform.architecture()[0], platform.machine()),
1726       "hypervisor_type": self.sstore.GetHypervisorType(),
1727       }
1728
1729     return result
1730
1731
1732 class LUDumpClusterConfig(NoHooksLU):
1733   """Return a text-representation of the cluster-config.
1734
1735   """
1736   _OP_REQP = []
1737
1738   def CheckPrereq(self):
1739     """No prerequisites.
1740
1741     """
1742     pass
1743
1744   def Exec(self, feedback_fn):
1745     """Dump a representation of the cluster config to the standard output.
1746
1747     """
1748     return self.cfg.DumpConfig()
1749
1750
1751 class LUActivateInstanceDisks(NoHooksLU):
1752   """Bring up an instance's disks.
1753
1754   """
1755   _OP_REQP = ["instance_name"]
1756
1757   def CheckPrereq(self):
1758     """Check prerequisites.
1759
1760     This checks that the instance is in the cluster.
1761
1762     """
1763     instance = self.cfg.GetInstanceInfo(
1764       self.cfg.ExpandInstanceName(self.op.instance_name))
1765     if instance is None:
1766       raise errors.OpPrereqError("Instance '%s' not known" %
1767                                  self.op.instance_name)
1768     self.instance = instance
1769
1770
1771   def Exec(self, feedback_fn):
1772     """Activate the disks.
1773
1774     """
1775     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1776     if not disks_ok:
1777       raise errors.OpExecError("Cannot activate block devices")
1778
1779     return disks_info
1780
1781
1782 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1783   """Prepare the block devices for an instance.
1784
1785   This sets up the block devices on all nodes.
1786
1787   Args:
1788     instance: a ganeti.objects.Instance object
1789     ignore_secondaries: if true, errors on secondary nodes won't result
1790                         in an error return from the function
1791
1792   Returns:
1793     false if the operation failed
1794     list of (host, instance_visible_name, node_visible_name) if the operation
1795          suceeded with the mapping from node devices to instance devices
1796   """
1797   device_info = []
1798   disks_ok = True
1799   iname = instance.name
1800   # With the two passes mechanism we try to reduce the window of
1801   # opportunity for the race condition of switching DRBD to primary
1802   # before handshaking occured, but we do not eliminate it
1803
1804   # The proper fix would be to wait (with some limits) until the
1805   # connection has been made and drbd transitions from WFConnection
1806   # into any other network-connected state (Connected, SyncTarget,
1807   # SyncSource, etc.)
1808
1809   # 1st pass, assemble on all nodes in secondary mode
1810   for inst_disk in instance.disks:
1811     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1812       cfg.SetDiskID(node_disk, node)
1813       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1814       if not result:
1815         logger.Error("could not prepare block device %s on node %s"
1816                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1817         if not ignore_secondaries:
1818           disks_ok = False
1819
1820   # FIXME: race condition on drbd migration to primary
1821
1822   # 2nd pass, do only the primary node
1823   for inst_disk in instance.disks:
1824     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1825       if node != instance.primary_node:
1826         continue
1827       cfg.SetDiskID(node_disk, node)
1828       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1829       if not result:
1830         logger.Error("could not prepare block device %s on node %s"
1831                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1832         disks_ok = False
1833     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1834
1835   # leave the disks configured for the primary node
1836   # this is a workaround that would be fixed better by
1837   # improving the logical/physical id handling
1838   for disk in instance.disks:
1839     cfg.SetDiskID(disk, instance.primary_node)
1840
1841   return disks_ok, device_info
1842
1843
1844 def _StartInstanceDisks(cfg, instance, force):
1845   """Start the disks of an instance.
1846
1847   """
1848   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1849                                            ignore_secondaries=force)
1850   if not disks_ok:
1851     _ShutdownInstanceDisks(instance, cfg)
1852     if force is not None and not force:
1853       logger.Error("If the message above refers to a secondary node,"
1854                    " you can retry the operation using '--force'.")
1855     raise errors.OpExecError("Disk consistency error")
1856
1857
1858 class LUDeactivateInstanceDisks(NoHooksLU):
1859   """Shutdown an instance's disks.
1860
1861   """
1862   _OP_REQP = ["instance_name"]
1863
1864   def CheckPrereq(self):
1865     """Check prerequisites.
1866
1867     This checks that the instance is in the cluster.
1868
1869     """
1870     instance = self.cfg.GetInstanceInfo(
1871       self.cfg.ExpandInstanceName(self.op.instance_name))
1872     if instance is None:
1873       raise errors.OpPrereqError("Instance '%s' not known" %
1874                                  self.op.instance_name)
1875     self.instance = instance
1876
1877   def Exec(self, feedback_fn):
1878     """Deactivate the disks
1879
1880     """
1881     instance = self.instance
1882     ins_l = rpc.call_instance_list([instance.primary_node])
1883     ins_l = ins_l[instance.primary_node]
1884     if not type(ins_l) is list:
1885       raise errors.OpExecError("Can't contact node '%s'" %
1886                                instance.primary_node)
1887
1888     if self.instance.name in ins_l:
1889       raise errors.OpExecError("Instance is running, can't shutdown"
1890                                " block devices.")
1891
1892     _ShutdownInstanceDisks(instance, self.cfg)
1893
1894
1895 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1896   """Shutdown block devices of an instance.
1897
1898   This does the shutdown on all nodes of the instance.
1899
1900   If the ignore_primary is false, errors on the primary node are
1901   ignored.
1902
1903   """
1904   result = True
1905   for disk in instance.disks:
1906     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1907       cfg.SetDiskID(top_disk, node)
1908       if not rpc.call_blockdev_shutdown(node, top_disk):
1909         logger.Error("could not shutdown block device %s on node %s" %
1910                      (disk.iv_name, node))
1911         if not ignore_primary or node != instance.primary_node:
1912           result = False
1913   return result
1914
1915
1916 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1917   """Checks if a node has enough free memory.
1918
1919   This function check if a given node has the needed amount of free
1920   memory. In case the node has less memory or we cannot get the
1921   information from the node, this function raise an OpPrereqError
1922   exception.
1923
1924   Args:
1925     - cfg: a ConfigWriter instance
1926     - node: the node name
1927     - reason: string to use in the error message
1928     - requested: the amount of memory in MiB
1929
1930   """
1931   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1932   if not nodeinfo or not isinstance(nodeinfo, dict):
1933     raise errors.OpPrereqError("Could not contact node %s for resource"
1934                              " information" % (node,))
1935
1936   free_mem = nodeinfo[node].get('memory_free')
1937   if not isinstance(free_mem, int):
1938     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1939                              " was '%s'" % (node, free_mem))
1940   if requested > free_mem:
1941     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1942                              " needed %s MiB, available %s MiB" %
1943                              (node, reason, requested, free_mem))
1944
1945
1946 class LUStartupInstance(LogicalUnit):
1947   """Starts an instance.
1948
1949   """
1950   HPATH = "instance-start"
1951   HTYPE = constants.HTYPE_INSTANCE
1952   _OP_REQP = ["instance_name", "force"]
1953
1954   def BuildHooksEnv(self):
1955     """Build hooks env.
1956
1957     This runs on master, primary and secondary nodes of the instance.
1958
1959     """
1960     env = {
1961       "FORCE": self.op.force,
1962       }
1963     env.update(_BuildInstanceHookEnvByObject(self.instance))
1964     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1965           list(self.instance.secondary_nodes))
1966     return env, nl, nl
1967
1968   def CheckPrereq(self):
1969     """Check prerequisites.
1970
1971     This checks that the instance is in the cluster.
1972
1973     """
1974     instance = self.cfg.GetInstanceInfo(
1975       self.cfg.ExpandInstanceName(self.op.instance_name))
1976     if instance is None:
1977       raise errors.OpPrereqError("Instance '%s' not known" %
1978                                  self.op.instance_name)
1979
1980     # check bridges existance
1981     _CheckInstanceBridgesExist(instance)
1982
1983     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
1984                          "starting instance %s" % instance.name,
1985                          instance.memory)
1986
1987     self.instance = instance
1988     self.op.instance_name = instance.name
1989
1990   def Exec(self, feedback_fn):
1991     """Start the instance.
1992
1993     """
1994     instance = self.instance
1995     force = self.op.force
1996     extra_args = getattr(self.op, "extra_args", "")
1997
1998     self.cfg.MarkInstanceUp(instance.name)
1999
2000     node_current = instance.primary_node
2001
2002     _StartInstanceDisks(self.cfg, instance, force)
2003
2004     if not rpc.call_instance_start(node_current, instance, extra_args):
2005       _ShutdownInstanceDisks(instance, self.cfg)
2006       raise errors.OpExecError("Could not start instance")
2007
2008
2009 class LURebootInstance(LogicalUnit):
2010   """Reboot an instance.
2011
2012   """
2013   HPATH = "instance-reboot"
2014   HTYPE = constants.HTYPE_INSTANCE
2015   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2016
2017   def BuildHooksEnv(self):
2018     """Build hooks env.
2019
2020     This runs on master, primary and secondary nodes of the instance.
2021
2022     """
2023     env = {
2024       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2025       }
2026     env.update(_BuildInstanceHookEnvByObject(self.instance))
2027     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2028           list(self.instance.secondary_nodes))
2029     return env, nl, nl
2030
2031   def CheckPrereq(self):
2032     """Check prerequisites.
2033
2034     This checks that the instance is in the cluster.
2035
2036     """
2037     instance = self.cfg.GetInstanceInfo(
2038       self.cfg.ExpandInstanceName(self.op.instance_name))
2039     if instance is None:
2040       raise errors.OpPrereqError("Instance '%s' not known" %
2041                                  self.op.instance_name)
2042
2043     # check bridges existance
2044     _CheckInstanceBridgesExist(instance)
2045
2046     self.instance = instance
2047     self.op.instance_name = instance.name
2048
2049   def Exec(self, feedback_fn):
2050     """Reboot the instance.
2051
2052     """
2053     instance = self.instance
2054     ignore_secondaries = self.op.ignore_secondaries
2055     reboot_type = self.op.reboot_type
2056     extra_args = getattr(self.op, "extra_args", "")
2057
2058     node_current = instance.primary_node
2059
2060     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2061                            constants.INSTANCE_REBOOT_HARD,
2062                            constants.INSTANCE_REBOOT_FULL]:
2063       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2064                                   (constants.INSTANCE_REBOOT_SOFT,
2065                                    constants.INSTANCE_REBOOT_HARD,
2066                                    constants.INSTANCE_REBOOT_FULL))
2067
2068     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2069                        constants.INSTANCE_REBOOT_HARD]:
2070       if not rpc.call_instance_reboot(node_current, instance,
2071                                       reboot_type, extra_args):
2072         raise errors.OpExecError("Could not reboot instance")
2073     else:
2074       if not rpc.call_instance_shutdown(node_current, instance):
2075         raise errors.OpExecError("could not shutdown instance for full reboot")
2076       _ShutdownInstanceDisks(instance, self.cfg)
2077       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2078       if not rpc.call_instance_start(node_current, instance, extra_args):
2079         _ShutdownInstanceDisks(instance, self.cfg)
2080         raise errors.OpExecError("Could not start instance for full reboot")
2081
2082     self.cfg.MarkInstanceUp(instance.name)
2083
2084
2085 class LUShutdownInstance(LogicalUnit):
2086   """Shutdown an instance.
2087
2088   """
2089   HPATH = "instance-stop"
2090   HTYPE = constants.HTYPE_INSTANCE
2091   _OP_REQP = ["instance_name"]
2092
2093   def BuildHooksEnv(self):
2094     """Build hooks env.
2095
2096     This runs on master, primary and secondary nodes of the instance.
2097
2098     """
2099     env = _BuildInstanceHookEnvByObject(self.instance)
2100     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2101           list(self.instance.secondary_nodes))
2102     return env, nl, nl
2103
2104   def CheckPrereq(self):
2105     """Check prerequisites.
2106
2107     This checks that the instance is in the cluster.
2108
2109     """
2110     instance = self.cfg.GetInstanceInfo(
2111       self.cfg.ExpandInstanceName(self.op.instance_name))
2112     if instance is None:
2113       raise errors.OpPrereqError("Instance '%s' not known" %
2114                                  self.op.instance_name)
2115     self.instance = instance
2116
2117   def Exec(self, feedback_fn):
2118     """Shutdown the instance.
2119
2120     """
2121     instance = self.instance
2122     node_current = instance.primary_node
2123     self.cfg.MarkInstanceDown(instance.name)
2124     if not rpc.call_instance_shutdown(node_current, instance):
2125       logger.Error("could not shutdown instance")
2126
2127     _ShutdownInstanceDisks(instance, self.cfg)
2128
2129
2130 class LUReinstallInstance(LogicalUnit):
2131   """Reinstall an instance.
2132
2133   """
2134   HPATH = "instance-reinstall"
2135   HTYPE = constants.HTYPE_INSTANCE
2136   _OP_REQP = ["instance_name"]
2137
2138   def BuildHooksEnv(self):
2139     """Build hooks env.
2140
2141     This runs on master, primary and secondary nodes of the instance.
2142
2143     """
2144     env = _BuildInstanceHookEnvByObject(self.instance)
2145     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2146           list(self.instance.secondary_nodes))
2147     return env, nl, nl
2148
2149   def CheckPrereq(self):
2150     """Check prerequisites.
2151
2152     This checks that the instance is in the cluster and is not running.
2153
2154     """
2155     instance = self.cfg.GetInstanceInfo(
2156       self.cfg.ExpandInstanceName(self.op.instance_name))
2157     if instance is None:
2158       raise errors.OpPrereqError("Instance '%s' not known" %
2159                                  self.op.instance_name)
2160     if instance.disk_template == constants.DT_DISKLESS:
2161       raise errors.OpPrereqError("Instance '%s' has no disks" %
2162                                  self.op.instance_name)
2163     if instance.status != "down":
2164       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2165                                  self.op.instance_name)
2166     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2167     if remote_info:
2168       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2169                                  (self.op.instance_name,
2170                                   instance.primary_node))
2171
2172     self.op.os_type = getattr(self.op, "os_type", None)
2173     if self.op.os_type is not None:
2174       # OS verification
2175       pnode = self.cfg.GetNodeInfo(
2176         self.cfg.ExpandNodeName(instance.primary_node))
2177       if pnode is None:
2178         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2179                                    self.op.pnode)
2180       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2181       if not os_obj:
2182         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2183                                    " primary node"  % self.op.os_type)
2184
2185     self.instance = instance
2186
2187   def Exec(self, feedback_fn):
2188     """Reinstall the instance.
2189
2190     """
2191     inst = self.instance
2192
2193     if self.op.os_type is not None:
2194       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2195       inst.os = self.op.os_type
2196       self.cfg.AddInstance(inst)
2197
2198     _StartInstanceDisks(self.cfg, inst, None)
2199     try:
2200       feedback_fn("Running the instance OS create scripts...")
2201       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2202         raise errors.OpExecError("Could not install OS for instance %s"
2203                                  " on node %s" %
2204                                  (inst.name, inst.primary_node))
2205     finally:
2206       _ShutdownInstanceDisks(inst, self.cfg)
2207
2208
2209 class LURenameInstance(LogicalUnit):
2210   """Rename an instance.
2211
2212   """
2213   HPATH = "instance-rename"
2214   HTYPE = constants.HTYPE_INSTANCE
2215   _OP_REQP = ["instance_name", "new_name"]
2216
2217   def BuildHooksEnv(self):
2218     """Build hooks env.
2219
2220     This runs on master, primary and secondary nodes of the instance.
2221
2222     """
2223     env = _BuildInstanceHookEnvByObject(self.instance)
2224     env["INSTANCE_NEW_NAME"] = self.op.new_name
2225     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2226           list(self.instance.secondary_nodes))
2227     return env, nl, nl
2228
2229   def CheckPrereq(self):
2230     """Check prerequisites.
2231
2232     This checks that the instance is in the cluster and is not running.
2233
2234     """
2235     instance = self.cfg.GetInstanceInfo(
2236       self.cfg.ExpandInstanceName(self.op.instance_name))
2237     if instance is None:
2238       raise errors.OpPrereqError("Instance '%s' not known" %
2239                                  self.op.instance_name)
2240     if instance.status != "down":
2241       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2242                                  self.op.instance_name)
2243     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2244     if remote_info:
2245       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2246                                  (self.op.instance_name,
2247                                   instance.primary_node))
2248     self.instance = instance
2249
2250     # new name verification
2251     name_info = utils.HostInfo(self.op.new_name)
2252
2253     self.op.new_name = new_name = name_info.name
2254     instance_list = self.cfg.GetInstanceList()
2255     if new_name in instance_list:
2256       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2257                                  new_name)
2258
2259     if not getattr(self.op, "ignore_ip", False):
2260       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2261         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2262                                    (name_info.ip, new_name))
2263
2264
2265   def Exec(self, feedback_fn):
2266     """Reinstall the instance.
2267
2268     """
2269     inst = self.instance
2270     old_name = inst.name
2271
2272     if inst.disk_template == constants.DT_FILE:
2273       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2274
2275     self.cfg.RenameInstance(inst.name, self.op.new_name)
2276
2277     # re-read the instance from the configuration after rename
2278     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2279
2280     if inst.disk_template == constants.DT_FILE:
2281       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2282       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2283                                                 old_file_storage_dir,
2284                                                 new_file_storage_dir)
2285
2286       if not result:
2287         raise errors.OpExecError("Could not connect to node '%s' to rename"
2288                                  " directory '%s' to '%s' (but the instance"
2289                                  " has been renamed in Ganeti)" % (
2290                                  inst.primary_node, old_file_storage_dir,
2291                                  new_file_storage_dir))
2292
2293       if not result[0]:
2294         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2295                                  " (but the instance has been renamed in"
2296                                  " Ganeti)" % (old_file_storage_dir,
2297                                                new_file_storage_dir))
2298
2299     _StartInstanceDisks(self.cfg, inst, None)
2300     try:
2301       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2302                                           "sda", "sdb"):
2303         msg = ("Could run OS rename script for instance %s on node %s (but the"
2304                " instance has been renamed in Ganeti)" %
2305                (inst.name, inst.primary_node))
2306         logger.Error(msg)
2307     finally:
2308       _ShutdownInstanceDisks(inst, self.cfg)
2309
2310
2311 class LURemoveInstance(LogicalUnit):
2312   """Remove an instance.
2313
2314   """
2315   HPATH = "instance-remove"
2316   HTYPE = constants.HTYPE_INSTANCE
2317   _OP_REQP = ["instance_name", "ignore_failures"]
2318
2319   def BuildHooksEnv(self):
2320     """Build hooks env.
2321
2322     This runs on master, primary and secondary nodes of the instance.
2323
2324     """
2325     env = _BuildInstanceHookEnvByObject(self.instance)
2326     nl = [self.sstore.GetMasterNode()]
2327     return env, nl, nl
2328
2329   def CheckPrereq(self):
2330     """Check prerequisites.
2331
2332     This checks that the instance is in the cluster.
2333
2334     """
2335     instance = self.cfg.GetInstanceInfo(
2336       self.cfg.ExpandInstanceName(self.op.instance_name))
2337     if instance is None:
2338       raise errors.OpPrereqError("Instance '%s' not known" %
2339                                  self.op.instance_name)
2340     self.instance = instance
2341
2342   def Exec(self, feedback_fn):
2343     """Remove the instance.
2344
2345     """
2346     instance = self.instance
2347     logger.Info("shutting down instance %s on node %s" %
2348                 (instance.name, instance.primary_node))
2349
2350     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2351       if self.op.ignore_failures:
2352         feedback_fn("Warning: can't shutdown instance")
2353       else:
2354         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2355                                  (instance.name, instance.primary_node))
2356
2357     logger.Info("removing block devices for instance %s" % instance.name)
2358
2359     if not _RemoveDisks(instance, self.cfg):
2360       if self.op.ignore_failures:
2361         feedback_fn("Warning: can't remove instance's disks")
2362       else:
2363         raise errors.OpExecError("Can't remove instance's disks")
2364
2365     logger.Info("removing instance %s out of cluster config" % instance.name)
2366
2367     self.cfg.RemoveInstance(instance.name)
2368
2369
2370 class LUQueryInstances(NoHooksLU):
2371   """Logical unit for querying instances.
2372
2373   """
2374   _OP_REQP = ["output_fields", "names"]
2375
2376   def CheckPrereq(self):
2377     """Check prerequisites.
2378
2379     This checks that the fields required are valid output fields.
2380
2381     """
2382     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2383     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2384                                "admin_state", "admin_ram",
2385                                "disk_template", "ip", "mac", "bridge",
2386                                "sda_size", "sdb_size", "vcpus", "tags"],
2387                        dynamic=self.dynamic_fields,
2388                        selected=self.op.output_fields)
2389
2390     self.wanted = _GetWantedInstances(self, self.op.names)
2391
2392   def Exec(self, feedback_fn):
2393     """Computes the list of nodes and their attributes.
2394
2395     """
2396     instance_names = self.wanted
2397     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2398                      in instance_names]
2399
2400     # begin data gathering
2401
2402     nodes = frozenset([inst.primary_node for inst in instance_list])
2403
2404     bad_nodes = []
2405     if self.dynamic_fields.intersection(self.op.output_fields):
2406       live_data = {}
2407       node_data = rpc.call_all_instances_info(nodes)
2408       for name in nodes:
2409         result = node_data[name]
2410         if result:
2411           live_data.update(result)
2412         elif result == False:
2413           bad_nodes.append(name)
2414         # else no instance is alive
2415     else:
2416       live_data = dict([(name, {}) for name in instance_names])
2417
2418     # end data gathering
2419
2420     output = []
2421     for instance in instance_list:
2422       iout = []
2423       for field in self.op.output_fields:
2424         if field == "name":
2425           val = instance.name
2426         elif field == "os":
2427           val = instance.os
2428         elif field == "pnode":
2429           val = instance.primary_node
2430         elif field == "snodes":
2431           val = list(instance.secondary_nodes)
2432         elif field == "admin_state":
2433           val = (instance.status != "down")
2434         elif field == "oper_state":
2435           if instance.primary_node in bad_nodes:
2436             val = None
2437           else:
2438             val = bool(live_data.get(instance.name))
2439         elif field == "status":
2440           if instance.primary_node in bad_nodes:
2441             val = "ERROR_nodedown"
2442           else:
2443             running = bool(live_data.get(instance.name))
2444             if running:
2445               if instance.status != "down":
2446                 val = "running"
2447               else:
2448                 val = "ERROR_up"
2449             else:
2450               if instance.status != "down":
2451                 val = "ERROR_down"
2452               else:
2453                 val = "ADMIN_down"
2454         elif field == "admin_ram":
2455           val = instance.memory
2456         elif field == "oper_ram":
2457           if instance.primary_node in bad_nodes:
2458             val = None
2459           elif instance.name in live_data:
2460             val = live_data[instance.name].get("memory", "?")
2461           else:
2462             val = "-"
2463         elif field == "disk_template":
2464           val = instance.disk_template
2465         elif field == "ip":
2466           val = instance.nics[0].ip
2467         elif field == "bridge":
2468           val = instance.nics[0].bridge
2469         elif field == "mac":
2470           val = instance.nics[0].mac
2471         elif field == "sda_size" or field == "sdb_size":
2472           disk = instance.FindDisk(field[:3])
2473           if disk is None:
2474             val = None
2475           else:
2476             val = disk.size
2477         elif field == "vcpus":
2478           val = instance.vcpus
2479         elif field == "tags":
2480           val = list(instance.GetTags())
2481         else:
2482           raise errors.ParameterError(field)
2483         iout.append(val)
2484       output.append(iout)
2485
2486     return output
2487
2488
2489 class LUFailoverInstance(LogicalUnit):
2490   """Failover an instance.
2491
2492   """
2493   HPATH = "instance-failover"
2494   HTYPE = constants.HTYPE_INSTANCE
2495   _OP_REQP = ["instance_name", "ignore_consistency"]
2496
2497   def BuildHooksEnv(self):
2498     """Build hooks env.
2499
2500     This runs on master, primary and secondary nodes of the instance.
2501
2502     """
2503     env = {
2504       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2505       }
2506     env.update(_BuildInstanceHookEnvByObject(self.instance))
2507     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2508     return env, nl, nl
2509
2510   def CheckPrereq(self):
2511     """Check prerequisites.
2512
2513     This checks that the instance is in the cluster.
2514
2515     """
2516     instance = self.cfg.GetInstanceInfo(
2517       self.cfg.ExpandInstanceName(self.op.instance_name))
2518     if instance is None:
2519       raise errors.OpPrereqError("Instance '%s' not known" %
2520                                  self.op.instance_name)
2521
2522     if instance.disk_template not in constants.DTS_NET_MIRROR:
2523       raise errors.OpPrereqError("Instance's disk layout is not"
2524                                  " network mirrored, cannot failover.")
2525
2526     secondary_nodes = instance.secondary_nodes
2527     if not secondary_nodes:
2528       raise errors.ProgrammerError("no secondary node but using "
2529                                    "a mirrored disk template")
2530
2531     target_node = secondary_nodes[0]
2532     # check memory requirements on the secondary node
2533     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2534                          instance.name, instance.memory)
2535
2536     # check bridge existance
2537     brlist = [nic.bridge for nic in instance.nics]
2538     if not rpc.call_bridges_exist(target_node, brlist):
2539       raise errors.OpPrereqError("One or more target bridges %s does not"
2540                                  " exist on destination node '%s'" %
2541                                  (brlist, target_node))
2542
2543     self.instance = instance
2544
2545   def Exec(self, feedback_fn):
2546     """Failover an instance.
2547
2548     The failover is done by shutting it down on its present node and
2549     starting it on the secondary.
2550
2551     """
2552     instance = self.instance
2553
2554     source_node = instance.primary_node
2555     target_node = instance.secondary_nodes[0]
2556
2557     feedback_fn("* checking disk consistency between source and target")
2558     for dev in instance.disks:
2559       # for drbd, these are drbd over lvm
2560       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2561         if instance.status == "up" and not self.op.ignore_consistency:
2562           raise errors.OpExecError("Disk %s is degraded on target node,"
2563                                    " aborting failover." % dev.iv_name)
2564
2565     feedback_fn("* shutting down instance on source node")
2566     logger.Info("Shutting down instance %s on node %s" %
2567                 (instance.name, source_node))
2568
2569     if not rpc.call_instance_shutdown(source_node, instance):
2570       if self.op.ignore_consistency:
2571         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2572                      " anyway. Please make sure node %s is down"  %
2573                      (instance.name, source_node, source_node))
2574       else:
2575         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2576                                  (instance.name, source_node))
2577
2578     feedback_fn("* deactivating the instance's disks on source node")
2579     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2580       raise errors.OpExecError("Can't shut down the instance's disks.")
2581
2582     instance.primary_node = target_node
2583     # distribute new instance config to the other nodes
2584     self.cfg.Update(instance)
2585
2586     # Only start the instance if it's marked as up
2587     if instance.status == "up":
2588       feedback_fn("* activating the instance's disks on target node")
2589       logger.Info("Starting instance %s on node %s" %
2590                   (instance.name, target_node))
2591
2592       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2593                                                ignore_secondaries=True)
2594       if not disks_ok:
2595         _ShutdownInstanceDisks(instance, self.cfg)
2596         raise errors.OpExecError("Can't activate the instance's disks")
2597
2598       feedback_fn("* starting the instance on the target node")
2599       if not rpc.call_instance_start(target_node, instance, None):
2600         _ShutdownInstanceDisks(instance, self.cfg)
2601         raise errors.OpExecError("Could not start instance %s on node %s." %
2602                                  (instance.name, target_node))
2603
2604
2605 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2606   """Create a tree of block devices on the primary node.
2607
2608   This always creates all devices.
2609
2610   """
2611   if device.children:
2612     for child in device.children:
2613       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2614         return False
2615
2616   cfg.SetDiskID(device, node)
2617   new_id = rpc.call_blockdev_create(node, device, device.size,
2618                                     instance.name, True, info)
2619   if not new_id:
2620     return False
2621   if device.physical_id is None:
2622     device.physical_id = new_id
2623   return True
2624
2625
2626 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2627   """Create a tree of block devices on a secondary node.
2628
2629   If this device type has to be created on secondaries, create it and
2630   all its children.
2631
2632   If not, just recurse to children keeping the same 'force' value.
2633
2634   """
2635   if device.CreateOnSecondary():
2636     force = True
2637   if device.children:
2638     for child in device.children:
2639       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2640                                         child, force, info):
2641         return False
2642
2643   if not force:
2644     return True
2645   cfg.SetDiskID(device, node)
2646   new_id = rpc.call_blockdev_create(node, device, device.size,
2647                                     instance.name, False, info)
2648   if not new_id:
2649     return False
2650   if device.physical_id is None:
2651     device.physical_id = new_id
2652   return True
2653
2654
2655 def _GenerateUniqueNames(cfg, exts):
2656   """Generate a suitable LV name.
2657
2658   This will generate a logical volume name for the given instance.
2659
2660   """
2661   results = []
2662   for val in exts:
2663     new_id = cfg.GenerateUniqueID()
2664     results.append("%s%s" % (new_id, val))
2665   return results
2666
2667
2668 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2669   """Generate a drbd8 device complete with its children.
2670
2671   """
2672   port = cfg.AllocatePort()
2673   vgname = cfg.GetVGName()
2674   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2675                           logical_id=(vgname, names[0]))
2676   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2677                           logical_id=(vgname, names[1]))
2678   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2679                           logical_id = (primary, secondary, port),
2680                           children = [dev_data, dev_meta],
2681                           iv_name=iv_name)
2682   return drbd_dev
2683
2684
2685 def _GenerateDiskTemplate(cfg, template_name,
2686                           instance_name, primary_node,
2687                           secondary_nodes, disk_sz, swap_sz,
2688                           file_storage_dir, file_driver):
2689   """Generate the entire disk layout for a given template type.
2690
2691   """
2692   #TODO: compute space requirements
2693
2694   vgname = cfg.GetVGName()
2695   if template_name == constants.DT_DISKLESS:
2696     disks = []
2697   elif template_name == constants.DT_PLAIN:
2698     if len(secondary_nodes) != 0:
2699       raise errors.ProgrammerError("Wrong template configuration")
2700
2701     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2702     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2703                            logical_id=(vgname, names[0]),
2704                            iv_name = "sda")
2705     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2706                            logical_id=(vgname, names[1]),
2707                            iv_name = "sdb")
2708     disks = [sda_dev, sdb_dev]
2709   elif template_name == constants.DT_DRBD8:
2710     if len(secondary_nodes) != 1:
2711       raise errors.ProgrammerError("Wrong template configuration")
2712     remote_node = secondary_nodes[0]
2713     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2714                                        ".sdb_data", ".sdb_meta"])
2715     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2716                                          disk_sz, names[0:2], "sda")
2717     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2718                                          swap_sz, names[2:4], "sdb")
2719     disks = [drbd_sda_dev, drbd_sdb_dev]
2720   elif template_name == constants.DT_FILE:
2721     if len(secondary_nodes) != 0:
2722       raise errors.ProgrammerError("Wrong template configuration")
2723
2724     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2725                                 iv_name="sda", logical_id=(file_driver,
2726                                 "%s/sda" % file_storage_dir))
2727     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2728                                 iv_name="sdb", logical_id=(file_driver,
2729                                 "%s/sdb" % file_storage_dir))
2730     disks = [file_sda_dev, file_sdb_dev]
2731   else:
2732     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2733   return disks
2734
2735
2736 def _GetInstanceInfoText(instance):
2737   """Compute that text that should be added to the disk's metadata.
2738
2739   """
2740   return "originstname+%s" % instance.name
2741
2742
2743 def _CreateDisks(cfg, instance):
2744   """Create all disks for an instance.
2745
2746   This abstracts away some work from AddInstance.
2747
2748   Args:
2749     instance: the instance object
2750
2751   Returns:
2752     True or False showing the success of the creation process
2753
2754   """
2755   info = _GetInstanceInfoText(instance)
2756
2757   if instance.disk_template == constants.DT_FILE:
2758     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2759     result = rpc.call_file_storage_dir_create(instance.primary_node,
2760                                               file_storage_dir)
2761
2762     if not result:
2763       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2764       return False
2765
2766     if not result[0]:
2767       logger.Error("failed to create directory '%s'" % file_storage_dir)
2768       return False
2769
2770   for device in instance.disks:
2771     logger.Info("creating volume %s for instance %s" %
2772                 (device.iv_name, instance.name))
2773     #HARDCODE
2774     for secondary_node in instance.secondary_nodes:
2775       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2776                                         device, False, info):
2777         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2778                      (device.iv_name, device, secondary_node))
2779         return False
2780     #HARDCODE
2781     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2782                                     instance, device, info):
2783       logger.Error("failed to create volume %s on primary!" %
2784                    device.iv_name)
2785       return False
2786
2787   return True
2788
2789
2790 def _RemoveDisks(instance, cfg):
2791   """Remove all disks for an instance.
2792
2793   This abstracts away some work from `AddInstance()` and
2794   `RemoveInstance()`. Note that in case some of the devices couldn't
2795   be removed, the removal will continue with the other ones (compare
2796   with `_CreateDisks()`).
2797
2798   Args:
2799     instance: the instance object
2800
2801   Returns:
2802     True or False showing the success of the removal proces
2803
2804   """
2805   logger.Info("removing block devices for instance %s" % instance.name)
2806
2807   result = True
2808   for device in instance.disks:
2809     for node, disk in device.ComputeNodeTree(instance.primary_node):
2810       cfg.SetDiskID(disk, node)
2811       if not rpc.call_blockdev_remove(node, disk):
2812         logger.Error("could not remove block device %s on node %s,"
2813                      " continuing anyway" %
2814                      (device.iv_name, node))
2815         result = False
2816
2817   if instance.disk_template == constants.DT_FILE:
2818     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2819     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2820                                             file_storage_dir):
2821       logger.Error("could not remove directory '%s'" % file_storage_dir)
2822       result = False
2823
2824   return result
2825
2826
2827 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2828   """Compute disk size requirements in the volume group
2829
2830   This is currently hard-coded for the two-drive layout.
2831
2832   """
2833   # Required free disk space as a function of disk and swap space
2834   req_size_dict = {
2835     constants.DT_DISKLESS: None,
2836     constants.DT_PLAIN: disk_size + swap_size,
2837     # 256 MB are added for drbd metadata, 128MB for each drbd device
2838     constants.DT_DRBD8: disk_size + swap_size + 256,
2839     constants.DT_FILE: None,
2840   }
2841
2842   if disk_template not in req_size_dict:
2843     raise errors.ProgrammerError("Disk template '%s' size requirement"
2844                                  " is unknown" %  disk_template)
2845
2846   return req_size_dict[disk_template]
2847
2848
2849 class LUCreateInstance(LogicalUnit):
2850   """Create an instance.
2851
2852   """
2853   HPATH = "instance-add"
2854   HTYPE = constants.HTYPE_INSTANCE
2855   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2856               "disk_template", "swap_size", "mode", "start", "vcpus",
2857               "wait_for_sync", "ip_check", "mac"]
2858
2859   def _RunAllocator(self):
2860     """Run the allocator based on input opcode.
2861
2862     """
2863     disks = [{"size": self.op.disk_size, "mode": "w"},
2864              {"size": self.op.swap_size, "mode": "w"}]
2865     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2866              "bridge": self.op.bridge}]
2867     ial = IAllocator(self.cfg, self.sstore,
2868                      mode=constants.IALLOCATOR_MODE_ALLOC,
2869                      name=self.op.instance_name,
2870                      disk_template=self.op.disk_template,
2871                      tags=[],
2872                      os=self.op.os_type,
2873                      vcpus=self.op.vcpus,
2874                      mem_size=self.op.mem_size,
2875                      disks=disks,
2876                      nics=nics,
2877                      )
2878
2879     ial.Run(self.op.iallocator)
2880
2881     if not ial.success:
2882       raise errors.OpPrereqError("Can't compute nodes using"
2883                                  " iallocator '%s': %s" % (self.op.iallocator,
2884                                                            ial.info))
2885     if len(ial.nodes) != ial.required_nodes:
2886       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2887                                  " of nodes (%s), required %s" %
2888                                  (len(ial.nodes), ial.required_nodes))
2889     self.op.pnode = ial.nodes[0]
2890     logger.ToStdout("Selected nodes for the instance: %s" %
2891                     (", ".join(ial.nodes),))
2892     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2893                 (self.op.instance_name, self.op.iallocator, ial.nodes))
2894     if ial.required_nodes == 2:
2895       self.op.snode = ial.nodes[1]
2896
2897   def BuildHooksEnv(self):
2898     """Build hooks env.
2899
2900     This runs on master, primary and secondary nodes of the instance.
2901
2902     """
2903     env = {
2904       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2905       "INSTANCE_DISK_SIZE": self.op.disk_size,
2906       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2907       "INSTANCE_ADD_MODE": self.op.mode,
2908       }
2909     if self.op.mode == constants.INSTANCE_IMPORT:
2910       env["INSTANCE_SRC_NODE"] = self.op.src_node
2911       env["INSTANCE_SRC_PATH"] = self.op.src_path
2912       env["INSTANCE_SRC_IMAGE"] = self.src_image
2913
2914     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2915       primary_node=self.op.pnode,
2916       secondary_nodes=self.secondaries,
2917       status=self.instance_status,
2918       os_type=self.op.os_type,
2919       memory=self.op.mem_size,
2920       vcpus=self.op.vcpus,
2921       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2922     ))
2923
2924     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2925           self.secondaries)
2926     return env, nl, nl
2927
2928
2929   def CheckPrereq(self):
2930     """Check prerequisites.
2931
2932     """
2933     # set optional parameters to none if they don't exist
2934     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2935                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2936                  "vnc_bind_address"]:
2937       if not hasattr(self.op, attr):
2938         setattr(self.op, attr, None)
2939
2940     if self.op.mode not in (constants.INSTANCE_CREATE,
2941                             constants.INSTANCE_IMPORT):
2942       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2943                                  self.op.mode)
2944
2945     if (not self.cfg.GetVGName() and
2946         self.op.disk_template not in constants.DTS_NOT_LVM):
2947       raise errors.OpPrereqError("Cluster does not support lvm-based"
2948                                  " instances")
2949
2950     if self.op.mode == constants.INSTANCE_IMPORT:
2951       src_node = getattr(self.op, "src_node", None)
2952       src_path = getattr(self.op, "src_path", None)
2953       if src_node is None or src_path is None:
2954         raise errors.OpPrereqError("Importing an instance requires source"
2955                                    " node and path options")
2956       src_node_full = self.cfg.ExpandNodeName(src_node)
2957       if src_node_full is None:
2958         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2959       self.op.src_node = src_node = src_node_full
2960
2961       if not os.path.isabs(src_path):
2962         raise errors.OpPrereqError("The source path must be absolute")
2963
2964       export_info = rpc.call_export_info(src_node, src_path)
2965
2966       if not export_info:
2967         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2968
2969       if not export_info.has_section(constants.INISECT_EXP):
2970         raise errors.ProgrammerError("Corrupted export config")
2971
2972       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2973       if (int(ei_version) != constants.EXPORT_VERSION):
2974         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2975                                    (ei_version, constants.EXPORT_VERSION))
2976
2977       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2978         raise errors.OpPrereqError("Can't import instance with more than"
2979                                    " one data disk")
2980
2981       # FIXME: are the old os-es, disk sizes, etc. useful?
2982       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2983       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2984                                                          'disk0_dump'))
2985       self.src_image = diskimage
2986     else: # INSTANCE_CREATE
2987       if getattr(self.op, "os_type", None) is None:
2988         raise errors.OpPrereqError("No guest OS specified")
2989
2990     #### instance parameters check
2991
2992     # disk template and mirror node verification
2993     if self.op.disk_template not in constants.DISK_TEMPLATES:
2994       raise errors.OpPrereqError("Invalid disk template name")
2995
2996     # instance name verification
2997     hostname1 = utils.HostInfo(self.op.instance_name)
2998
2999     self.op.instance_name = instance_name = hostname1.name
3000     instance_list = self.cfg.GetInstanceList()
3001     if instance_name in instance_list:
3002       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3003                                  instance_name)
3004
3005     # ip validity checks
3006     ip = getattr(self.op, "ip", None)
3007     if ip is None or ip.lower() == "none":
3008       inst_ip = None
3009     elif ip.lower() == "auto":
3010       inst_ip = hostname1.ip
3011     else:
3012       if not utils.IsValidIP(ip):
3013         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3014                                    " like a valid IP" % ip)
3015       inst_ip = ip
3016     self.inst_ip = self.op.ip = inst_ip
3017
3018     if self.op.start and not self.op.ip_check:
3019       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3020                                  " adding an instance in start mode")
3021
3022     if self.op.ip_check:
3023       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3024         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3025                                    (hostname1.ip, instance_name))
3026
3027     # MAC address verification
3028     if self.op.mac != "auto":
3029       if not utils.IsValidMac(self.op.mac.lower()):
3030         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3031                                    self.op.mac)
3032
3033     # bridge verification
3034     bridge = getattr(self.op, "bridge", None)
3035     if bridge is None:
3036       self.op.bridge = self.cfg.GetDefBridge()
3037     else:
3038       self.op.bridge = bridge
3039
3040     # boot order verification
3041     if self.op.hvm_boot_order is not None:
3042       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3043         raise errors.OpPrereqError("invalid boot order specified,"
3044                                    " must be one or more of [acdn]")
3045     # file storage checks
3046     if (self.op.file_driver and
3047         not self.op.file_driver in constants.FILE_DRIVER):
3048       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3049                                  self.op.file_driver)
3050
3051     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3052       raise errors.OpPrereqError("File storage directory not a relative"
3053                                  " path")
3054     #### allocator run
3055
3056     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3057       raise errors.OpPrereqError("One and only one of iallocator and primary"
3058                                  " node must be given")
3059
3060     if self.op.iallocator is not None:
3061       self._RunAllocator()
3062
3063     #### node related checks
3064
3065     # check primary node
3066     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3067     if pnode is None:
3068       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3069                                  self.op.pnode)
3070     self.op.pnode = pnode.name
3071     self.pnode = pnode
3072     self.secondaries = []
3073
3074     # mirror node verification
3075     if self.op.disk_template in constants.DTS_NET_MIRROR:
3076       if getattr(self.op, "snode", None) is None:
3077         raise errors.OpPrereqError("The networked disk templates need"
3078                                    " a mirror node")
3079
3080       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3081       if snode_name is None:
3082         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3083                                    self.op.snode)
3084       elif snode_name == pnode.name:
3085         raise errors.OpPrereqError("The secondary node cannot be"
3086                                    " the primary node.")
3087       self.secondaries.append(snode_name)
3088
3089     req_size = _ComputeDiskSize(self.op.disk_template,
3090                                 self.op.disk_size, self.op.swap_size)
3091
3092     # Check lv size requirements
3093     if req_size is not None:
3094       nodenames = [pnode.name] + self.secondaries
3095       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3096       for node in nodenames:
3097         info = nodeinfo.get(node, None)
3098         if not info:
3099           raise errors.OpPrereqError("Cannot get current information"
3100                                      " from node '%s'" % node)
3101         vg_free = info.get('vg_free', None)
3102         if not isinstance(vg_free, int):
3103           raise errors.OpPrereqError("Can't compute free disk space on"
3104                                      " node %s" % node)
3105         if req_size > info['vg_free']:
3106           raise errors.OpPrereqError("Not enough disk space on target node %s."
3107                                      " %d MB available, %d MB required" %
3108                                      (node, info['vg_free'], req_size))
3109
3110     # os verification
3111     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3112     if not os_obj:
3113       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3114                                  " primary node"  % self.op.os_type)
3115
3116     if self.op.kernel_path == constants.VALUE_NONE:
3117       raise errors.OpPrereqError("Can't set instance kernel to none")
3118
3119
3120     # bridge check on primary node
3121     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3122       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3123                                  " destination node '%s'" %
3124                                  (self.op.bridge, pnode.name))
3125
3126     # memory check on primary node
3127     if self.op.start:
3128       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3129                            "creating instance %s" % self.op.instance_name,
3130                            self.op.mem_size)
3131
3132     # hvm_cdrom_image_path verification
3133     if self.op.hvm_cdrom_image_path is not None:
3134       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3135         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3136                                    " be an absolute path or None, not %s" %
3137                                    self.op.hvm_cdrom_image_path)
3138       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3139         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3140                                    " regular file or a symlink pointing to"
3141                                    " an existing regular file, not %s" %
3142                                    self.op.hvm_cdrom_image_path)
3143
3144     # vnc_bind_address verification
3145     if self.op.vnc_bind_address is not None:
3146       if not utils.IsValidIP(self.op.vnc_bind_address):
3147         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3148                                    " like a valid IP address" %
3149                                    self.op.vnc_bind_address)
3150
3151     if self.op.start:
3152       self.instance_status = 'up'
3153     else:
3154       self.instance_status = 'down'
3155
3156   def Exec(self, feedback_fn):
3157     """Create and add the instance to the cluster.
3158
3159     """
3160     instance = self.op.instance_name
3161     pnode_name = self.pnode.name
3162
3163     if self.op.mac == "auto":
3164       mac_address = self.cfg.GenerateMAC()
3165     else:
3166       mac_address = self.op.mac
3167
3168     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3169     if self.inst_ip is not None:
3170       nic.ip = self.inst_ip
3171
3172     ht_kind = self.sstore.GetHypervisorType()
3173     if ht_kind in constants.HTS_REQ_PORT:
3174       network_port = self.cfg.AllocatePort()
3175     else:
3176       network_port = None
3177
3178     if self.op.vnc_bind_address is None:
3179       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3180
3181     # this is needed because os.path.join does not accept None arguments
3182     if self.op.file_storage_dir is None:
3183       string_file_storage_dir = ""
3184     else:
3185       string_file_storage_dir = self.op.file_storage_dir
3186
3187     # build the full file storage dir path
3188     file_storage_dir = os.path.normpath(os.path.join(
3189                                         self.sstore.GetFileStorageDir(),
3190                                         string_file_storage_dir, instance))
3191
3192
3193     disks = _GenerateDiskTemplate(self.cfg,
3194                                   self.op.disk_template,
3195                                   instance, pnode_name,
3196                                   self.secondaries, self.op.disk_size,
3197                                   self.op.swap_size,
3198                                   file_storage_dir,
3199                                   self.op.file_driver)
3200
3201     iobj = objects.Instance(name=instance, os=self.op.os_type,
3202                             primary_node=pnode_name,
3203                             memory=self.op.mem_size,
3204                             vcpus=self.op.vcpus,
3205                             nics=[nic], disks=disks,
3206                             disk_template=self.op.disk_template,
3207                             status=self.instance_status,
3208                             network_port=network_port,
3209                             kernel_path=self.op.kernel_path,
3210                             initrd_path=self.op.initrd_path,
3211                             hvm_boot_order=self.op.hvm_boot_order,
3212                             hvm_acpi=self.op.hvm_acpi,
3213                             hvm_pae=self.op.hvm_pae,
3214                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3215                             vnc_bind_address=self.op.vnc_bind_address,
3216                             )
3217
3218     feedback_fn("* creating instance disks...")
3219     if not _CreateDisks(self.cfg, iobj):
3220       _RemoveDisks(iobj, self.cfg)
3221       raise errors.OpExecError("Device creation failed, reverting...")
3222
3223     feedback_fn("adding instance %s to cluster config" % instance)
3224
3225     self.cfg.AddInstance(iobj)
3226
3227     if self.op.wait_for_sync:
3228       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3229     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3230       # make sure the disks are not degraded (still sync-ing is ok)
3231       time.sleep(15)
3232       feedback_fn("* checking mirrors status")
3233       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3234     else:
3235       disk_abort = False
3236
3237     if disk_abort:
3238       _RemoveDisks(iobj, self.cfg)
3239       self.cfg.RemoveInstance(iobj.name)
3240       raise errors.OpExecError("There are some degraded disks for"
3241                                " this instance")
3242
3243     feedback_fn("creating os for instance %s on node %s" %
3244                 (instance, pnode_name))
3245
3246     if iobj.disk_template != constants.DT_DISKLESS:
3247       if self.op.mode == constants.INSTANCE_CREATE:
3248         feedback_fn("* running the instance OS create scripts...")
3249         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3250           raise errors.OpExecError("could not add os for instance %s"
3251                                    " on node %s" %
3252                                    (instance, pnode_name))
3253
3254       elif self.op.mode == constants.INSTANCE_IMPORT:
3255         feedback_fn("* running the instance OS import scripts...")
3256         src_node = self.op.src_node
3257         src_image = self.src_image
3258         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3259                                                 src_node, src_image):
3260           raise errors.OpExecError("Could not import os for instance"
3261                                    " %s on node %s" %
3262                                    (instance, pnode_name))
3263       else:
3264         # also checked in the prereq part
3265         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3266                                      % self.op.mode)
3267
3268     if self.op.start:
3269       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3270       feedback_fn("* starting instance...")
3271       if not rpc.call_instance_start(pnode_name, iobj, None):
3272         raise errors.OpExecError("Could not start instance")
3273
3274
3275 class LUConnectConsole(NoHooksLU):
3276   """Connect to an instance's console.
3277
3278   This is somewhat special in that it returns the command line that
3279   you need to run on the master node in order to connect to the
3280   console.
3281
3282   """
3283   _OP_REQP = ["instance_name"]
3284
3285   def CheckPrereq(self):
3286     """Check prerequisites.
3287
3288     This checks that the instance is in the cluster.
3289
3290     """
3291     instance = self.cfg.GetInstanceInfo(
3292       self.cfg.ExpandInstanceName(self.op.instance_name))
3293     if instance is None:
3294       raise errors.OpPrereqError("Instance '%s' not known" %
3295                                  self.op.instance_name)
3296     self.instance = instance
3297
3298   def Exec(self, feedback_fn):
3299     """Connect to the console of an instance
3300
3301     """
3302     instance = self.instance
3303     node = instance.primary_node
3304
3305     node_insts = rpc.call_instance_list([node])[node]
3306     if node_insts is False:
3307       raise errors.OpExecError("Can't connect to node %s." % node)
3308
3309     if instance.name not in node_insts:
3310       raise errors.OpExecError("Instance %s is not running." % instance.name)
3311
3312     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3313
3314     hyper = hypervisor.GetHypervisor()
3315     console_cmd = hyper.GetShellCommandForConsole(instance)
3316
3317     # build ssh cmdline
3318     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3319
3320
3321 class LUReplaceDisks(LogicalUnit):
3322   """Replace the disks of an instance.
3323
3324   """
3325   HPATH = "mirrors-replace"
3326   HTYPE = constants.HTYPE_INSTANCE
3327   _OP_REQP = ["instance_name", "mode", "disks"]
3328
3329   def _RunAllocator(self):
3330     """Compute a new secondary node using an IAllocator.
3331
3332     """
3333     ial = IAllocator(self.cfg, self.sstore,
3334                      mode=constants.IALLOCATOR_MODE_RELOC,
3335                      name=self.op.instance_name,
3336                      relocate_from=[self.sec_node])
3337
3338     ial.Run(self.op.iallocator)
3339
3340     if not ial.success:
3341       raise errors.OpPrereqError("Can't compute nodes using"
3342                                  " iallocator '%s': %s" % (self.op.iallocator,
3343                                                            ial.info))
3344     if len(ial.nodes) != ial.required_nodes:
3345       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3346                                  " of nodes (%s), required %s" %
3347                                  (len(ial.nodes), ial.required_nodes))
3348     self.op.remote_node = ial.nodes[0]
3349     logger.ToStdout("Selected new secondary for the instance: %s" %
3350                     self.op.remote_node)
3351
3352   def BuildHooksEnv(self):
3353     """Build hooks env.
3354
3355     This runs on the master, the primary and all the secondaries.
3356
3357     """
3358     env = {
3359       "MODE": self.op.mode,
3360       "NEW_SECONDARY": self.op.remote_node,
3361       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3362       }
3363     env.update(_BuildInstanceHookEnvByObject(self.instance))
3364     nl = [
3365       self.sstore.GetMasterNode(),
3366       self.instance.primary_node,
3367       ]
3368     if self.op.remote_node is not None:
3369       nl.append(self.op.remote_node)
3370     return env, nl, nl
3371
3372   def CheckPrereq(self):
3373     """Check prerequisites.
3374
3375     This checks that the instance is in the cluster.
3376
3377     """
3378     if not hasattr(self.op, "remote_node"):
3379       self.op.remote_node = None
3380
3381     instance = self.cfg.GetInstanceInfo(
3382       self.cfg.ExpandInstanceName(self.op.instance_name))
3383     if instance is None:
3384       raise errors.OpPrereqError("Instance '%s' not known" %
3385                                  self.op.instance_name)
3386     self.instance = instance
3387     self.op.instance_name = instance.name
3388
3389     if instance.disk_template not in constants.DTS_NET_MIRROR:
3390       raise errors.OpPrereqError("Instance's disk layout is not"
3391                                  " network mirrored.")
3392
3393     if len(instance.secondary_nodes) != 1:
3394       raise errors.OpPrereqError("The instance has a strange layout,"
3395                                  " expected one secondary but found %d" %
3396                                  len(instance.secondary_nodes))
3397
3398     self.sec_node = instance.secondary_nodes[0]
3399
3400     ia_name = getattr(self.op, "iallocator", None)
3401     if ia_name is not None:
3402       if self.op.remote_node is not None:
3403         raise errors.OpPrereqError("Give either the iallocator or the new"
3404                                    " secondary, not both")
3405       self.op.remote_node = self._RunAllocator()
3406
3407     remote_node = self.op.remote_node
3408     if remote_node is not None:
3409       remote_node = self.cfg.ExpandNodeName(remote_node)
3410       if remote_node is None:
3411         raise errors.OpPrereqError("Node '%s' not known" %
3412                                    self.op.remote_node)
3413       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3414     else:
3415       self.remote_node_info = None
3416     if remote_node == instance.primary_node:
3417       raise errors.OpPrereqError("The specified node is the primary node of"
3418                                  " the instance.")
3419     elif remote_node == self.sec_node:
3420       if self.op.mode == constants.REPLACE_DISK_SEC:
3421         # this is for DRBD8, where we can't execute the same mode of
3422         # replacement as for drbd7 (no different port allocated)
3423         raise errors.OpPrereqError("Same secondary given, cannot execute"
3424                                    " replacement")
3425     if instance.disk_template == constants.DT_DRBD8:
3426       if (self.op.mode == constants.REPLACE_DISK_ALL and
3427           remote_node is not None):
3428         # switch to replace secondary mode
3429         self.op.mode = constants.REPLACE_DISK_SEC
3430
3431       if self.op.mode == constants.REPLACE_DISK_ALL:
3432         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3433                                    " secondary disk replacement, not"
3434                                    " both at once")
3435       elif self.op.mode == constants.REPLACE_DISK_PRI:
3436         if remote_node is not None:
3437           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3438                                      " the secondary while doing a primary"
3439                                      " node disk replacement")
3440         self.tgt_node = instance.primary_node
3441         self.oth_node = instance.secondary_nodes[0]
3442       elif self.op.mode == constants.REPLACE_DISK_SEC:
3443         self.new_node = remote_node # this can be None, in which case
3444                                     # we don't change the secondary
3445         self.tgt_node = instance.secondary_nodes[0]
3446         self.oth_node = instance.primary_node
3447       else:
3448         raise errors.ProgrammerError("Unhandled disk replace mode")
3449
3450     for name in self.op.disks:
3451       if instance.FindDisk(name) is None:
3452         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3453                                    (name, instance.name))
3454     self.op.remote_node = remote_node
3455
3456   def _ExecD8DiskOnly(self, feedback_fn):
3457     """Replace a disk on the primary or secondary for dbrd8.
3458
3459     The algorithm for replace is quite complicated:
3460       - for each disk to be replaced:
3461         - create new LVs on the target node with unique names
3462         - detach old LVs from the drbd device
3463         - rename old LVs to name_replaced.<time_t>
3464         - rename new LVs to old LVs
3465         - attach the new LVs (with the old names now) to the drbd device
3466       - wait for sync across all devices
3467       - for each modified disk:
3468         - remove old LVs (which have the name name_replaces.<time_t>)
3469
3470     Failures are not very well handled.
3471
3472     """
3473     steps_total = 6
3474     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3475     instance = self.instance
3476     iv_names = {}
3477     vgname = self.cfg.GetVGName()
3478     # start of work
3479     cfg = self.cfg
3480     tgt_node = self.tgt_node
3481     oth_node = self.oth_node
3482
3483     # Step: check device activation
3484     self.proc.LogStep(1, steps_total, "check device existence")
3485     info("checking volume groups")
3486     my_vg = cfg.GetVGName()
3487     results = rpc.call_vg_list([oth_node, tgt_node])
3488     if not results:
3489       raise errors.OpExecError("Can't list volume groups on the nodes")
3490     for node in oth_node, tgt_node:
3491       res = results.get(node, False)
3492       if not res or my_vg not in res:
3493         raise errors.OpExecError("Volume group '%s' not found on %s" %
3494                                  (my_vg, node))
3495     for dev in instance.disks:
3496       if not dev.iv_name in self.op.disks:
3497         continue
3498       for node in tgt_node, oth_node:
3499         info("checking %s on %s" % (dev.iv_name, node))
3500         cfg.SetDiskID(dev, node)
3501         if not rpc.call_blockdev_find(node, dev):
3502           raise errors.OpExecError("Can't find device %s on node %s" %
3503                                    (dev.iv_name, node))
3504
3505     # Step: check other node consistency
3506     self.proc.LogStep(2, steps_total, "check peer consistency")
3507     for dev in instance.disks:
3508       if not dev.iv_name in self.op.disks:
3509         continue
3510       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3511       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3512                                    oth_node==instance.primary_node):
3513         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3514                                  " to replace disks on this node (%s)" %
3515                                  (oth_node, tgt_node))
3516
3517     # Step: create new storage
3518     self.proc.LogStep(3, steps_total, "allocate new storage")
3519     for dev in instance.disks:
3520       if not dev.iv_name in self.op.disks:
3521         continue
3522       size = dev.size
3523       cfg.SetDiskID(dev, tgt_node)
3524       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3525       names = _GenerateUniqueNames(cfg, lv_names)
3526       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3527                              logical_id=(vgname, names[0]))
3528       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3529                              logical_id=(vgname, names[1]))
3530       new_lvs = [lv_data, lv_meta]
3531       old_lvs = dev.children
3532       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3533       info("creating new local storage on %s for %s" %
3534            (tgt_node, dev.iv_name))
3535       # since we *always* want to create this LV, we use the
3536       # _Create...OnPrimary (which forces the creation), even if we
3537       # are talking about the secondary node
3538       for new_lv in new_lvs:
3539         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3540                                         _GetInstanceInfoText(instance)):
3541           raise errors.OpExecError("Failed to create new LV named '%s' on"
3542                                    " node '%s'" %
3543                                    (new_lv.logical_id[1], tgt_node))
3544
3545     # Step: for each lv, detach+rename*2+attach
3546     self.proc.LogStep(4, steps_total, "change drbd configuration")
3547     for dev, old_lvs, new_lvs in iv_names.itervalues():
3548       info("detaching %s drbd from local storage" % dev.iv_name)
3549       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3550         raise errors.OpExecError("Can't detach drbd from local storage on node"
3551                                  " %s for device %s" % (tgt_node, dev.iv_name))
3552       #dev.children = []
3553       #cfg.Update(instance)
3554
3555       # ok, we created the new LVs, so now we know we have the needed
3556       # storage; as such, we proceed on the target node to rename
3557       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3558       # using the assumption that logical_id == physical_id (which in
3559       # turn is the unique_id on that node)
3560
3561       # FIXME(iustin): use a better name for the replaced LVs
3562       temp_suffix = int(time.time())
3563       ren_fn = lambda d, suff: (d.physical_id[0],
3564                                 d.physical_id[1] + "_replaced-%s" % suff)
3565       # build the rename list based on what LVs exist on the node
3566       rlist = []
3567       for to_ren in old_lvs:
3568         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3569         if find_res is not None: # device exists
3570           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3571
3572       info("renaming the old LVs on the target node")
3573       if not rpc.call_blockdev_rename(tgt_node, rlist):
3574         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3575       # now we rename the new LVs to the old LVs
3576       info("renaming the new LVs on the target node")
3577       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3578       if not rpc.call_blockdev_rename(tgt_node, rlist):
3579         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3580
3581       for old, new in zip(old_lvs, new_lvs):
3582         new.logical_id = old.logical_id
3583         cfg.SetDiskID(new, tgt_node)
3584
3585       for disk in old_lvs:
3586         disk.logical_id = ren_fn(disk, temp_suffix)
3587         cfg.SetDiskID(disk, tgt_node)
3588
3589       # now that the new lvs have the old name, we can add them to the device
3590       info("adding new mirror component on %s" % tgt_node)
3591       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3592         for new_lv in new_lvs:
3593           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3594             warning("Can't rollback device %s", hint="manually cleanup unused"
3595                     " logical volumes")
3596         raise errors.OpExecError("Can't add local storage to drbd")
3597
3598       dev.children = new_lvs
3599       cfg.Update(instance)
3600
3601     # Step: wait for sync
3602
3603     # this can fail as the old devices are degraded and _WaitForSync
3604     # does a combined result over all disks, so we don't check its
3605     # return value
3606     self.proc.LogStep(5, steps_total, "sync devices")
3607     _WaitForSync(cfg, instance, self.proc, unlock=True)
3608
3609     # so check manually all the devices
3610     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3611       cfg.SetDiskID(dev, instance.primary_node)
3612       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3613       if is_degr:
3614         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3615
3616     # Step: remove old storage
3617     self.proc.LogStep(6, steps_total, "removing old storage")
3618     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3619       info("remove logical volumes for %s" % name)
3620       for lv in old_lvs:
3621         cfg.SetDiskID(lv, tgt_node)
3622         if not rpc.call_blockdev_remove(tgt_node, lv):
3623           warning("Can't remove old LV", hint="manually remove unused LVs")
3624           continue
3625
3626   def _ExecD8Secondary(self, feedback_fn):
3627     """Replace the secondary node for drbd8.
3628
3629     The algorithm for replace is quite complicated:
3630       - for all disks of the instance:
3631         - create new LVs on the new node with same names
3632         - shutdown the drbd device on the old secondary
3633         - disconnect the drbd network on the primary
3634         - create the drbd device on the new secondary
3635         - network attach the drbd on the primary, using an artifice:
3636           the drbd code for Attach() will connect to the network if it
3637           finds a device which is connected to the good local disks but
3638           not network enabled
3639       - wait for sync across all devices
3640       - remove all disks from the old secondary
3641
3642     Failures are not very well handled.
3643
3644     """
3645     steps_total = 6
3646     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3647     instance = self.instance
3648     iv_names = {}
3649     vgname = self.cfg.GetVGName()
3650     # start of work
3651     cfg = self.cfg
3652     old_node = self.tgt_node
3653     new_node = self.new_node
3654     pri_node = instance.primary_node
3655
3656     # Step: check device activation
3657     self.proc.LogStep(1, steps_total, "check device existence")
3658     info("checking volume groups")
3659     my_vg = cfg.GetVGName()
3660     results = rpc.call_vg_list([pri_node, new_node])
3661     if not results:
3662       raise errors.OpExecError("Can't list volume groups on the nodes")
3663     for node in pri_node, new_node:
3664       res = results.get(node, False)
3665       if not res or my_vg not in res:
3666         raise errors.OpExecError("Volume group '%s' not found on %s" %
3667                                  (my_vg, node))
3668     for dev in instance.disks:
3669       if not dev.iv_name in self.op.disks:
3670         continue
3671       info("checking %s on %s" % (dev.iv_name, pri_node))
3672       cfg.SetDiskID(dev, pri_node)
3673       if not rpc.call_blockdev_find(pri_node, dev):
3674         raise errors.OpExecError("Can't find device %s on node %s" %
3675                                  (dev.iv_name, pri_node))
3676
3677     # Step: check other node consistency
3678     self.proc.LogStep(2, steps_total, "check peer consistency")
3679     for dev in instance.disks:
3680       if not dev.iv_name in self.op.disks:
3681         continue
3682       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3683       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3684         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3685                                  " unsafe to replace the secondary" %
3686                                  pri_node)
3687
3688     # Step: create new storage
3689     self.proc.LogStep(3, steps_total, "allocate new storage")
3690     for dev in instance.disks:
3691       size = dev.size
3692       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3693       # since we *always* want to create this LV, we use the
3694       # _Create...OnPrimary (which forces the creation), even if we
3695       # are talking about the secondary node
3696       for new_lv in dev.children:
3697         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3698                                         _GetInstanceInfoText(instance)):
3699           raise errors.OpExecError("Failed to create new LV named '%s' on"
3700                                    " node '%s'" %
3701                                    (new_lv.logical_id[1], new_node))
3702
3703       iv_names[dev.iv_name] = (dev, dev.children)
3704
3705     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3706     for dev in instance.disks:
3707       size = dev.size
3708       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3709       # create new devices on new_node
3710       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3711                               logical_id=(pri_node, new_node,
3712                                           dev.logical_id[2]),
3713                               children=dev.children)
3714       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3715                                         new_drbd, False,
3716                                       _GetInstanceInfoText(instance)):
3717         raise errors.OpExecError("Failed to create new DRBD on"
3718                                  " node '%s'" % new_node)
3719
3720     for dev in instance.disks:
3721       # we have new devices, shutdown the drbd on the old secondary
3722       info("shutting down drbd for %s on old node" % dev.iv_name)
3723       cfg.SetDiskID(dev, old_node)
3724       if not rpc.call_blockdev_shutdown(old_node, dev):
3725         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3726                 hint="Please cleanup this device manually as soon as possible")
3727
3728     info("detaching primary drbds from the network (=> standalone)")
3729     done = 0
3730     for dev in instance.disks:
3731       cfg.SetDiskID(dev, pri_node)
3732       # set the physical (unique in bdev terms) id to None, meaning
3733       # detach from network
3734       dev.physical_id = (None,) * len(dev.physical_id)
3735       # and 'find' the device, which will 'fix' it to match the
3736       # standalone state
3737       if rpc.call_blockdev_find(pri_node, dev):
3738         done += 1
3739       else:
3740         warning("Failed to detach drbd %s from network, unusual case" %
3741                 dev.iv_name)
3742
3743     if not done:
3744       # no detaches succeeded (very unlikely)
3745       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3746
3747     # if we managed to detach at least one, we update all the disks of
3748     # the instance to point to the new secondary
3749     info("updating instance configuration")
3750     for dev in instance.disks:
3751       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3752       cfg.SetDiskID(dev, pri_node)
3753     cfg.Update(instance)
3754
3755     # and now perform the drbd attach
3756     info("attaching primary drbds to new secondary (standalone => connected)")
3757     failures = []
3758     for dev in instance.disks:
3759       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3760       # since the attach is smart, it's enough to 'find' the device,
3761       # it will automatically activate the network, if the physical_id
3762       # is correct
3763       cfg.SetDiskID(dev, pri_node)
3764       if not rpc.call_blockdev_find(pri_node, dev):
3765         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3766                 "please do a gnt-instance info to see the status of disks")
3767
3768     # this can fail as the old devices are degraded and _WaitForSync
3769     # does a combined result over all disks, so we don't check its
3770     # return value
3771     self.proc.LogStep(5, steps_total, "sync devices")
3772     _WaitForSync(cfg, instance, self.proc, unlock=True)
3773
3774     # so check manually all the devices
3775     for name, (dev, old_lvs) in iv_names.iteritems():
3776       cfg.SetDiskID(dev, pri_node)
3777       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3778       if is_degr:
3779         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3780
3781     self.proc.LogStep(6, steps_total, "removing old storage")
3782     for name, (dev, old_lvs) in iv_names.iteritems():
3783       info("remove logical volumes for %s" % name)
3784       for lv in old_lvs:
3785         cfg.SetDiskID(lv, old_node)
3786         if not rpc.call_blockdev_remove(old_node, lv):
3787           warning("Can't remove LV on old secondary",
3788                   hint="Cleanup stale volumes by hand")
3789
3790   def Exec(self, feedback_fn):
3791     """Execute disk replacement.
3792
3793     This dispatches the disk replacement to the appropriate handler.
3794
3795     """
3796     instance = self.instance
3797
3798     # Activate the instance disks if we're replacing them on a down instance
3799     if instance.status == "down":
3800       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3801       self.proc.ChainOpCode(op)
3802
3803     if instance.disk_template == constants.DT_DRBD8:
3804       if self.op.remote_node is None:
3805         fn = self._ExecD8DiskOnly
3806       else:
3807         fn = self._ExecD8Secondary
3808     else:
3809       raise errors.ProgrammerError("Unhandled disk replacement case")
3810
3811     ret = fn(feedback_fn)
3812
3813     # Deactivate the instance disks if we're replacing them on a down instance
3814     if instance.status == "down":
3815       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3816       self.proc.ChainOpCode(op)
3817
3818     return ret
3819
3820
3821 class LUGrowDisk(LogicalUnit):
3822   """Grow a disk of an instance.
3823
3824   """
3825   HPATH = "disk-grow"
3826   HTYPE = constants.HTYPE_INSTANCE
3827   _OP_REQP = ["instance_name", "disk", "amount"]
3828
3829   def BuildHooksEnv(self):
3830     """Build hooks env.
3831
3832     This runs on the master, the primary and all the secondaries.
3833
3834     """
3835     env = {
3836       "DISK": self.op.disk,
3837       "AMOUNT": self.op.amount,
3838       }
3839     env.update(_BuildInstanceHookEnvByObject(self.instance))
3840     nl = [
3841       self.sstore.GetMasterNode(),
3842       self.instance.primary_node,
3843       ]
3844     return env, nl, nl
3845
3846   def CheckPrereq(self):
3847     """Check prerequisites.
3848
3849     This checks that the instance is in the cluster.
3850
3851     """
3852     instance = self.cfg.GetInstanceInfo(
3853       self.cfg.ExpandInstanceName(self.op.instance_name))
3854     if instance is None:
3855       raise errors.OpPrereqError("Instance '%s' not known" %
3856                                  self.op.instance_name)
3857     self.instance = instance
3858     self.op.instance_name = instance.name
3859
3860     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3861       raise errors.OpPrereqError("Instance's disk layout does not support"
3862                                  " growing.")
3863
3864     if instance.FindDisk(self.op.disk) is None:
3865       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3866                                  (self.op.disk, instance.name))
3867
3868     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3869     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3870     for node in nodenames:
3871       info = nodeinfo.get(node, None)
3872       if not info:
3873         raise errors.OpPrereqError("Cannot get current information"
3874                                    " from node '%s'" % node)
3875       vg_free = info.get('vg_free', None)
3876       if not isinstance(vg_free, int):
3877         raise errors.OpPrereqError("Can't compute free disk space on"
3878                                    " node %s" % node)
3879       if self.op.amount > info['vg_free']:
3880         raise errors.OpPrereqError("Not enough disk space on target node %s:"
3881                                    " %d MiB available, %d MiB required" %
3882                                    (node, info['vg_free'], self.op.amount))
3883
3884   def Exec(self, feedback_fn):
3885     """Execute disk grow.
3886
3887     """
3888     instance = self.instance
3889     disk = instance.FindDisk(self.op.disk)
3890     for node in (instance.secondary_nodes + (instance.primary_node,)):
3891       self.cfg.SetDiskID(disk, node)
3892       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3893       if not result or not isinstance(result, tuple) or len(result) != 2:
3894         raise errors.OpExecError("grow request failed to node %s" % node)
3895       elif not result[0]:
3896         raise errors.OpExecError("grow request failed to node %s: %s" %
3897                                  (node, result[1]))
3898     disk.RecordGrow(self.op.amount)
3899     self.cfg.Update(instance)
3900     return
3901
3902
3903 class LUQueryInstanceData(NoHooksLU):
3904   """Query runtime instance data.
3905
3906   """
3907   _OP_REQP = ["instances"]
3908
3909   def CheckPrereq(self):
3910     """Check prerequisites.
3911
3912     This only checks the optional instance list against the existing names.
3913
3914     """
3915     if not isinstance(self.op.instances, list):
3916       raise errors.OpPrereqError("Invalid argument type 'instances'")
3917     if self.op.instances:
3918       self.wanted_instances = []
3919       names = self.op.instances
3920       for name in names:
3921         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3922         if instance is None:
3923           raise errors.OpPrereqError("No such instance name '%s'" % name)
3924         self.wanted_instances.append(instance)
3925     else:
3926       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3927                                in self.cfg.GetInstanceList()]
3928     return
3929
3930
3931   def _ComputeDiskStatus(self, instance, snode, dev):
3932     """Compute block device status.
3933
3934     """
3935     self.cfg.SetDiskID(dev, instance.primary_node)
3936     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3937     if dev.dev_type in constants.LDS_DRBD:
3938       # we change the snode then (otherwise we use the one passed in)
3939       if dev.logical_id[0] == instance.primary_node:
3940         snode = dev.logical_id[1]
3941       else:
3942         snode = dev.logical_id[0]
3943
3944     if snode:
3945       self.cfg.SetDiskID(dev, snode)
3946       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3947     else:
3948       dev_sstatus = None
3949
3950     if dev.children:
3951       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3952                       for child in dev.children]
3953     else:
3954       dev_children = []
3955
3956     data = {
3957       "iv_name": dev.iv_name,
3958       "dev_type": dev.dev_type,
3959       "logical_id": dev.logical_id,
3960       "physical_id": dev.physical_id,
3961       "pstatus": dev_pstatus,
3962       "sstatus": dev_sstatus,
3963       "children": dev_children,
3964       }
3965
3966     return data
3967
3968   def Exec(self, feedback_fn):
3969     """Gather and return data"""
3970     result = {}
3971     for instance in self.wanted_instances:
3972       remote_info = rpc.call_instance_info(instance.primary_node,
3973                                                 instance.name)
3974       if remote_info and "state" in remote_info:
3975         remote_state = "up"
3976       else:
3977         remote_state = "down"
3978       if instance.status == "down":
3979         config_state = "down"
3980       else:
3981         config_state = "up"
3982
3983       disks = [self._ComputeDiskStatus(instance, None, device)
3984                for device in instance.disks]
3985
3986       idict = {
3987         "name": instance.name,
3988         "config_state": config_state,
3989         "run_state": remote_state,
3990         "pnode": instance.primary_node,
3991         "snodes": instance.secondary_nodes,
3992         "os": instance.os,
3993         "memory": instance.memory,
3994         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3995         "disks": disks,
3996         "vcpus": instance.vcpus,
3997         }
3998
3999       htkind = self.sstore.GetHypervisorType()
4000       if htkind == constants.HT_XEN_PVM30:
4001         idict["kernel_path"] = instance.kernel_path
4002         idict["initrd_path"] = instance.initrd_path
4003
4004       if htkind == constants.HT_XEN_HVM31:
4005         idict["hvm_boot_order"] = instance.hvm_boot_order
4006         idict["hvm_acpi"] = instance.hvm_acpi
4007         idict["hvm_pae"] = instance.hvm_pae
4008         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4009
4010       if htkind in constants.HTS_REQ_PORT:
4011         idict["vnc_bind_address"] = instance.vnc_bind_address
4012         idict["network_port"] = instance.network_port
4013
4014       result[instance.name] = idict
4015
4016     return result
4017
4018
4019 class LUSetInstanceParams(LogicalUnit):
4020   """Modifies an instances's parameters.
4021
4022   """
4023   HPATH = "instance-modify"
4024   HTYPE = constants.HTYPE_INSTANCE
4025   _OP_REQP = ["instance_name"]
4026
4027   def BuildHooksEnv(self):
4028     """Build hooks env.
4029
4030     This runs on the master, primary and secondaries.
4031
4032     """
4033     args = dict()
4034     if self.mem:
4035       args['memory'] = self.mem
4036     if self.vcpus:
4037       args['vcpus'] = self.vcpus
4038     if self.do_ip or self.do_bridge or self.mac:
4039       if self.do_ip:
4040         ip = self.ip
4041       else:
4042         ip = self.instance.nics[0].ip
4043       if self.bridge:
4044         bridge = self.bridge
4045       else:
4046         bridge = self.instance.nics[0].bridge
4047       if self.mac:
4048         mac = self.mac
4049       else:
4050         mac = self.instance.nics[0].mac
4051       args['nics'] = [(ip, bridge, mac)]
4052     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4053     nl = [self.sstore.GetMasterNode(),
4054           self.instance.primary_node] + list(self.instance.secondary_nodes)
4055     return env, nl, nl
4056
4057   def CheckPrereq(self):
4058     """Check prerequisites.
4059
4060     This only checks the instance list against the existing names.
4061
4062     """
4063     self.mem = getattr(self.op, "mem", None)
4064     self.vcpus = getattr(self.op, "vcpus", None)
4065     self.ip = getattr(self.op, "ip", None)
4066     self.mac = getattr(self.op, "mac", None)
4067     self.bridge = getattr(self.op, "bridge", None)
4068     self.kernel_path = getattr(self.op, "kernel_path", None)
4069     self.initrd_path = getattr(self.op, "initrd_path", None)
4070     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4071     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4072     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4073     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4074     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4075     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4076                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4077                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4078                  self.vnc_bind_address]
4079     if all_parms.count(None) == len(all_parms):
4080       raise errors.OpPrereqError("No changes submitted")
4081     if self.mem is not None:
4082       try:
4083         self.mem = int(self.mem)
4084       except ValueError, err:
4085         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4086     if self.vcpus is not None:
4087       try:
4088         self.vcpus = int(self.vcpus)
4089       except ValueError, err:
4090         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4091     if self.ip is not None:
4092       self.do_ip = True
4093       if self.ip.lower() == "none":
4094         self.ip = None
4095       else:
4096         if not utils.IsValidIP(self.ip):
4097           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4098     else:
4099       self.do_ip = False
4100     self.do_bridge = (self.bridge is not None)
4101     if self.mac is not None:
4102       if self.cfg.IsMacInUse(self.mac):
4103         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4104                                    self.mac)
4105       if not utils.IsValidMac(self.mac):
4106         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4107
4108     if self.kernel_path is not None:
4109       self.do_kernel_path = True
4110       if self.kernel_path == constants.VALUE_NONE:
4111         raise errors.OpPrereqError("Can't set instance to no kernel")
4112
4113       if self.kernel_path != constants.VALUE_DEFAULT:
4114         if not os.path.isabs(self.kernel_path):
4115           raise errors.OpPrereqError("The kernel path must be an absolute"
4116                                     " filename")
4117     else:
4118       self.do_kernel_path = False
4119
4120     if self.initrd_path is not None:
4121       self.do_initrd_path = True
4122       if self.initrd_path not in (constants.VALUE_NONE,
4123                                   constants.VALUE_DEFAULT):
4124         if not os.path.isabs(self.initrd_path):
4125           raise errors.OpPrereqError("The initrd path must be an absolute"
4126                                     " filename")
4127     else:
4128       self.do_initrd_path = False
4129
4130     # boot order verification
4131     if self.hvm_boot_order is not None:
4132       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4133         if len(self.hvm_boot_order.strip("acdn")) != 0:
4134           raise errors.OpPrereqError("invalid boot order specified,"
4135                                      " must be one or more of [acdn]"
4136                                      " or 'default'")
4137
4138     # hvm_cdrom_image_path verification
4139     if self.op.hvm_cdrom_image_path is not None:
4140       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4141         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4142                                    " be an absolute path or None, not %s" %
4143                                    self.op.hvm_cdrom_image_path)
4144       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4145         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4146                                    " regular file or a symlink pointing to"
4147                                    " an existing regular file, not %s" %
4148                                    self.op.hvm_cdrom_image_path)
4149
4150     # vnc_bind_address verification
4151     if self.op.vnc_bind_address is not None:
4152       if not utils.IsValidIP(self.op.vnc_bind_address):
4153         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4154                                    " like a valid IP address" %
4155                                    self.op.vnc_bind_address)
4156
4157     instance = self.cfg.GetInstanceInfo(
4158       self.cfg.ExpandInstanceName(self.op.instance_name))
4159     if instance is None:
4160       raise errors.OpPrereqError("No such instance name '%s'" %
4161                                  self.op.instance_name)
4162     self.op.instance_name = instance.name
4163     self.instance = instance
4164     return
4165
4166   def Exec(self, feedback_fn):
4167     """Modifies an instance.
4168
4169     All parameters take effect only at the next restart of the instance.
4170     """
4171     result = []
4172     instance = self.instance
4173     if self.mem:
4174       instance.memory = self.mem
4175       result.append(("mem", self.mem))
4176     if self.vcpus:
4177       instance.vcpus = self.vcpus
4178       result.append(("vcpus",  self.vcpus))
4179     if self.do_ip:
4180       instance.nics[0].ip = self.ip
4181       result.append(("ip", self.ip))
4182     if self.bridge:
4183       instance.nics[0].bridge = self.bridge
4184       result.append(("bridge", self.bridge))
4185     if self.mac:
4186       instance.nics[0].mac = self.mac
4187       result.append(("mac", self.mac))
4188     if self.do_kernel_path:
4189       instance.kernel_path = self.kernel_path
4190       result.append(("kernel_path", self.kernel_path))
4191     if self.do_initrd_path:
4192       instance.initrd_path = self.initrd_path
4193       result.append(("initrd_path", self.initrd_path))
4194     if self.hvm_boot_order:
4195       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4196         instance.hvm_boot_order = None
4197       else:
4198         instance.hvm_boot_order = self.hvm_boot_order
4199       result.append(("hvm_boot_order", self.hvm_boot_order))
4200     if self.hvm_acpi:
4201       instance.hvm_acpi = self.hvm_acpi
4202       result.append(("hvm_acpi", self.hvm_acpi))
4203     if self.hvm_pae:
4204       instance.hvm_pae = self.hvm_pae
4205       result.append(("hvm_pae", self.hvm_pae))
4206     if self.hvm_cdrom_image_path:
4207       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4208       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4209     if self.vnc_bind_address:
4210       instance.vnc_bind_address = self.vnc_bind_address
4211       result.append(("vnc_bind_address", self.vnc_bind_address))
4212
4213     self.cfg.AddInstance(instance)
4214
4215     return result
4216
4217
4218 class LUQueryExports(NoHooksLU):
4219   """Query the exports list
4220
4221   """
4222   _OP_REQP = []
4223
4224   def CheckPrereq(self):
4225     """Check that the nodelist contains only existing nodes.
4226
4227     """
4228     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4229
4230   def Exec(self, feedback_fn):
4231     """Compute the list of all the exported system images.
4232
4233     Returns:
4234       a dictionary with the structure node->(export-list)
4235       where export-list is a list of the instances exported on
4236       that node.
4237
4238     """
4239     return rpc.call_export_list(self.nodes)
4240
4241
4242 class LUExportInstance(LogicalUnit):
4243   """Export an instance to an image in the cluster.
4244
4245   """
4246   HPATH = "instance-export"
4247   HTYPE = constants.HTYPE_INSTANCE
4248   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4249
4250   def BuildHooksEnv(self):
4251     """Build hooks env.
4252
4253     This will run on the master, primary node and target node.
4254
4255     """
4256     env = {
4257       "EXPORT_NODE": self.op.target_node,
4258       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4259       }
4260     env.update(_BuildInstanceHookEnvByObject(self.instance))
4261     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4262           self.op.target_node]
4263     return env, nl, nl
4264
4265   def CheckPrereq(self):
4266     """Check prerequisites.
4267
4268     This checks that the instance and node names are valid.
4269
4270     """
4271     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4272     self.instance = self.cfg.GetInstanceInfo(instance_name)
4273     if self.instance is None:
4274       raise errors.OpPrereqError("Instance '%s' not found" %
4275                                  self.op.instance_name)
4276
4277     # node verification
4278     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4279     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4280
4281     if self.dst_node is None:
4282       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4283                                  self.op.target_node)
4284     self.op.target_node = self.dst_node.name
4285
4286     # instance disk type verification
4287     for disk in self.instance.disks:
4288       if disk.dev_type == constants.LD_FILE:
4289         raise errors.OpPrereqError("Export not supported for instances with"
4290                                    " file-based disks")
4291
4292   def Exec(self, feedback_fn):
4293     """Export an instance to an image in the cluster.
4294
4295     """
4296     instance = self.instance
4297     dst_node = self.dst_node
4298     src_node = instance.primary_node
4299     if self.op.shutdown:
4300       # shutdown the instance, but not the disks
4301       if not rpc.call_instance_shutdown(src_node, instance):
4302          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4303                                   (instance.name, src_node))
4304
4305     vgname = self.cfg.GetVGName()
4306
4307     snap_disks = []
4308
4309     try:
4310       for disk in instance.disks:
4311         if disk.iv_name == "sda":
4312           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4313           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4314
4315           if not new_dev_name:
4316             logger.Error("could not snapshot block device %s on node %s" %
4317                          (disk.logical_id[1], src_node))
4318           else:
4319             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4320                                       logical_id=(vgname, new_dev_name),
4321                                       physical_id=(vgname, new_dev_name),
4322                                       iv_name=disk.iv_name)
4323             snap_disks.append(new_dev)
4324
4325     finally:
4326       if self.op.shutdown and instance.status == "up":
4327         if not rpc.call_instance_start(src_node, instance, None):
4328           _ShutdownInstanceDisks(instance, self.cfg)
4329           raise errors.OpExecError("Could not start instance")
4330
4331     # TODO: check for size
4332
4333     for dev in snap_disks:
4334       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4335         logger.Error("could not export block device %s from node %s to node %s"
4336                      % (dev.logical_id[1], src_node, dst_node.name))
4337       if not rpc.call_blockdev_remove(src_node, dev):
4338         logger.Error("could not remove snapshot block device %s from node %s" %
4339                      (dev.logical_id[1], src_node))
4340
4341     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4342       logger.Error("could not finalize export for instance %s on node %s" %
4343                    (instance.name, dst_node.name))
4344
4345     nodelist = self.cfg.GetNodeList()
4346     nodelist.remove(dst_node.name)
4347
4348     # on one-node clusters nodelist will be empty after the removal
4349     # if we proceed the backup would be removed because OpQueryExports
4350     # substitutes an empty list with the full cluster node list.
4351     if nodelist:
4352       op = opcodes.OpQueryExports(nodes=nodelist)
4353       exportlist = self.proc.ChainOpCode(op)
4354       for node in exportlist:
4355         if instance.name in exportlist[node]:
4356           if not rpc.call_export_remove(node, instance.name):
4357             logger.Error("could not remove older export for instance %s"
4358                          " on node %s" % (instance.name, node))
4359
4360
4361 class LURemoveExport(NoHooksLU):
4362   """Remove exports related to the named instance.
4363
4364   """
4365   _OP_REQP = ["instance_name"]
4366
4367   def CheckPrereq(self):
4368     """Check prerequisites.
4369     """
4370     pass
4371
4372   def Exec(self, feedback_fn):
4373     """Remove any export.
4374
4375     """
4376     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4377     # If the instance was not found we'll try with the name that was passed in.
4378     # This will only work if it was an FQDN, though.
4379     fqdn_warn = False
4380     if not instance_name:
4381       fqdn_warn = True
4382       instance_name = self.op.instance_name
4383
4384     op = opcodes.OpQueryExports(nodes=[])
4385     exportlist = self.proc.ChainOpCode(op)
4386     found = False
4387     for node in exportlist:
4388       if instance_name in exportlist[node]:
4389         found = True
4390         if not rpc.call_export_remove(node, instance_name):
4391           logger.Error("could not remove export for instance %s"
4392                        " on node %s" % (instance_name, node))
4393
4394     if fqdn_warn and not found:
4395       feedback_fn("Export not found. If trying to remove an export belonging"
4396                   " to a deleted instance please use its Fully Qualified"
4397                   " Domain Name.")
4398
4399
4400 class TagsLU(NoHooksLU):
4401   """Generic tags LU.
4402
4403   This is an abstract class which is the parent of all the other tags LUs.
4404
4405   """
4406   def CheckPrereq(self):
4407     """Check prerequisites.
4408
4409     """
4410     if self.op.kind == constants.TAG_CLUSTER:
4411       self.target = self.cfg.GetClusterInfo()
4412     elif self.op.kind == constants.TAG_NODE:
4413       name = self.cfg.ExpandNodeName(self.op.name)
4414       if name is None:
4415         raise errors.OpPrereqError("Invalid node name (%s)" %
4416                                    (self.op.name,))
4417       self.op.name = name
4418       self.target = self.cfg.GetNodeInfo(name)
4419     elif self.op.kind == constants.TAG_INSTANCE:
4420       name = self.cfg.ExpandInstanceName(self.op.name)
4421       if name is None:
4422         raise errors.OpPrereqError("Invalid instance name (%s)" %
4423                                    (self.op.name,))
4424       self.op.name = name
4425       self.target = self.cfg.GetInstanceInfo(name)
4426     else:
4427       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4428                                  str(self.op.kind))
4429
4430
4431 class LUGetTags(TagsLU):
4432   """Returns the tags of a given object.
4433
4434   """
4435   _OP_REQP = ["kind", "name"]
4436
4437   def Exec(self, feedback_fn):
4438     """Returns the tag list.
4439
4440     """
4441     return self.target.GetTags()
4442
4443
4444 class LUSearchTags(NoHooksLU):
4445   """Searches the tags for a given pattern.
4446
4447   """
4448   _OP_REQP = ["pattern"]
4449
4450   def CheckPrereq(self):
4451     """Check prerequisites.
4452
4453     This checks the pattern passed for validity by compiling it.
4454
4455     """
4456     try:
4457       self.re = re.compile(self.op.pattern)
4458     except re.error, err:
4459       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4460                                  (self.op.pattern, err))
4461
4462   def Exec(self, feedback_fn):
4463     """Returns the tag list.
4464
4465     """
4466     cfg = self.cfg
4467     tgts = [("/cluster", cfg.GetClusterInfo())]
4468     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4469     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4470     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4471     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4472     results = []
4473     for path, target in tgts:
4474       for tag in target.GetTags():
4475         if self.re.search(tag):
4476           results.append((path, tag))
4477     return results
4478
4479
4480 class LUAddTags(TagsLU):
4481   """Sets a tag on a given object.
4482
4483   """
4484   _OP_REQP = ["kind", "name", "tags"]
4485
4486   def CheckPrereq(self):
4487     """Check prerequisites.
4488
4489     This checks the type and length of the tag name and value.
4490
4491     """
4492     TagsLU.CheckPrereq(self)
4493     for tag in self.op.tags:
4494       objects.TaggableObject.ValidateTag(tag)
4495
4496   def Exec(self, feedback_fn):
4497     """Sets the tag.
4498
4499     """
4500     try:
4501       for tag in self.op.tags:
4502         self.target.AddTag(tag)
4503     except errors.TagError, err:
4504       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4505     try:
4506       self.cfg.Update(self.target)
4507     except errors.ConfigurationError:
4508       raise errors.OpRetryError("There has been a modification to the"
4509                                 " config file and the operation has been"
4510                                 " aborted. Please retry.")
4511
4512
4513 class LUDelTags(TagsLU):
4514   """Delete a list of tags from a given object.
4515
4516   """
4517   _OP_REQP = ["kind", "name", "tags"]
4518
4519   def CheckPrereq(self):
4520     """Check prerequisites.
4521
4522     This checks that we have the given tag.
4523
4524     """
4525     TagsLU.CheckPrereq(self)
4526     for tag in self.op.tags:
4527       objects.TaggableObject.ValidateTag(tag)
4528     del_tags = frozenset(self.op.tags)
4529     cur_tags = self.target.GetTags()
4530     if not del_tags <= cur_tags:
4531       diff_tags = del_tags - cur_tags
4532       diff_names = ["'%s'" % tag for tag in diff_tags]
4533       diff_names.sort()
4534       raise errors.OpPrereqError("Tag(s) %s not found" %
4535                                  (",".join(diff_names)))
4536
4537   def Exec(self, feedback_fn):
4538     """Remove the tag from the object.
4539
4540     """
4541     for tag in self.op.tags:
4542       self.target.RemoveTag(tag)
4543     try:
4544       self.cfg.Update(self.target)
4545     except errors.ConfigurationError:
4546       raise errors.OpRetryError("There has been a modification to the"
4547                                 " config file and the operation has been"
4548                                 " aborted. Please retry.")
4549
4550 class LUTestDelay(NoHooksLU):
4551   """Sleep for a specified amount of time.
4552
4553   This LU sleeps on the master and/or nodes for a specified amoutn of
4554   time.
4555
4556   """
4557   _OP_REQP = ["duration", "on_master", "on_nodes"]
4558
4559   def CheckPrereq(self):
4560     """Check prerequisites.
4561
4562     This checks that we have a good list of nodes and/or the duration
4563     is valid.
4564
4565     """
4566
4567     if self.op.on_nodes:
4568       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4569
4570   def Exec(self, feedback_fn):
4571     """Do the actual sleep.
4572
4573     """
4574     if self.op.on_master:
4575       if not utils.TestDelay(self.op.duration):
4576         raise errors.OpExecError("Error during master delay test")
4577     if self.op.on_nodes:
4578       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4579       if not result:
4580         raise errors.OpExecError("Complete failure from rpc call")
4581       for node, node_result in result.items():
4582         if not node_result:
4583           raise errors.OpExecError("Failure during rpc call to node %s,"
4584                                    " result: %s" % (node, node_result))
4585
4586
4587 class IAllocator(object):
4588   """IAllocator framework.
4589
4590   An IAllocator instance has three sets of attributes:
4591     - cfg/sstore that are needed to query the cluster
4592     - input data (all members of the _KEYS class attribute are required)
4593     - four buffer attributes (in|out_data|text), that represent the
4594       input (to the external script) in text and data structure format,
4595       and the output from it, again in two formats
4596     - the result variables from the script (success, info, nodes) for
4597       easy usage
4598
4599   """
4600   _ALLO_KEYS = [
4601     "mem_size", "disks", "disk_template",
4602     "os", "tags", "nics", "vcpus",
4603     ]
4604   _RELO_KEYS = [
4605     "relocate_from",
4606     ]
4607
4608   def __init__(self, cfg, sstore, mode, name, **kwargs):
4609     self.cfg = cfg
4610     self.sstore = sstore
4611     # init buffer variables
4612     self.in_text = self.out_text = self.in_data = self.out_data = None
4613     # init all input fields so that pylint is happy
4614     self.mode = mode
4615     self.name = name
4616     self.mem_size = self.disks = self.disk_template = None
4617     self.os = self.tags = self.nics = self.vcpus = None
4618     self.relocate_from = None
4619     # computed fields
4620     self.required_nodes = None
4621     # init result fields
4622     self.success = self.info = self.nodes = None
4623     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4624       keyset = self._ALLO_KEYS
4625     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4626       keyset = self._RELO_KEYS
4627     else:
4628       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4629                                    " IAllocator" % self.mode)
4630     for key in kwargs:
4631       if key not in keyset:
4632         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4633                                      " IAllocator" % key)
4634       setattr(self, key, kwargs[key])
4635     for key in keyset:
4636       if key not in kwargs:
4637         raise errors.ProgrammerError("Missing input parameter '%s' to"
4638                                      " IAllocator" % key)
4639     self._BuildInputData()
4640
4641   def _ComputeClusterData(self):
4642     """Compute the generic allocator input data.
4643
4644     This is the data that is independent of the actual operation.
4645
4646     """
4647     cfg = self.cfg
4648     # cluster data
4649     data = {
4650       "version": 1,
4651       "cluster_name": self.sstore.GetClusterName(),
4652       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4653       "hypervisor_type": self.sstore.GetHypervisorType(),
4654       # we don't have job IDs
4655       }
4656
4657     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4658
4659     # node data
4660     node_results = {}
4661     node_list = cfg.GetNodeList()
4662     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4663     for nname in node_list:
4664       ninfo = cfg.GetNodeInfo(nname)
4665       if nname not in node_data or not isinstance(node_data[nname], dict):
4666         raise errors.OpExecError("Can't get data for node %s" % nname)
4667       remote_info = node_data[nname]
4668       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4669                    'vg_size', 'vg_free', 'cpu_total']:
4670         if attr not in remote_info:
4671           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4672                                    (nname, attr))
4673         try:
4674           remote_info[attr] = int(remote_info[attr])
4675         except ValueError, err:
4676           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4677                                    " %s" % (nname, attr, str(err)))
4678       # compute memory used by primary instances
4679       i_p_mem = i_p_up_mem = 0
4680       for iinfo in i_list:
4681         if iinfo.primary_node == nname:
4682           i_p_mem += iinfo.memory
4683           if iinfo.status == "up":
4684             i_p_up_mem += iinfo.memory
4685
4686       # compute memory used by instances
4687       pnr = {
4688         "tags": list(ninfo.GetTags()),
4689         "total_memory": remote_info['memory_total'],
4690         "reserved_memory": remote_info['memory_dom0'],
4691         "free_memory": remote_info['memory_free'],
4692         "i_pri_memory": i_p_mem,
4693         "i_pri_up_memory": i_p_up_mem,
4694         "total_disk": remote_info['vg_size'],
4695         "free_disk": remote_info['vg_free'],
4696         "primary_ip": ninfo.primary_ip,
4697         "secondary_ip": ninfo.secondary_ip,
4698         "total_cpus": remote_info['cpu_total'],
4699         }
4700       node_results[nname] = pnr
4701     data["nodes"] = node_results
4702
4703     # instance data
4704     instance_data = {}
4705     for iinfo in i_list:
4706       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4707                   for n in iinfo.nics]
4708       pir = {
4709         "tags": list(iinfo.GetTags()),
4710         "should_run": iinfo.status == "up",
4711         "vcpus": iinfo.vcpus,
4712         "memory": iinfo.memory,
4713         "os": iinfo.os,
4714         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4715         "nics": nic_data,
4716         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4717         "disk_template": iinfo.disk_template,
4718         }
4719       instance_data[iinfo.name] = pir
4720
4721     data["instances"] = instance_data
4722
4723     self.in_data = data
4724
4725   def _AddNewInstance(self):
4726     """Add new instance data to allocator structure.
4727
4728     This in combination with _AllocatorGetClusterData will create the
4729     correct structure needed as input for the allocator.
4730
4731     The checks for the completeness of the opcode must have already been
4732     done.
4733
4734     """
4735     data = self.in_data
4736     if len(self.disks) != 2:
4737       raise errors.OpExecError("Only two-disk configurations supported")
4738
4739     disk_space = _ComputeDiskSize(self.disk_template,
4740                                   self.disks[0]["size"], self.disks[1]["size"])
4741
4742     if self.disk_template in constants.DTS_NET_MIRROR:
4743       self.required_nodes = 2
4744     else:
4745       self.required_nodes = 1
4746     request = {
4747       "type": "allocate",
4748       "name": self.name,
4749       "disk_template": self.disk_template,
4750       "tags": self.tags,
4751       "os": self.os,
4752       "vcpus": self.vcpus,
4753       "memory": self.mem_size,
4754       "disks": self.disks,
4755       "disk_space_total": disk_space,
4756       "nics": self.nics,
4757       "required_nodes": self.required_nodes,
4758       }
4759     data["request"] = request
4760
4761   def _AddRelocateInstance(self):
4762     """Add relocate instance data to allocator structure.
4763
4764     This in combination with _IAllocatorGetClusterData will create the
4765     correct structure needed as input for the allocator.
4766
4767     The checks for the completeness of the opcode must have already been
4768     done.
4769
4770     """
4771     instance = self.cfg.GetInstanceInfo(self.name)
4772     if instance is None:
4773       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4774                                    " IAllocator" % self.name)
4775
4776     if instance.disk_template not in constants.DTS_NET_MIRROR:
4777       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4778
4779     if len(instance.secondary_nodes) != 1:
4780       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4781
4782     self.required_nodes = 1
4783
4784     disk_space = _ComputeDiskSize(instance.disk_template,
4785                                   instance.disks[0].size,
4786                                   instance.disks[1].size)
4787
4788     request = {
4789       "type": "relocate",
4790       "name": self.name,
4791       "disk_space_total": disk_space,
4792       "required_nodes": self.required_nodes,
4793       "relocate_from": self.relocate_from,
4794       }
4795     self.in_data["request"] = request
4796
4797   def _BuildInputData(self):
4798     """Build input data structures.
4799
4800     """
4801     self._ComputeClusterData()
4802
4803     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4804       self._AddNewInstance()
4805     else:
4806       self._AddRelocateInstance()
4807
4808     self.in_text = serializer.Dump(self.in_data)
4809
4810   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4811     """Run an instance allocator and return the results.
4812
4813     """
4814     data = self.in_text
4815
4816     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4817
4818     if not isinstance(result, tuple) or len(result) != 4:
4819       raise errors.OpExecError("Invalid result from master iallocator runner")
4820
4821     rcode, stdout, stderr, fail = result
4822
4823     if rcode == constants.IARUN_NOTFOUND:
4824       raise errors.OpExecError("Can't find allocator '%s'" % name)
4825     elif rcode == constants.IARUN_FAILURE:
4826         raise errors.OpExecError("Instance allocator call failed: %s,"
4827                                  " output: %s" %
4828                                  (fail, stdout+stderr))
4829     self.out_text = stdout
4830     if validate:
4831       self._ValidateResult()
4832
4833   def _ValidateResult(self):
4834     """Process the allocator results.
4835
4836     This will process and if successful save the result in
4837     self.out_data and the other parameters.
4838
4839     """
4840     try:
4841       rdict = serializer.Load(self.out_text)
4842     except Exception, err:
4843       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4844
4845     if not isinstance(rdict, dict):
4846       raise errors.OpExecError("Can't parse iallocator results: not a dict")
4847
4848     for key in "success", "info", "nodes":
4849       if key not in rdict:
4850         raise errors.OpExecError("Can't parse iallocator results:"
4851                                  " missing key '%s'" % key)
4852       setattr(self, key, rdict[key])
4853
4854     if not isinstance(rdict["nodes"], list):
4855       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4856                                " is not a list")
4857     self.out_data = rdict
4858
4859
4860 class LUTestAllocator(NoHooksLU):
4861   """Run allocator tests.
4862
4863   This LU runs the allocator tests
4864
4865   """
4866   _OP_REQP = ["direction", "mode", "name"]
4867
4868   def CheckPrereq(self):
4869     """Check prerequisites.
4870
4871     This checks the opcode parameters depending on the director and mode test.
4872
4873     """
4874     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4875       for attr in ["name", "mem_size", "disks", "disk_template",
4876                    "os", "tags", "nics", "vcpus"]:
4877         if not hasattr(self.op, attr):
4878           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4879                                      attr)
4880       iname = self.cfg.ExpandInstanceName(self.op.name)
4881       if iname is not None:
4882         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4883                                    iname)
4884       if not isinstance(self.op.nics, list):
4885         raise errors.OpPrereqError("Invalid parameter 'nics'")
4886       for row in self.op.nics:
4887         if (not isinstance(row, dict) or
4888             "mac" not in row or
4889             "ip" not in row or
4890             "bridge" not in row):
4891           raise errors.OpPrereqError("Invalid contents of the"
4892                                      " 'nics' parameter")
4893       if not isinstance(self.op.disks, list):
4894         raise errors.OpPrereqError("Invalid parameter 'disks'")
4895       if len(self.op.disks) != 2:
4896         raise errors.OpPrereqError("Only two-disk configurations supported")
4897       for row in self.op.disks:
4898         if (not isinstance(row, dict) or
4899             "size" not in row or
4900             not isinstance(row["size"], int) or
4901             "mode" not in row or
4902             row["mode"] not in ['r', 'w']):
4903           raise errors.OpPrereqError("Invalid contents of the"
4904                                      " 'disks' parameter")
4905     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4906       if not hasattr(self.op, "name"):
4907         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4908       fname = self.cfg.ExpandInstanceName(self.op.name)
4909       if fname is None:
4910         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4911                                    self.op.name)
4912       self.op.name = fname
4913       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4914     else:
4915       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4916                                  self.op.mode)
4917
4918     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4919       if not hasattr(self.op, "allocator") or self.op.allocator is None:
4920         raise errors.OpPrereqError("Missing allocator name")
4921     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4922       raise errors.OpPrereqError("Wrong allocator test '%s'" %
4923                                  self.op.direction)
4924
4925   def Exec(self, feedback_fn):
4926     """Run the allocator test.
4927
4928     """
4929     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4930       ial = IAllocator(self.cfg, self.sstore,
4931                        mode=self.op.mode,
4932                        name=self.op.name,
4933                        mem_size=self.op.mem_size,
4934                        disks=self.op.disks,
4935                        disk_template=self.op.disk_template,
4936                        os=self.op.os,
4937                        tags=self.op.tags,
4938                        nics=self.op.nics,
4939                        vcpus=self.op.vcpus,
4940                        )
4941     else:
4942       ial = IAllocator(self.cfg, self.sstore,
4943                        mode=self.op.mode,
4944                        name=self.op.name,
4945                        relocate_from=list(self.relocate_from),
4946                        )
4947
4948     if self.op.direction == constants.IALLOCATOR_DIR_IN:
4949       result = ial.in_text
4950     else:
4951       ial.Run(self.op.allocator, validate=False)
4952       result = ial.out_text
4953     return result