Move InitCluster opcode into a single function
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_MASTER); note that all
58       commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.proc = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     self.__ssh = None
78
79     for attr_name in self._OP_REQP:
80       attr_val = getattr(op, attr_name, None)
81       if attr_val is None:
82         raise errors.OpPrereqError("Required parameter '%s' missing" %
83                                    attr_name)
84
85     if not cfg.IsCluster():
86       raise errors.OpPrereqError("Cluster not initialized yet,"
87                                  " use 'gnt-cluster init' first.")
88     if self.REQ_MASTER:
89       master = sstore.GetMasterNode()
90       if master != utils.HostInfo().name:
91         raise errors.OpPrereqError("Commands must be run on the master"
92                                    " node %s" % master)
93
94   def __GetSSH(self):
95     """Returns the SshRunner object
96
97     """
98     if not self.__ssh:
99       self.__ssh = ssh.SshRunner(self.sstore)
100     return self.__ssh
101
102   ssh = property(fget=__GetSSH)
103
104   def CheckPrereq(self):
105     """Check prerequisites for this LU.
106
107     This method should check that the prerequisites for the execution
108     of this LU are fulfilled. It can do internode communication, but
109     it should be idempotent - no cluster or system changes are
110     allowed.
111
112     The method should raise errors.OpPrereqError in case something is
113     not fulfilled. Its return value is ignored.
114
115     This method should also update all the parameters of the opcode to
116     their canonical form; e.g. a short node name must be fully
117     expanded after this method has successfully completed (so that
118     hooks, logging, etc. work correctly).
119
120     """
121     raise NotImplementedError
122
123   def Exec(self, feedback_fn):
124     """Execute the LU.
125
126     This method should implement the actual work. It should raise
127     errors.OpExecError for failures that are somewhat dealt with in
128     code, or expected.
129
130     """
131     raise NotImplementedError
132
133   def BuildHooksEnv(self):
134     """Build hooks environment for this LU.
135
136     This method should return a three-node tuple consisting of: a dict
137     containing the environment that will be used for running the
138     specific hook for this LU, a list of node names on which the hook
139     should run before the execution, and a list of node names on which
140     the hook should run after the execution.
141
142     The keys of the dict must not have 'GANETI_' prefixed as this will
143     be handled in the hooks runner. Also note additional keys will be
144     added by the hooks runner. If the LU doesn't define any
145     environment, an empty dict (and not None) should be returned.
146
147     No nodes should be returned as an empty list (and not None).
148
149     Note that if the HPATH for a LU class is None, this function will
150     not be called.
151
152     """
153     raise NotImplementedError
154
155   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
156     """Notify the LU about the results of its hooks.
157
158     This method is called every time a hooks phase is executed, and notifies
159     the Logical Unit about the hooks' result. The LU can then use it to alter
160     its result based on the hooks.  By default the method does nothing and the
161     previous result is passed back unchanged but any LU can define it if it
162     wants to use the local cluster hook-scripts somehow.
163
164     Args:
165       phase: the hooks phase that has just been run
166       hooks_results: the results of the multi-node hooks rpc call
167       feedback_fn: function to send feedback back to the caller
168       lu_result: the previous result this LU had, or None in the PRE phase.
169
170     """
171     return lu_result
172
173
174 class NoHooksLU(LogicalUnit):
175   """Simple LU which runs no hooks.
176
177   This LU is intended as a parent for other LogicalUnits which will
178   run no hooks, in order to reduce duplicate code.
179
180   """
181   HPATH = None
182   HTYPE = None
183
184
185 def _GetWantedNodes(lu, nodes):
186   """Returns list of checked and expanded node names.
187
188   Args:
189     nodes: List of nodes (strings) or None for all
190
191   """
192   if not isinstance(nodes, list):
193     raise errors.OpPrereqError("Invalid argument type 'nodes'")
194
195   if nodes:
196     wanted = []
197
198     for name in nodes:
199       node = lu.cfg.ExpandNodeName(name)
200       if node is None:
201         raise errors.OpPrereqError("No such node name '%s'" % name)
202       wanted.append(node)
203
204   else:
205     wanted = lu.cfg.GetNodeList()
206   return utils.NiceSort(wanted)
207
208
209 def _GetWantedInstances(lu, instances):
210   """Returns list of checked and expanded instance names.
211
212   Args:
213     instances: List of instances (strings) or None for all
214
215   """
216   if not isinstance(instances, list):
217     raise errors.OpPrereqError("Invalid argument type 'instances'")
218
219   if instances:
220     wanted = []
221
222     for name in instances:
223       instance = lu.cfg.ExpandInstanceName(name)
224       if instance is None:
225         raise errors.OpPrereqError("No such instance name '%s'" % name)
226       wanted.append(instance)
227
228   else:
229     wanted = lu.cfg.GetInstanceList()
230   return utils.NiceSort(wanted)
231
232
233 def _CheckOutputFields(static, dynamic, selected):
234   """Checks whether all selected fields are valid.
235
236   Args:
237     static: Static fields
238     dynamic: Dynamic fields
239
240   """
241   static_fields = frozenset(static)
242   dynamic_fields = frozenset(dynamic)
243
244   all_fields = static_fields | dynamic_fields
245
246   if not all_fields.issuperset(selected):
247     raise errors.OpPrereqError("Unknown output fields selected: %s"
248                                % ",".join(frozenset(selected).
249                                           difference(all_fields)))
250
251
252 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
253                           memory, vcpus, nics):
254   """Builds instance related env variables for hooks from single variables.
255
256   Args:
257     secondary_nodes: List of secondary nodes as strings
258   """
259   env = {
260     "OP_TARGET": name,
261     "INSTANCE_NAME": name,
262     "INSTANCE_PRIMARY": primary_node,
263     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
264     "INSTANCE_OS_TYPE": os_type,
265     "INSTANCE_STATUS": status,
266     "INSTANCE_MEMORY": memory,
267     "INSTANCE_VCPUS": vcpus,
268   }
269
270   if nics:
271     nic_count = len(nics)
272     for idx, (ip, bridge, mac) in enumerate(nics):
273       if ip is None:
274         ip = ""
275       env["INSTANCE_NIC%d_IP" % idx] = ip
276       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
277       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
278   else:
279     nic_count = 0
280
281   env["INSTANCE_NIC_COUNT"] = nic_count
282
283   return env
284
285
286 def _BuildInstanceHookEnvByObject(instance, override=None):
287   """Builds instance related env variables for hooks from an object.
288
289   Args:
290     instance: objects.Instance object of instance
291     override: dict of values to override
292   """
293   args = {
294     'name': instance.name,
295     'primary_node': instance.primary_node,
296     'secondary_nodes': instance.secondary_nodes,
297     'os_type': instance.os,
298     'status': instance.os,
299     'memory': instance.memory,
300     'vcpus': instance.vcpus,
301     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
302   }
303   if override:
304     args.update(override)
305   return _BuildInstanceHookEnv(**args)
306
307
308 def _CheckInstanceBridgesExist(instance):
309   """Check that the brigdes needed by an instance exist.
310
311   """
312   # check bridges existance
313   brlist = [nic.bridge for nic in instance.nics]
314   if not rpc.call_bridges_exist(instance.primary_node, brlist):
315     raise errors.OpPrereqError("one or more target bridges %s does not"
316                                " exist on destination node '%s'" %
317                                (brlist, instance.primary_node))
318
319
320 class LUDestroyCluster(NoHooksLU):
321   """Logical unit for destroying the cluster.
322
323   """
324   _OP_REQP = []
325
326   def CheckPrereq(self):
327     """Check prerequisites.
328
329     This checks whether the cluster is empty.
330
331     Any errors are signalled by raising errors.OpPrereqError.
332
333     """
334     master = self.sstore.GetMasterNode()
335
336     nodelist = self.cfg.GetNodeList()
337     if len(nodelist) != 1 or nodelist[0] != master:
338       raise errors.OpPrereqError("There are still %d node(s) in"
339                                  " this cluster." % (len(nodelist) - 1))
340     instancelist = self.cfg.GetInstanceList()
341     if instancelist:
342       raise errors.OpPrereqError("There are still %d instance(s) in"
343                                  " this cluster." % len(instancelist))
344
345   def Exec(self, feedback_fn):
346     """Destroys the cluster.
347
348     """
349     master = self.sstore.GetMasterNode()
350     if not rpc.call_node_stop_master(master):
351       raise errors.OpExecError("Could not disable the master role")
352     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
353     utils.CreateBackup(priv_key)
354     utils.CreateBackup(pub_key)
355     rpc.call_node_leave_cluster(master)
356
357
358 class LUVerifyCluster(LogicalUnit):
359   """Verifies the cluster status.
360
361   """
362   HPATH = "cluster-verify"
363   HTYPE = constants.HTYPE_CLUSTER
364   _OP_REQP = ["skip_checks"]
365
366   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
367                   remote_version, feedback_fn):
368     """Run multiple tests against a node.
369
370     Test list:
371       - compares ganeti version
372       - checks vg existance and size > 20G
373       - checks config file checksum
374       - checks ssh to other nodes
375
376     Args:
377       node: name of the node to check
378       file_list: required list of files
379       local_cksum: dictionary of local files and their checksums
380
381     """
382     # compares ganeti version
383     local_version = constants.PROTOCOL_VERSION
384     if not remote_version:
385       feedback_fn("  - ERROR: connection to %s failed" % (node))
386       return True
387
388     if local_version != remote_version:
389       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
390                       (local_version, node, remote_version))
391       return True
392
393     # checks vg existance and size > 20G
394
395     bad = False
396     if not vglist:
397       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
398                       (node,))
399       bad = True
400     else:
401       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
402                                             constants.MIN_VG_SIZE)
403       if vgstatus:
404         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
405         bad = True
406
407     # checks config file checksum
408     # checks ssh to any
409
410     if 'filelist' not in node_result:
411       bad = True
412       feedback_fn("  - ERROR: node hasn't returned file checksum data")
413     else:
414       remote_cksum = node_result['filelist']
415       for file_name in file_list:
416         if file_name not in remote_cksum:
417           bad = True
418           feedback_fn("  - ERROR: file '%s' missing" % file_name)
419         elif remote_cksum[file_name] != local_cksum[file_name]:
420           bad = True
421           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
422
423     if 'nodelist' not in node_result:
424       bad = True
425       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
426     else:
427       if node_result['nodelist']:
428         bad = True
429         for node in node_result['nodelist']:
430           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
431                           (node, node_result['nodelist'][node]))
432     if 'node-net-test' not in node_result:
433       bad = True
434       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
435     else:
436       if node_result['node-net-test']:
437         bad = True
438         nlist = utils.NiceSort(node_result['node-net-test'].keys())
439         for node in nlist:
440           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
441                           (node, node_result['node-net-test'][node]))
442
443     hyp_result = node_result.get('hypervisor', None)
444     if hyp_result is not None:
445       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
446     return bad
447
448   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
449                       node_instance, feedback_fn):
450     """Verify an instance.
451
452     This function checks to see if the required block devices are
453     available on the instance's node.
454
455     """
456     bad = False
457
458     node_current = instanceconfig.primary_node
459
460     node_vol_should = {}
461     instanceconfig.MapLVsByNode(node_vol_should)
462
463     for node in node_vol_should:
464       for volume in node_vol_should[node]:
465         if node not in node_vol_is or volume not in node_vol_is[node]:
466           feedback_fn("  - ERROR: volume %s missing on node %s" %
467                           (volume, node))
468           bad = True
469
470     if not instanceconfig.status == 'down':
471       if (node_current not in node_instance or
472           not instance in node_instance[node_current]):
473         feedback_fn("  - ERROR: instance %s not running on node %s" %
474                         (instance, node_current))
475         bad = True
476
477     for node in node_instance:
478       if (not node == node_current):
479         if instance in node_instance[node]:
480           feedback_fn("  - ERROR: instance %s should not run on node %s" %
481                           (instance, node))
482           bad = True
483
484     return bad
485
486   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
487     """Verify if there are any unknown volumes in the cluster.
488
489     The .os, .swap and backup volumes are ignored. All other volumes are
490     reported as unknown.
491
492     """
493     bad = False
494
495     for node in node_vol_is:
496       for volume in node_vol_is[node]:
497         if node not in node_vol_should or volume not in node_vol_should[node]:
498           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
499                       (volume, node))
500           bad = True
501     return bad
502
503   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
504     """Verify the list of running instances.
505
506     This checks what instances are running but unknown to the cluster.
507
508     """
509     bad = False
510     for node in node_instance:
511       for runninginstance in node_instance[node]:
512         if runninginstance not in instancelist:
513           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
514                           (runninginstance, node))
515           bad = True
516     return bad
517
518   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
519     """Verify N+1 Memory Resilience.
520
521     Check that if one single node dies we can still start all the instances it
522     was primary for.
523
524     """
525     bad = False
526
527     for node, nodeinfo in node_info.iteritems():
528       # This code checks that every node which is now listed as secondary has
529       # enough memory to host all instances it is supposed to should a single
530       # other node in the cluster fail.
531       # FIXME: not ready for failover to an arbitrary node
532       # FIXME: does not support file-backed instances
533       # WARNING: we currently take into account down instances as well as up
534       # ones, considering that even if they're down someone might want to start
535       # them even in the event of a node failure.
536       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
537         needed_mem = 0
538         for instance in instances:
539           needed_mem += instance_cfg[instance].memory
540         if nodeinfo['mfree'] < needed_mem:
541           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
542                       " failovers should node %s fail" % (node, prinode))
543           bad = True
544     return bad
545
546   def CheckPrereq(self):
547     """Check prerequisites.
548
549     Transform the list of checks we're going to skip into a set and check that
550     all its members are valid.
551
552     """
553     self.skip_set = frozenset(self.op.skip_checks)
554     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
555       raise errors.OpPrereqError("Invalid checks to be skipped specified")
556
557   def BuildHooksEnv(self):
558     """Build hooks env.
559
560     Cluster-Verify hooks just rone in the post phase and their failure makes
561     the output be logged in the verify output and the verification to fail.
562
563     """
564     all_nodes = self.cfg.GetNodeList()
565     # TODO: populate the environment with useful information for verify hooks
566     env = {}
567     return env, [], all_nodes
568
569   def Exec(self, feedback_fn):
570     """Verify integrity of cluster, performing various test on nodes.
571
572     """
573     bad = False
574     feedback_fn("* Verifying global settings")
575     for msg in self.cfg.VerifyConfig():
576       feedback_fn("  - ERROR: %s" % msg)
577
578     vg_name = self.cfg.GetVGName()
579     nodelist = utils.NiceSort(self.cfg.GetNodeList())
580     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
581     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
582     i_non_redundant = [] # Non redundant instances
583     node_volume = {}
584     node_instance = {}
585     node_info = {}
586     instance_cfg = {}
587
588     # FIXME: verify OS list
589     # do local checksums
590     file_names = list(self.sstore.GetFileList())
591     file_names.append(constants.SSL_CERT_FILE)
592     file_names.append(constants.CLUSTER_CONF_FILE)
593     local_checksums = utils.FingerprintFiles(file_names)
594
595     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
596     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
597     all_instanceinfo = rpc.call_instance_list(nodelist)
598     all_vglist = rpc.call_vg_list(nodelist)
599     node_verify_param = {
600       'filelist': file_names,
601       'nodelist': nodelist,
602       'hypervisor': None,
603       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
604                         for node in nodeinfo]
605       }
606     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
607     all_rversion = rpc.call_version(nodelist)
608     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
609
610     for node in nodelist:
611       feedback_fn("* Verifying node %s" % node)
612       result = self._VerifyNode(node, file_names, local_checksums,
613                                 all_vglist[node], all_nvinfo[node],
614                                 all_rversion[node], feedback_fn)
615       bad = bad or result
616
617       # node_volume
618       volumeinfo = all_volumeinfo[node]
619
620       if isinstance(volumeinfo, basestring):
621         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
622                     (node, volumeinfo[-400:].encode('string_escape')))
623         bad = True
624         node_volume[node] = {}
625       elif not isinstance(volumeinfo, dict):
626         feedback_fn("  - ERROR: connection to %s failed" % (node,))
627         bad = True
628         continue
629       else:
630         node_volume[node] = volumeinfo
631
632       # node_instance
633       nodeinstance = all_instanceinfo[node]
634       if type(nodeinstance) != list:
635         feedback_fn("  - ERROR: connection to %s failed" % (node,))
636         bad = True
637         continue
638
639       node_instance[node] = nodeinstance
640
641       # node_info
642       nodeinfo = all_ninfo[node]
643       if not isinstance(nodeinfo, dict):
644         feedback_fn("  - ERROR: connection to %s failed" % (node,))
645         bad = True
646         continue
647
648       try:
649         node_info[node] = {
650           "mfree": int(nodeinfo['memory_free']),
651           "dfree": int(nodeinfo['vg_free']),
652           "pinst": [],
653           "sinst": [],
654           # dictionary holding all instances this node is secondary for,
655           # grouped by their primary node. Each key is a cluster node, and each
656           # value is a list of instances which have the key as primary and the
657           # current node as secondary.  this is handy to calculate N+1 memory
658           # availability if you can only failover from a primary to its
659           # secondary.
660           "sinst-by-pnode": {},
661         }
662       except ValueError:
663         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
664         bad = True
665         continue
666
667     node_vol_should = {}
668
669     for instance in instancelist:
670       feedback_fn("* Verifying instance %s" % instance)
671       inst_config = self.cfg.GetInstanceInfo(instance)
672       result =  self._VerifyInstance(instance, inst_config, node_volume,
673                                      node_instance, feedback_fn)
674       bad = bad or result
675
676       inst_config.MapLVsByNode(node_vol_should)
677
678       instance_cfg[instance] = inst_config
679
680       pnode = inst_config.primary_node
681       if pnode in node_info:
682         node_info[pnode]['pinst'].append(instance)
683       else:
684         feedback_fn("  - ERROR: instance %s, connection to primary node"
685                     " %s failed" % (instance, pnode))
686         bad = True
687
688       # If the instance is non-redundant we cannot survive losing its primary
689       # node, so we are not N+1 compliant. On the other hand we have no disk
690       # templates with more than one secondary so that situation is not well
691       # supported either.
692       # FIXME: does not support file-backed instances
693       if len(inst_config.secondary_nodes) == 0:
694         i_non_redundant.append(instance)
695       elif len(inst_config.secondary_nodes) > 1:
696         feedback_fn("  - WARNING: multiple secondaries for instance %s"
697                     % instance)
698
699       for snode in inst_config.secondary_nodes:
700         if snode in node_info:
701           node_info[snode]['sinst'].append(instance)
702           if pnode not in node_info[snode]['sinst-by-pnode']:
703             node_info[snode]['sinst-by-pnode'][pnode] = []
704           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
705         else:
706           feedback_fn("  - ERROR: instance %s, connection to secondary node"
707                       " %s failed" % (instance, snode))
708
709     feedback_fn("* Verifying orphan volumes")
710     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
711                                        feedback_fn)
712     bad = bad or result
713
714     feedback_fn("* Verifying remaining instances")
715     result = self._VerifyOrphanInstances(instancelist, node_instance,
716                                          feedback_fn)
717     bad = bad or result
718
719     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
720       feedback_fn("* Verifying N+1 Memory redundancy")
721       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
722       bad = bad or result
723
724     feedback_fn("* Other Notes")
725     if i_non_redundant:
726       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
727                   % len(i_non_redundant))
728
729     return int(bad)
730
731   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
732     """Analize the post-hooks' result, handle it, and send some
733     nicely-formatted feedback back to the user.
734
735     Args:
736       phase: the hooks phase that has just been run
737       hooks_results: the results of the multi-node hooks rpc call
738       feedback_fn: function to send feedback back to the caller
739       lu_result: previous Exec result
740
741     """
742     # We only really run POST phase hooks, and are only interested in their results
743     if phase == constants.HOOKS_PHASE_POST:
744       # Used to change hooks' output to proper indentation
745       indent_re = re.compile('^', re.M)
746       feedback_fn("* Hooks Results")
747       if not hooks_results:
748         feedback_fn("  - ERROR: general communication failure")
749         lu_result = 1
750       else:
751         for node_name in hooks_results:
752           show_node_header = True
753           res = hooks_results[node_name]
754           if res is False or not isinstance(res, list):
755             feedback_fn("    Communication failure")
756             lu_result = 1
757             continue
758           for script, hkr, output in res:
759             if hkr == constants.HKR_FAIL:
760               # The node header is only shown once, if there are
761               # failing hooks on that node
762               if show_node_header:
763                 feedback_fn("  Node %s:" % node_name)
764                 show_node_header = False
765               feedback_fn("    ERROR: Script %s failed, output:" % script)
766               output = indent_re.sub('      ', output)
767               feedback_fn("%s" % output)
768               lu_result = 1
769
770       return lu_result
771
772
773 class LUVerifyDisks(NoHooksLU):
774   """Verifies the cluster disks status.
775
776   """
777   _OP_REQP = []
778
779   def CheckPrereq(self):
780     """Check prerequisites.
781
782     This has no prerequisites.
783
784     """
785     pass
786
787   def Exec(self, feedback_fn):
788     """Verify integrity of cluster disks.
789
790     """
791     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
792
793     vg_name = self.cfg.GetVGName()
794     nodes = utils.NiceSort(self.cfg.GetNodeList())
795     instances = [self.cfg.GetInstanceInfo(name)
796                  for name in self.cfg.GetInstanceList()]
797
798     nv_dict = {}
799     for inst in instances:
800       inst_lvs = {}
801       if (inst.status != "up" or
802           inst.disk_template not in constants.DTS_NET_MIRROR):
803         continue
804       inst.MapLVsByNode(inst_lvs)
805       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
806       for node, vol_list in inst_lvs.iteritems():
807         for vol in vol_list:
808           nv_dict[(node, vol)] = inst
809
810     if not nv_dict:
811       return result
812
813     node_lvs = rpc.call_volume_list(nodes, vg_name)
814
815     to_act = set()
816     for node in nodes:
817       # node_volume
818       lvs = node_lvs[node]
819
820       if isinstance(lvs, basestring):
821         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
822         res_nlvm[node] = lvs
823       elif not isinstance(lvs, dict):
824         logger.Info("connection to node %s failed or invalid data returned" %
825                     (node,))
826         res_nodes.append(node)
827         continue
828
829       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
830         inst = nv_dict.pop((node, lv_name), None)
831         if (not lv_online and inst is not None
832             and inst.name not in res_instances):
833           res_instances.append(inst.name)
834
835     # any leftover items in nv_dict are missing LVs, let's arrange the
836     # data better
837     for key, inst in nv_dict.iteritems():
838       if inst.name not in res_missing:
839         res_missing[inst.name] = []
840       res_missing[inst.name].append(key)
841
842     return result
843
844
845 class LURenameCluster(LogicalUnit):
846   """Rename the cluster.
847
848   """
849   HPATH = "cluster-rename"
850   HTYPE = constants.HTYPE_CLUSTER
851   _OP_REQP = ["name"]
852
853   def BuildHooksEnv(self):
854     """Build hooks env.
855
856     """
857     env = {
858       "OP_TARGET": self.sstore.GetClusterName(),
859       "NEW_NAME": self.op.name,
860       }
861     mn = self.sstore.GetMasterNode()
862     return env, [mn], [mn]
863
864   def CheckPrereq(self):
865     """Verify that the passed name is a valid one.
866
867     """
868     hostname = utils.HostInfo(self.op.name)
869
870     new_name = hostname.name
871     self.ip = new_ip = hostname.ip
872     old_name = self.sstore.GetClusterName()
873     old_ip = self.sstore.GetMasterIP()
874     if new_name == old_name and new_ip == old_ip:
875       raise errors.OpPrereqError("Neither the name nor the IP address of the"
876                                  " cluster has changed")
877     if new_ip != old_ip:
878       result = utils.RunCmd(["fping", "-q", new_ip])
879       if not result.failed:
880         raise errors.OpPrereqError("The given cluster IP address (%s) is"
881                                    " reachable on the network. Aborting." %
882                                    new_ip)
883
884     self.op.name = new_name
885
886   def Exec(self, feedback_fn):
887     """Rename the cluster.
888
889     """
890     clustername = self.op.name
891     ip = self.ip
892     ss = self.sstore
893
894     # shutdown the master IP
895     master = ss.GetMasterNode()
896     if not rpc.call_node_stop_master(master):
897       raise errors.OpExecError("Could not disable the master role")
898
899     try:
900       # modify the sstore
901       ss.SetKey(ss.SS_MASTER_IP, ip)
902       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
903
904       # Distribute updated ss config to all nodes
905       myself = self.cfg.GetNodeInfo(master)
906       dist_nodes = self.cfg.GetNodeList()
907       if myself.name in dist_nodes:
908         dist_nodes.remove(myself.name)
909
910       logger.Debug("Copying updated ssconf data to all nodes")
911       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
912         fname = ss.KeyToFilename(keyname)
913         result = rpc.call_upload_file(dist_nodes, fname)
914         for to_node in dist_nodes:
915           if not result[to_node]:
916             logger.Error("copy of file %s to node %s failed" %
917                          (fname, to_node))
918     finally:
919       if not rpc.call_node_start_master(master):
920         logger.Error("Could not re-enable the master role on the master,"
921                      " please restart manually.")
922
923
924 def _RecursiveCheckIfLVMBased(disk):
925   """Check if the given disk or its children are lvm-based.
926
927   Args:
928     disk: ganeti.objects.Disk object
929
930   Returns:
931     boolean indicating whether a LD_LV dev_type was found or not
932
933   """
934   if disk.children:
935     for chdisk in disk.children:
936       if _RecursiveCheckIfLVMBased(chdisk):
937         return True
938   return disk.dev_type == constants.LD_LV
939
940
941 class LUSetClusterParams(LogicalUnit):
942   """Change the parameters of the cluster.
943
944   """
945   HPATH = "cluster-modify"
946   HTYPE = constants.HTYPE_CLUSTER
947   _OP_REQP = []
948
949   def BuildHooksEnv(self):
950     """Build hooks env.
951
952     """
953     env = {
954       "OP_TARGET": self.sstore.GetClusterName(),
955       "NEW_VG_NAME": self.op.vg_name,
956       }
957     mn = self.sstore.GetMasterNode()
958     return env, [mn], [mn]
959
960   def CheckPrereq(self):
961     """Check prerequisites.
962
963     This checks whether the given params don't conflict and
964     if the given volume group is valid.
965
966     """
967     if not self.op.vg_name:
968       instances = [self.cfg.GetInstanceInfo(name)
969                    for name in self.cfg.GetInstanceList()]
970       for inst in instances:
971         for disk in inst.disks:
972           if _RecursiveCheckIfLVMBased(disk):
973             raise errors.OpPrereqError("Cannot disable lvm storage while"
974                                        " lvm-based instances exist")
975
976     # if vg_name not None, checks given volume group on all nodes
977     if self.op.vg_name:
978       node_list = self.cfg.GetNodeList()
979       vglist = rpc.call_vg_list(node_list)
980       for node in node_list:
981         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
982                                               constants.MIN_VG_SIZE)
983         if vgstatus:
984           raise errors.OpPrereqError("Error on node '%s': %s" %
985                                      (node, vgstatus))
986
987   def Exec(self, feedback_fn):
988     """Change the parameters of the cluster.
989
990     """
991     if self.op.vg_name != self.cfg.GetVGName():
992       self.cfg.SetVGName(self.op.vg_name)
993     else:
994       feedback_fn("Cluster LVM configuration already in desired"
995                   " state, not changing")
996
997
998 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
999   """Sleep and poll for an instance's disk to sync.
1000
1001   """
1002   if not instance.disks:
1003     return True
1004
1005   if not oneshot:
1006     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1007
1008   node = instance.primary_node
1009
1010   for dev in instance.disks:
1011     cfgw.SetDiskID(dev, node)
1012
1013   retries = 0
1014   while True:
1015     max_time = 0
1016     done = True
1017     cumul_degraded = False
1018     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1019     if not rstats:
1020       proc.LogWarning("Can't get any data from node %s" % node)
1021       retries += 1
1022       if retries >= 10:
1023         raise errors.RemoteError("Can't contact node %s for mirror data,"
1024                                  " aborting." % node)
1025       time.sleep(6)
1026       continue
1027     retries = 0
1028     for i in range(len(rstats)):
1029       mstat = rstats[i]
1030       if mstat is None:
1031         proc.LogWarning("Can't compute data for node %s/%s" %
1032                         (node, instance.disks[i].iv_name))
1033         continue
1034       # we ignore the ldisk parameter
1035       perc_done, est_time, is_degraded, _ = mstat
1036       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1037       if perc_done is not None:
1038         done = False
1039         if est_time is not None:
1040           rem_time = "%d estimated seconds remaining" % est_time
1041           max_time = est_time
1042         else:
1043           rem_time = "no time estimate"
1044         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1045                      (instance.disks[i].iv_name, perc_done, rem_time))
1046     if done or oneshot:
1047       break
1048
1049     if unlock:
1050       #utils.Unlock('cmd')
1051       pass
1052     try:
1053       time.sleep(min(60, max_time))
1054     finally:
1055       if unlock:
1056         #utils.Lock('cmd')
1057         pass
1058
1059   if done:
1060     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1061   return not cumul_degraded
1062
1063
1064 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1065   """Check that mirrors are not degraded.
1066
1067   The ldisk parameter, if True, will change the test from the
1068   is_degraded attribute (which represents overall non-ok status for
1069   the device(s)) to the ldisk (representing the local storage status).
1070
1071   """
1072   cfgw.SetDiskID(dev, node)
1073   if ldisk:
1074     idx = 6
1075   else:
1076     idx = 5
1077
1078   result = True
1079   if on_primary or dev.AssembleOnSecondary():
1080     rstats = rpc.call_blockdev_find(node, dev)
1081     if not rstats:
1082       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1083       result = False
1084     else:
1085       result = result and (not rstats[idx])
1086   if dev.children:
1087     for child in dev.children:
1088       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1089
1090   return result
1091
1092
1093 class LUDiagnoseOS(NoHooksLU):
1094   """Logical unit for OS diagnose/query.
1095
1096   """
1097   _OP_REQP = ["output_fields", "names"]
1098
1099   def CheckPrereq(self):
1100     """Check prerequisites.
1101
1102     This always succeeds, since this is a pure query LU.
1103
1104     """
1105     if self.op.names:
1106       raise errors.OpPrereqError("Selective OS query not supported")
1107
1108     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1109     _CheckOutputFields(static=[],
1110                        dynamic=self.dynamic_fields,
1111                        selected=self.op.output_fields)
1112
1113   @staticmethod
1114   def _DiagnoseByOS(node_list, rlist):
1115     """Remaps a per-node return list into an a per-os per-node dictionary
1116
1117       Args:
1118         node_list: a list with the names of all nodes
1119         rlist: a map with node names as keys and OS objects as values
1120
1121       Returns:
1122         map: a map with osnames as keys and as value another map, with
1123              nodes as
1124              keys and list of OS objects as values
1125              e.g. {"debian-etch": {"node1": [<object>,...],
1126                                    "node2": [<object>,]}
1127                   }
1128
1129     """
1130     all_os = {}
1131     for node_name, nr in rlist.iteritems():
1132       if not nr:
1133         continue
1134       for os_obj in nr:
1135         if os_obj.name not in all_os:
1136           # build a list of nodes for this os containing empty lists
1137           # for each node in node_list
1138           all_os[os_obj.name] = {}
1139           for nname in node_list:
1140             all_os[os_obj.name][nname] = []
1141         all_os[os_obj.name][node_name].append(os_obj)
1142     return all_os
1143
1144   def Exec(self, feedback_fn):
1145     """Compute the list of OSes.
1146
1147     """
1148     node_list = self.cfg.GetNodeList()
1149     node_data = rpc.call_os_diagnose(node_list)
1150     if node_data == False:
1151       raise errors.OpExecError("Can't gather the list of OSes")
1152     pol = self._DiagnoseByOS(node_list, node_data)
1153     output = []
1154     for os_name, os_data in pol.iteritems():
1155       row = []
1156       for field in self.op.output_fields:
1157         if field == "name":
1158           val = os_name
1159         elif field == "valid":
1160           val = utils.all([osl and osl[0] for osl in os_data.values()])
1161         elif field == "node_status":
1162           val = {}
1163           for node_name, nos_list in os_data.iteritems():
1164             val[node_name] = [(v.status, v.path) for v in nos_list]
1165         else:
1166           raise errors.ParameterError(field)
1167         row.append(val)
1168       output.append(row)
1169
1170     return output
1171
1172
1173 class LURemoveNode(LogicalUnit):
1174   """Logical unit for removing a node.
1175
1176   """
1177   HPATH = "node-remove"
1178   HTYPE = constants.HTYPE_NODE
1179   _OP_REQP = ["node_name"]
1180
1181   def BuildHooksEnv(self):
1182     """Build hooks env.
1183
1184     This doesn't run on the target node in the pre phase as a failed
1185     node would not allows itself to run.
1186
1187     """
1188     env = {
1189       "OP_TARGET": self.op.node_name,
1190       "NODE_NAME": self.op.node_name,
1191       }
1192     all_nodes = self.cfg.GetNodeList()
1193     all_nodes.remove(self.op.node_name)
1194     return env, all_nodes, all_nodes
1195
1196   def CheckPrereq(self):
1197     """Check prerequisites.
1198
1199     This checks:
1200      - the node exists in the configuration
1201      - it does not have primary or secondary instances
1202      - it's not the master
1203
1204     Any errors are signalled by raising errors.OpPrereqError.
1205
1206     """
1207     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1208     if node is None:
1209       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1210
1211     instance_list = self.cfg.GetInstanceList()
1212
1213     masternode = self.sstore.GetMasterNode()
1214     if node.name == masternode:
1215       raise errors.OpPrereqError("Node is the master node,"
1216                                  " you need to failover first.")
1217
1218     for instance_name in instance_list:
1219       instance = self.cfg.GetInstanceInfo(instance_name)
1220       if node.name == instance.primary_node:
1221         raise errors.OpPrereqError("Instance %s still running on the node,"
1222                                    " please remove first." % instance_name)
1223       if node.name in instance.secondary_nodes:
1224         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1225                                    " please remove first." % instance_name)
1226     self.op.node_name = node.name
1227     self.node = node
1228
1229   def Exec(self, feedback_fn):
1230     """Removes the node from the cluster.
1231
1232     """
1233     node = self.node
1234     logger.Info("stopping the node daemon and removing configs from node %s" %
1235                 node.name)
1236
1237     rpc.call_node_leave_cluster(node.name)
1238
1239     self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1240
1241     logger.Info("Removing node %s from config" % node.name)
1242
1243     self.cfg.RemoveNode(node.name)
1244
1245     utils.RemoveHostFromEtcHosts(node.name)
1246
1247
1248 class LUQueryNodes(NoHooksLU):
1249   """Logical unit for querying nodes.
1250
1251   """
1252   _OP_REQP = ["output_fields", "names"]
1253
1254   def CheckPrereq(self):
1255     """Check prerequisites.
1256
1257     This checks that the fields required are valid output fields.
1258
1259     """
1260     self.dynamic_fields = frozenset([
1261       "dtotal", "dfree",
1262       "mtotal", "mnode", "mfree",
1263       "bootid",
1264       "ctotal",
1265       ])
1266
1267     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1268                                "pinst_list", "sinst_list",
1269                                "pip", "sip"],
1270                        dynamic=self.dynamic_fields,
1271                        selected=self.op.output_fields)
1272
1273     self.wanted = _GetWantedNodes(self, self.op.names)
1274
1275   def Exec(self, feedback_fn):
1276     """Computes the list of nodes and their attributes.
1277
1278     """
1279     nodenames = self.wanted
1280     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1281
1282     # begin data gathering
1283
1284     if self.dynamic_fields.intersection(self.op.output_fields):
1285       live_data = {}
1286       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1287       for name in nodenames:
1288         nodeinfo = node_data.get(name, None)
1289         if nodeinfo:
1290           live_data[name] = {
1291             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1292             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1293             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1294             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1295             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1296             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1297             "bootid": nodeinfo['bootid'],
1298             }
1299         else:
1300           live_data[name] = {}
1301     else:
1302       live_data = dict.fromkeys(nodenames, {})
1303
1304     node_to_primary = dict([(name, set()) for name in nodenames])
1305     node_to_secondary = dict([(name, set()) for name in nodenames])
1306
1307     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1308                              "sinst_cnt", "sinst_list"))
1309     if inst_fields & frozenset(self.op.output_fields):
1310       instancelist = self.cfg.GetInstanceList()
1311
1312       for instance_name in instancelist:
1313         inst = self.cfg.GetInstanceInfo(instance_name)
1314         if inst.primary_node in node_to_primary:
1315           node_to_primary[inst.primary_node].add(inst.name)
1316         for secnode in inst.secondary_nodes:
1317           if secnode in node_to_secondary:
1318             node_to_secondary[secnode].add(inst.name)
1319
1320     # end data gathering
1321
1322     output = []
1323     for node in nodelist:
1324       node_output = []
1325       for field in self.op.output_fields:
1326         if field == "name":
1327           val = node.name
1328         elif field == "pinst_list":
1329           val = list(node_to_primary[node.name])
1330         elif field == "sinst_list":
1331           val = list(node_to_secondary[node.name])
1332         elif field == "pinst_cnt":
1333           val = len(node_to_primary[node.name])
1334         elif field == "sinst_cnt":
1335           val = len(node_to_secondary[node.name])
1336         elif field == "pip":
1337           val = node.primary_ip
1338         elif field == "sip":
1339           val = node.secondary_ip
1340         elif field in self.dynamic_fields:
1341           val = live_data[node.name].get(field, None)
1342         else:
1343           raise errors.ParameterError(field)
1344         node_output.append(val)
1345       output.append(node_output)
1346
1347     return output
1348
1349
1350 class LUQueryNodeVolumes(NoHooksLU):
1351   """Logical unit for getting volumes on node(s).
1352
1353   """
1354   _OP_REQP = ["nodes", "output_fields"]
1355
1356   def CheckPrereq(self):
1357     """Check prerequisites.
1358
1359     This checks that the fields required are valid output fields.
1360
1361     """
1362     self.nodes = _GetWantedNodes(self, self.op.nodes)
1363
1364     _CheckOutputFields(static=["node"],
1365                        dynamic=["phys", "vg", "name", "size", "instance"],
1366                        selected=self.op.output_fields)
1367
1368
1369   def Exec(self, feedback_fn):
1370     """Computes the list of nodes and their attributes.
1371
1372     """
1373     nodenames = self.nodes
1374     volumes = rpc.call_node_volumes(nodenames)
1375
1376     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1377              in self.cfg.GetInstanceList()]
1378
1379     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1380
1381     output = []
1382     for node in nodenames:
1383       if node not in volumes or not volumes[node]:
1384         continue
1385
1386       node_vols = volumes[node][:]
1387       node_vols.sort(key=lambda vol: vol['dev'])
1388
1389       for vol in node_vols:
1390         node_output = []
1391         for field in self.op.output_fields:
1392           if field == "node":
1393             val = node
1394           elif field == "phys":
1395             val = vol['dev']
1396           elif field == "vg":
1397             val = vol['vg']
1398           elif field == "name":
1399             val = vol['name']
1400           elif field == "size":
1401             val = int(float(vol['size']))
1402           elif field == "instance":
1403             for inst in ilist:
1404               if node not in lv_by_node[inst]:
1405                 continue
1406               if vol['name'] in lv_by_node[inst][node]:
1407                 val = inst.name
1408                 break
1409             else:
1410               val = '-'
1411           else:
1412             raise errors.ParameterError(field)
1413           node_output.append(str(val))
1414
1415         output.append(node_output)
1416
1417     return output
1418
1419
1420 class LUAddNode(LogicalUnit):
1421   """Logical unit for adding node to the cluster.
1422
1423   """
1424   HPATH = "node-add"
1425   HTYPE = constants.HTYPE_NODE
1426   _OP_REQP = ["node_name"]
1427
1428   def BuildHooksEnv(self):
1429     """Build hooks env.
1430
1431     This will run on all nodes before, and on all nodes + the new node after.
1432
1433     """
1434     env = {
1435       "OP_TARGET": self.op.node_name,
1436       "NODE_NAME": self.op.node_name,
1437       "NODE_PIP": self.op.primary_ip,
1438       "NODE_SIP": self.op.secondary_ip,
1439       }
1440     nodes_0 = self.cfg.GetNodeList()
1441     nodes_1 = nodes_0 + [self.op.node_name, ]
1442     return env, nodes_0, nodes_1
1443
1444   def CheckPrereq(self):
1445     """Check prerequisites.
1446
1447     This checks:
1448      - the new node is not already in the config
1449      - it is resolvable
1450      - its parameters (single/dual homed) matches the cluster
1451
1452     Any errors are signalled by raising errors.OpPrereqError.
1453
1454     """
1455     node_name = self.op.node_name
1456     cfg = self.cfg
1457
1458     dns_data = utils.HostInfo(node_name)
1459
1460     node = dns_data.name
1461     primary_ip = self.op.primary_ip = dns_data.ip
1462     secondary_ip = getattr(self.op, "secondary_ip", None)
1463     if secondary_ip is None:
1464       secondary_ip = primary_ip
1465     if not utils.IsValidIP(secondary_ip):
1466       raise errors.OpPrereqError("Invalid secondary IP given")
1467     self.op.secondary_ip = secondary_ip
1468
1469     node_list = cfg.GetNodeList()
1470     if not self.op.readd and node in node_list:
1471       raise errors.OpPrereqError("Node %s is already in the configuration" %
1472                                  node)
1473     elif self.op.readd and node not in node_list:
1474       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1475
1476     for existing_node_name in node_list:
1477       existing_node = cfg.GetNodeInfo(existing_node_name)
1478
1479       if self.op.readd and node == existing_node_name:
1480         if (existing_node.primary_ip != primary_ip or
1481             existing_node.secondary_ip != secondary_ip):
1482           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1483                                      " address configuration as before")
1484         continue
1485
1486       if (existing_node.primary_ip == primary_ip or
1487           existing_node.secondary_ip == primary_ip or
1488           existing_node.primary_ip == secondary_ip or
1489           existing_node.secondary_ip == secondary_ip):
1490         raise errors.OpPrereqError("New node ip address(es) conflict with"
1491                                    " existing node %s" % existing_node.name)
1492
1493     # check that the type of the node (single versus dual homed) is the
1494     # same as for the master
1495     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1496     master_singlehomed = myself.secondary_ip == myself.primary_ip
1497     newbie_singlehomed = secondary_ip == primary_ip
1498     if master_singlehomed != newbie_singlehomed:
1499       if master_singlehomed:
1500         raise errors.OpPrereqError("The master has no private ip but the"
1501                                    " new node has one")
1502       else:
1503         raise errors.OpPrereqError("The master has a private ip but the"
1504                                    " new node doesn't have one")
1505
1506     # checks reachablity
1507     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1508       raise errors.OpPrereqError("Node not reachable by ping")
1509
1510     if not newbie_singlehomed:
1511       # check reachability from my secondary ip to newbie's secondary ip
1512       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1513                            source=myself.secondary_ip):
1514         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1515                                    " based ping to noded port")
1516
1517     self.new_node = objects.Node(name=node,
1518                                  primary_ip=primary_ip,
1519                                  secondary_ip=secondary_ip)
1520
1521     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1522       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1523         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1524                                    constants.VNC_PASSWORD_FILE)
1525
1526   def Exec(self, feedback_fn):
1527     """Adds the new node to the cluster.
1528
1529     """
1530     new_node = self.new_node
1531     node = new_node.name
1532
1533     # set up inter-node password and certificate and restarts the node daemon
1534     gntpass = self.sstore.GetNodeDaemonPassword()
1535     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1536       raise errors.OpExecError("ganeti password corruption detected")
1537     f = open(constants.SSL_CERT_FILE)
1538     try:
1539       gntpem = f.read(8192)
1540     finally:
1541       f.close()
1542     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1543     # so we use this to detect an invalid certificate; as long as the
1544     # cert doesn't contain this, the here-document will be correctly
1545     # parsed by the shell sequence below
1546     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1547       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1548     if not gntpem.endswith("\n"):
1549       raise errors.OpExecError("PEM must end with newline")
1550     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1551
1552     # and then connect with ssh to set password and start ganeti-noded
1553     # note that all the below variables are sanitized at this point,
1554     # either by being constants or by the checks above
1555     ss = self.sstore
1556     mycommand = ("umask 077 && "
1557                  "echo '%s' > '%s' && "
1558                  "cat > '%s' << '!EOF.' && \n"
1559                  "%s!EOF.\n%s restart" %
1560                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1561                   constants.SSL_CERT_FILE, gntpem,
1562                   constants.NODE_INITD_SCRIPT))
1563
1564     result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1565     if result.failed:
1566       raise errors.OpExecError("Remote command on node %s, error: %s,"
1567                                " output: %s" %
1568                                (node, result.fail_reason, result.output))
1569
1570     # check connectivity
1571     time.sleep(4)
1572
1573     result = rpc.call_version([node])[node]
1574     if result:
1575       if constants.PROTOCOL_VERSION == result:
1576         logger.Info("communication to node %s fine, sw version %s match" %
1577                     (node, result))
1578       else:
1579         raise errors.OpExecError("Version mismatch master version %s,"
1580                                  " node version %s" %
1581                                  (constants.PROTOCOL_VERSION, result))
1582     else:
1583       raise errors.OpExecError("Cannot get version from the new node")
1584
1585     # setup ssh on node
1586     logger.Info("copy ssh key to node %s" % node)
1587     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1588     keyarray = []
1589     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1590                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1591                 priv_key, pub_key]
1592
1593     for i in keyfiles:
1594       f = open(i, 'r')
1595       try:
1596         keyarray.append(f.read())
1597       finally:
1598         f.close()
1599
1600     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1601                                keyarray[3], keyarray[4], keyarray[5])
1602
1603     if not result:
1604       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1605
1606     # Add node to our /etc/hosts, and add key to known_hosts
1607     utils.AddHostToEtcHosts(new_node.name)
1608
1609     if new_node.secondary_ip != new_node.primary_ip:
1610       if not rpc.call_node_tcp_ping(new_node.name,
1611                                     constants.LOCALHOST_IP_ADDRESS,
1612                                     new_node.secondary_ip,
1613                                     constants.DEFAULT_NODED_PORT,
1614                                     10, False):
1615         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1616                                  " you gave (%s). Please fix and re-run this"
1617                                  " command." % new_node.secondary_ip)
1618
1619     success, msg = self.ssh.VerifyNodeHostname(node)
1620     if not success:
1621       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1622                                " than the one the resolver gives: %s."
1623                                " Please fix and re-run this command." %
1624                                (node, msg))
1625
1626     # Distribute updated /etc/hosts and known_hosts to all nodes,
1627     # including the node just added
1628     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1629     dist_nodes = self.cfg.GetNodeList()
1630     if not self.op.readd:
1631       dist_nodes.append(node)
1632     if myself.name in dist_nodes:
1633       dist_nodes.remove(myself.name)
1634
1635     logger.Debug("Copying hosts and known_hosts to all nodes")
1636     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1637       result = rpc.call_upload_file(dist_nodes, fname)
1638       for to_node in dist_nodes:
1639         if not result[to_node]:
1640           logger.Error("copy of file %s to node %s failed" %
1641                        (fname, to_node))
1642
1643     to_copy = ss.GetFileList()
1644     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1645       to_copy.append(constants.VNC_PASSWORD_FILE)
1646     for fname in to_copy:
1647       if not self.ssh.CopyFileToNode(node, fname):
1648         logger.Error("could not copy file %s to node %s" % (fname, node))
1649
1650     if not self.op.readd:
1651       logger.Info("adding node %s to cluster.conf" % node)
1652       self.cfg.AddNode(new_node)
1653
1654
1655 class LUMasterFailover(LogicalUnit):
1656   """Failover the master node to the current node.
1657
1658   This is a special LU in that it must run on a non-master node.
1659
1660   """
1661   HPATH = "master-failover"
1662   HTYPE = constants.HTYPE_CLUSTER
1663   REQ_MASTER = False
1664   _OP_REQP = []
1665
1666   def BuildHooksEnv(self):
1667     """Build hooks env.
1668
1669     This will run on the new master only in the pre phase, and on all
1670     the nodes in the post phase.
1671
1672     """
1673     env = {
1674       "OP_TARGET": self.new_master,
1675       "NEW_MASTER": self.new_master,
1676       "OLD_MASTER": self.old_master,
1677       }
1678     return env, [self.new_master], self.cfg.GetNodeList()
1679
1680   def CheckPrereq(self):
1681     """Check prerequisites.
1682
1683     This checks that we are not already the master.
1684
1685     """
1686     self.new_master = utils.HostInfo().name
1687     self.old_master = self.sstore.GetMasterNode()
1688
1689     if self.old_master == self.new_master:
1690       raise errors.OpPrereqError("This commands must be run on the node"
1691                                  " where you want the new master to be."
1692                                  " %s is already the master" %
1693                                  self.old_master)
1694
1695   def Exec(self, feedback_fn):
1696     """Failover the master node.
1697
1698     This command, when run on a non-master node, will cause the current
1699     master to cease being master, and the non-master to become new
1700     master.
1701
1702     """
1703     #TODO: do not rely on gethostname returning the FQDN
1704     logger.Info("setting master to %s, old master: %s" %
1705                 (self.new_master, self.old_master))
1706
1707     if not rpc.call_node_stop_master(self.old_master):
1708       logger.Error("could disable the master role on the old master"
1709                    " %s, please disable manually" % self.old_master)
1710
1711     ss = self.sstore
1712     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1713     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1714                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1715       logger.Error("could not distribute the new simple store master file"
1716                    " to the other nodes, please check.")
1717
1718     if not rpc.call_node_start_master(self.new_master):
1719       logger.Error("could not start the master role on the new master"
1720                    " %s, please check" % self.new_master)
1721       feedback_fn("Error in activating the master IP on the new master,"
1722                   " please fix manually.")
1723
1724
1725
1726 class LUQueryClusterInfo(NoHooksLU):
1727   """Query cluster configuration.
1728
1729   """
1730   _OP_REQP = []
1731   REQ_MASTER = False
1732
1733   def CheckPrereq(self):
1734     """No prerequsites needed for this LU.
1735
1736     """
1737     pass
1738
1739   def Exec(self, feedback_fn):
1740     """Return cluster config.
1741
1742     """
1743     result = {
1744       "name": self.sstore.GetClusterName(),
1745       "software_version": constants.RELEASE_VERSION,
1746       "protocol_version": constants.PROTOCOL_VERSION,
1747       "config_version": constants.CONFIG_VERSION,
1748       "os_api_version": constants.OS_API_VERSION,
1749       "export_version": constants.EXPORT_VERSION,
1750       "master": self.sstore.GetMasterNode(),
1751       "architecture": (platform.architecture()[0], platform.machine()),
1752       "hypervisor_type": self.sstore.GetHypervisorType(),
1753       }
1754
1755     return result
1756
1757
1758 class LUClusterCopyFile(NoHooksLU):
1759   """Copy file to cluster.
1760
1761   """
1762   _OP_REQP = ["nodes", "filename"]
1763
1764   def CheckPrereq(self):
1765     """Check prerequisites.
1766
1767     It should check that the named file exists and that the given list
1768     of nodes is valid.
1769
1770     """
1771     if not os.path.exists(self.op.filename):
1772       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1773
1774     self.nodes = _GetWantedNodes(self, self.op.nodes)
1775
1776   def Exec(self, feedback_fn):
1777     """Copy a file from master to some nodes.
1778
1779     Args:
1780       opts - class with options as members
1781       args - list containing a single element, the file name
1782     Opts used:
1783       nodes - list containing the name of target nodes; if empty, all nodes
1784
1785     """
1786     filename = self.op.filename
1787
1788     myname = utils.HostInfo().name
1789
1790     for node in self.nodes:
1791       if node == myname:
1792         continue
1793       if not self.ssh.CopyFileToNode(node, filename):
1794         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1795
1796
1797 class LUDumpClusterConfig(NoHooksLU):
1798   """Return a text-representation of the cluster-config.
1799
1800   """
1801   _OP_REQP = []
1802
1803   def CheckPrereq(self):
1804     """No prerequisites.
1805
1806     """
1807     pass
1808
1809   def Exec(self, feedback_fn):
1810     """Dump a representation of the cluster config to the standard output.
1811
1812     """
1813     return self.cfg.DumpConfig()
1814
1815
1816 class LURunClusterCommand(NoHooksLU):
1817   """Run a command on some nodes.
1818
1819   """
1820   _OP_REQP = ["command", "nodes"]
1821
1822   def CheckPrereq(self):
1823     """Check prerequisites.
1824
1825     It checks that the given list of nodes is valid.
1826
1827     """
1828     self.nodes = _GetWantedNodes(self, self.op.nodes)
1829
1830   def Exec(self, feedback_fn):
1831     """Run a command on some nodes.
1832
1833     """
1834     # put the master at the end of the nodes list
1835     master_node = self.sstore.GetMasterNode()
1836     if master_node in self.nodes:
1837       self.nodes.remove(master_node)
1838       self.nodes.append(master_node)
1839
1840     data = []
1841     for node in self.nodes:
1842       result = self.ssh.Run(node, "root", self.op.command)
1843       data.append((node, result.output, result.exit_code))
1844
1845     return data
1846
1847
1848 class LUActivateInstanceDisks(NoHooksLU):
1849   """Bring up an instance's disks.
1850
1851   """
1852   _OP_REQP = ["instance_name"]
1853
1854   def CheckPrereq(self):
1855     """Check prerequisites.
1856
1857     This checks that the instance is in the cluster.
1858
1859     """
1860     instance = self.cfg.GetInstanceInfo(
1861       self.cfg.ExpandInstanceName(self.op.instance_name))
1862     if instance is None:
1863       raise errors.OpPrereqError("Instance '%s' not known" %
1864                                  self.op.instance_name)
1865     self.instance = instance
1866
1867
1868   def Exec(self, feedback_fn):
1869     """Activate the disks.
1870
1871     """
1872     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1873     if not disks_ok:
1874       raise errors.OpExecError("Cannot activate block devices")
1875
1876     return disks_info
1877
1878
1879 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1880   """Prepare the block devices for an instance.
1881
1882   This sets up the block devices on all nodes.
1883
1884   Args:
1885     instance: a ganeti.objects.Instance object
1886     ignore_secondaries: if true, errors on secondary nodes won't result
1887                         in an error return from the function
1888
1889   Returns:
1890     false if the operation failed
1891     list of (host, instance_visible_name, node_visible_name) if the operation
1892          suceeded with the mapping from node devices to instance devices
1893   """
1894   device_info = []
1895   disks_ok = True
1896   iname = instance.name
1897   # With the two passes mechanism we try to reduce the window of
1898   # opportunity for the race condition of switching DRBD to primary
1899   # before handshaking occured, but we do not eliminate it
1900
1901   # The proper fix would be to wait (with some limits) until the
1902   # connection has been made and drbd transitions from WFConnection
1903   # into any other network-connected state (Connected, SyncTarget,
1904   # SyncSource, etc.)
1905
1906   # 1st pass, assemble on all nodes in secondary mode
1907   for inst_disk in instance.disks:
1908     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1909       cfg.SetDiskID(node_disk, node)
1910       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1911       if not result:
1912         logger.Error("could not prepare block device %s on node %s"
1913                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1914         if not ignore_secondaries:
1915           disks_ok = False
1916
1917   # FIXME: race condition on drbd migration to primary
1918
1919   # 2nd pass, do only the primary node
1920   for inst_disk in instance.disks:
1921     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1922       if node != instance.primary_node:
1923         continue
1924       cfg.SetDiskID(node_disk, node)
1925       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1926       if not result:
1927         logger.Error("could not prepare block device %s on node %s"
1928                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1929         disks_ok = False
1930     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1931
1932   # leave the disks configured for the primary node
1933   # this is a workaround that would be fixed better by
1934   # improving the logical/physical id handling
1935   for disk in instance.disks:
1936     cfg.SetDiskID(disk, instance.primary_node)
1937
1938   return disks_ok, device_info
1939
1940
1941 def _StartInstanceDisks(cfg, instance, force):
1942   """Start the disks of an instance.
1943
1944   """
1945   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1946                                            ignore_secondaries=force)
1947   if not disks_ok:
1948     _ShutdownInstanceDisks(instance, cfg)
1949     if force is not None and not force:
1950       logger.Error("If the message above refers to a secondary node,"
1951                    " you can retry the operation using '--force'.")
1952     raise errors.OpExecError("Disk consistency error")
1953
1954
1955 class LUDeactivateInstanceDisks(NoHooksLU):
1956   """Shutdown an instance's disks.
1957
1958   """
1959   _OP_REQP = ["instance_name"]
1960
1961   def CheckPrereq(self):
1962     """Check prerequisites.
1963
1964     This checks that the instance is in the cluster.
1965
1966     """
1967     instance = self.cfg.GetInstanceInfo(
1968       self.cfg.ExpandInstanceName(self.op.instance_name))
1969     if instance is None:
1970       raise errors.OpPrereqError("Instance '%s' not known" %
1971                                  self.op.instance_name)
1972     self.instance = instance
1973
1974   def Exec(self, feedback_fn):
1975     """Deactivate the disks
1976
1977     """
1978     instance = self.instance
1979     ins_l = rpc.call_instance_list([instance.primary_node])
1980     ins_l = ins_l[instance.primary_node]
1981     if not type(ins_l) is list:
1982       raise errors.OpExecError("Can't contact node '%s'" %
1983                                instance.primary_node)
1984
1985     if self.instance.name in ins_l:
1986       raise errors.OpExecError("Instance is running, can't shutdown"
1987                                " block devices.")
1988
1989     _ShutdownInstanceDisks(instance, self.cfg)
1990
1991
1992 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1993   """Shutdown block devices of an instance.
1994
1995   This does the shutdown on all nodes of the instance.
1996
1997   If the ignore_primary is false, errors on the primary node are
1998   ignored.
1999
2000   """
2001   result = True
2002   for disk in instance.disks:
2003     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2004       cfg.SetDiskID(top_disk, node)
2005       if not rpc.call_blockdev_shutdown(node, top_disk):
2006         logger.Error("could not shutdown block device %s on node %s" %
2007                      (disk.iv_name, node))
2008         if not ignore_primary or node != instance.primary_node:
2009           result = False
2010   return result
2011
2012
2013 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2014   """Checks if a node has enough free memory.
2015
2016   This function check if a given node has the needed amount of free
2017   memory. In case the node has less memory or we cannot get the
2018   information from the node, this function raise an OpPrereqError
2019   exception.
2020
2021   Args:
2022     - cfg: a ConfigWriter instance
2023     - node: the node name
2024     - reason: string to use in the error message
2025     - requested: the amount of memory in MiB
2026
2027   """
2028   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2029   if not nodeinfo or not isinstance(nodeinfo, dict):
2030     raise errors.OpPrereqError("Could not contact node %s for resource"
2031                              " information" % (node,))
2032
2033   free_mem = nodeinfo[node].get('memory_free')
2034   if not isinstance(free_mem, int):
2035     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2036                              " was '%s'" % (node, free_mem))
2037   if requested > free_mem:
2038     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2039                              " needed %s MiB, available %s MiB" %
2040                              (node, reason, requested, free_mem))
2041
2042
2043 class LUStartupInstance(LogicalUnit):
2044   """Starts an instance.
2045
2046   """
2047   HPATH = "instance-start"
2048   HTYPE = constants.HTYPE_INSTANCE
2049   _OP_REQP = ["instance_name", "force"]
2050
2051   def BuildHooksEnv(self):
2052     """Build hooks env.
2053
2054     This runs on master, primary and secondary nodes of the instance.
2055
2056     """
2057     env = {
2058       "FORCE": self.op.force,
2059       }
2060     env.update(_BuildInstanceHookEnvByObject(self.instance))
2061     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2062           list(self.instance.secondary_nodes))
2063     return env, nl, nl
2064
2065   def CheckPrereq(self):
2066     """Check prerequisites.
2067
2068     This checks that the instance is in the cluster.
2069
2070     """
2071     instance = self.cfg.GetInstanceInfo(
2072       self.cfg.ExpandInstanceName(self.op.instance_name))
2073     if instance is None:
2074       raise errors.OpPrereqError("Instance '%s' not known" %
2075                                  self.op.instance_name)
2076
2077     # check bridges existance
2078     _CheckInstanceBridgesExist(instance)
2079
2080     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2081                          "starting instance %s" % instance.name,
2082                          instance.memory)
2083
2084     self.instance = instance
2085     self.op.instance_name = instance.name
2086
2087   def Exec(self, feedback_fn):
2088     """Start the instance.
2089
2090     """
2091     instance = self.instance
2092     force = self.op.force
2093     extra_args = getattr(self.op, "extra_args", "")
2094
2095     self.cfg.MarkInstanceUp(instance.name)
2096
2097     node_current = instance.primary_node
2098
2099     _StartInstanceDisks(self.cfg, instance, force)
2100
2101     if not rpc.call_instance_start(node_current, instance, extra_args):
2102       _ShutdownInstanceDisks(instance, self.cfg)
2103       raise errors.OpExecError("Could not start instance")
2104
2105
2106 class LURebootInstance(LogicalUnit):
2107   """Reboot an instance.
2108
2109   """
2110   HPATH = "instance-reboot"
2111   HTYPE = constants.HTYPE_INSTANCE
2112   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2113
2114   def BuildHooksEnv(self):
2115     """Build hooks env.
2116
2117     This runs on master, primary and secondary nodes of the instance.
2118
2119     """
2120     env = {
2121       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2122       }
2123     env.update(_BuildInstanceHookEnvByObject(self.instance))
2124     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2125           list(self.instance.secondary_nodes))
2126     return env, nl, nl
2127
2128   def CheckPrereq(self):
2129     """Check prerequisites.
2130
2131     This checks that the instance is in the cluster.
2132
2133     """
2134     instance = self.cfg.GetInstanceInfo(
2135       self.cfg.ExpandInstanceName(self.op.instance_name))
2136     if instance is None:
2137       raise errors.OpPrereqError("Instance '%s' not known" %
2138                                  self.op.instance_name)
2139
2140     # check bridges existance
2141     _CheckInstanceBridgesExist(instance)
2142
2143     self.instance = instance
2144     self.op.instance_name = instance.name
2145
2146   def Exec(self, feedback_fn):
2147     """Reboot the instance.
2148
2149     """
2150     instance = self.instance
2151     ignore_secondaries = self.op.ignore_secondaries
2152     reboot_type = self.op.reboot_type
2153     extra_args = getattr(self.op, "extra_args", "")
2154
2155     node_current = instance.primary_node
2156
2157     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2158                            constants.INSTANCE_REBOOT_HARD,
2159                            constants.INSTANCE_REBOOT_FULL]:
2160       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2161                                   (constants.INSTANCE_REBOOT_SOFT,
2162                                    constants.INSTANCE_REBOOT_HARD,
2163                                    constants.INSTANCE_REBOOT_FULL))
2164
2165     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2166                        constants.INSTANCE_REBOOT_HARD]:
2167       if not rpc.call_instance_reboot(node_current, instance,
2168                                       reboot_type, extra_args):
2169         raise errors.OpExecError("Could not reboot instance")
2170     else:
2171       if not rpc.call_instance_shutdown(node_current, instance):
2172         raise errors.OpExecError("could not shutdown instance for full reboot")
2173       _ShutdownInstanceDisks(instance, self.cfg)
2174       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2175       if not rpc.call_instance_start(node_current, instance, extra_args):
2176         _ShutdownInstanceDisks(instance, self.cfg)
2177         raise errors.OpExecError("Could not start instance for full reboot")
2178
2179     self.cfg.MarkInstanceUp(instance.name)
2180
2181
2182 class LUShutdownInstance(LogicalUnit):
2183   """Shutdown an instance.
2184
2185   """
2186   HPATH = "instance-stop"
2187   HTYPE = constants.HTYPE_INSTANCE
2188   _OP_REQP = ["instance_name"]
2189
2190   def BuildHooksEnv(self):
2191     """Build hooks env.
2192
2193     This runs on master, primary and secondary nodes of the instance.
2194
2195     """
2196     env = _BuildInstanceHookEnvByObject(self.instance)
2197     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2198           list(self.instance.secondary_nodes))
2199     return env, nl, nl
2200
2201   def CheckPrereq(self):
2202     """Check prerequisites.
2203
2204     This checks that the instance is in the cluster.
2205
2206     """
2207     instance = self.cfg.GetInstanceInfo(
2208       self.cfg.ExpandInstanceName(self.op.instance_name))
2209     if instance is None:
2210       raise errors.OpPrereqError("Instance '%s' not known" %
2211                                  self.op.instance_name)
2212     self.instance = instance
2213
2214   def Exec(self, feedback_fn):
2215     """Shutdown the instance.
2216
2217     """
2218     instance = self.instance
2219     node_current = instance.primary_node
2220     self.cfg.MarkInstanceDown(instance.name)
2221     if not rpc.call_instance_shutdown(node_current, instance):
2222       logger.Error("could not shutdown instance")
2223
2224     _ShutdownInstanceDisks(instance, self.cfg)
2225
2226
2227 class LUReinstallInstance(LogicalUnit):
2228   """Reinstall an instance.
2229
2230   """
2231   HPATH = "instance-reinstall"
2232   HTYPE = constants.HTYPE_INSTANCE
2233   _OP_REQP = ["instance_name"]
2234
2235   def BuildHooksEnv(self):
2236     """Build hooks env.
2237
2238     This runs on master, primary and secondary nodes of the instance.
2239
2240     """
2241     env = _BuildInstanceHookEnvByObject(self.instance)
2242     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2243           list(self.instance.secondary_nodes))
2244     return env, nl, nl
2245
2246   def CheckPrereq(self):
2247     """Check prerequisites.
2248
2249     This checks that the instance is in the cluster and is not running.
2250
2251     """
2252     instance = self.cfg.GetInstanceInfo(
2253       self.cfg.ExpandInstanceName(self.op.instance_name))
2254     if instance is None:
2255       raise errors.OpPrereqError("Instance '%s' not known" %
2256                                  self.op.instance_name)
2257     if instance.disk_template == constants.DT_DISKLESS:
2258       raise errors.OpPrereqError("Instance '%s' has no disks" %
2259                                  self.op.instance_name)
2260     if instance.status != "down":
2261       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2262                                  self.op.instance_name)
2263     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2264     if remote_info:
2265       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2266                                  (self.op.instance_name,
2267                                   instance.primary_node))
2268
2269     self.op.os_type = getattr(self.op, "os_type", None)
2270     if self.op.os_type is not None:
2271       # OS verification
2272       pnode = self.cfg.GetNodeInfo(
2273         self.cfg.ExpandNodeName(instance.primary_node))
2274       if pnode is None:
2275         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2276                                    self.op.pnode)
2277       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2278       if not os_obj:
2279         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2280                                    " primary node"  % self.op.os_type)
2281
2282     self.instance = instance
2283
2284   def Exec(self, feedback_fn):
2285     """Reinstall the instance.
2286
2287     """
2288     inst = self.instance
2289
2290     if self.op.os_type is not None:
2291       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2292       inst.os = self.op.os_type
2293       self.cfg.AddInstance(inst)
2294
2295     _StartInstanceDisks(self.cfg, inst, None)
2296     try:
2297       feedback_fn("Running the instance OS create scripts...")
2298       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2299         raise errors.OpExecError("Could not install OS for instance %s"
2300                                  " on node %s" %
2301                                  (inst.name, inst.primary_node))
2302     finally:
2303       _ShutdownInstanceDisks(inst, self.cfg)
2304
2305
2306 class LURenameInstance(LogicalUnit):
2307   """Rename an instance.
2308
2309   """
2310   HPATH = "instance-rename"
2311   HTYPE = constants.HTYPE_INSTANCE
2312   _OP_REQP = ["instance_name", "new_name"]
2313
2314   def BuildHooksEnv(self):
2315     """Build hooks env.
2316
2317     This runs on master, primary and secondary nodes of the instance.
2318
2319     """
2320     env = _BuildInstanceHookEnvByObject(self.instance)
2321     env["INSTANCE_NEW_NAME"] = self.op.new_name
2322     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2323           list(self.instance.secondary_nodes))
2324     return env, nl, nl
2325
2326   def CheckPrereq(self):
2327     """Check prerequisites.
2328
2329     This checks that the instance is in the cluster and is not running.
2330
2331     """
2332     instance = self.cfg.GetInstanceInfo(
2333       self.cfg.ExpandInstanceName(self.op.instance_name))
2334     if instance is None:
2335       raise errors.OpPrereqError("Instance '%s' not known" %
2336                                  self.op.instance_name)
2337     if instance.status != "down":
2338       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2339                                  self.op.instance_name)
2340     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2341     if remote_info:
2342       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2343                                  (self.op.instance_name,
2344                                   instance.primary_node))
2345     self.instance = instance
2346
2347     # new name verification
2348     name_info = utils.HostInfo(self.op.new_name)
2349
2350     self.op.new_name = new_name = name_info.name
2351     instance_list = self.cfg.GetInstanceList()
2352     if new_name in instance_list:
2353       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2354                                  new_name)
2355
2356     if not getattr(self.op, "ignore_ip", False):
2357       command = ["fping", "-q", name_info.ip]
2358       result = utils.RunCmd(command)
2359       if not result.failed:
2360         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2361                                    (name_info.ip, new_name))
2362
2363
2364   def Exec(self, feedback_fn):
2365     """Reinstall the instance.
2366
2367     """
2368     inst = self.instance
2369     old_name = inst.name
2370
2371     if inst.disk_template == constants.DT_FILE:
2372       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2373
2374     self.cfg.RenameInstance(inst.name, self.op.new_name)
2375
2376     # re-read the instance from the configuration after rename
2377     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2378
2379     if inst.disk_template == constants.DT_FILE:
2380       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2381       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2382                                                 old_file_storage_dir,
2383                                                 new_file_storage_dir)
2384
2385       if not result:
2386         raise errors.OpExecError("Could not connect to node '%s' to rename"
2387                                  " directory '%s' to '%s' (but the instance"
2388                                  " has been renamed in Ganeti)" % (
2389                                  inst.primary_node, old_file_storage_dir,
2390                                  new_file_storage_dir))
2391
2392       if not result[0]:
2393         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2394                                  " (but the instance has been renamed in"
2395                                  " Ganeti)" % (old_file_storage_dir,
2396                                                new_file_storage_dir))
2397
2398     _StartInstanceDisks(self.cfg, inst, None)
2399     try:
2400       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2401                                           "sda", "sdb"):
2402         msg = ("Could run OS rename script for instance %s on node %s (but the"
2403                " instance has been renamed in Ganeti)" %
2404                (inst.name, inst.primary_node))
2405         logger.Error(msg)
2406     finally:
2407       _ShutdownInstanceDisks(inst, self.cfg)
2408
2409
2410 class LURemoveInstance(LogicalUnit):
2411   """Remove an instance.
2412
2413   """
2414   HPATH = "instance-remove"
2415   HTYPE = constants.HTYPE_INSTANCE
2416   _OP_REQP = ["instance_name", "ignore_failures"]
2417
2418   def BuildHooksEnv(self):
2419     """Build hooks env.
2420
2421     This runs on master, primary and secondary nodes of the instance.
2422
2423     """
2424     env = _BuildInstanceHookEnvByObject(self.instance)
2425     nl = [self.sstore.GetMasterNode()]
2426     return env, nl, nl
2427
2428   def CheckPrereq(self):
2429     """Check prerequisites.
2430
2431     This checks that the instance is in the cluster.
2432
2433     """
2434     instance = self.cfg.GetInstanceInfo(
2435       self.cfg.ExpandInstanceName(self.op.instance_name))
2436     if instance is None:
2437       raise errors.OpPrereqError("Instance '%s' not known" %
2438                                  self.op.instance_name)
2439     self.instance = instance
2440
2441   def Exec(self, feedback_fn):
2442     """Remove the instance.
2443
2444     """
2445     instance = self.instance
2446     logger.Info("shutting down instance %s on node %s" %
2447                 (instance.name, instance.primary_node))
2448
2449     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2450       if self.op.ignore_failures:
2451         feedback_fn("Warning: can't shutdown instance")
2452       else:
2453         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2454                                  (instance.name, instance.primary_node))
2455
2456     logger.Info("removing block devices for instance %s" % instance.name)
2457
2458     if not _RemoveDisks(instance, self.cfg):
2459       if self.op.ignore_failures:
2460         feedback_fn("Warning: can't remove instance's disks")
2461       else:
2462         raise errors.OpExecError("Can't remove instance's disks")
2463
2464     logger.Info("removing instance %s out of cluster config" % instance.name)
2465
2466     self.cfg.RemoveInstance(instance.name)
2467
2468
2469 class LUQueryInstances(NoHooksLU):
2470   """Logical unit for querying instances.
2471
2472   """
2473   _OP_REQP = ["output_fields", "names"]
2474
2475   def CheckPrereq(self):
2476     """Check prerequisites.
2477
2478     This checks that the fields required are valid output fields.
2479
2480     """
2481     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2482     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2483                                "admin_state", "admin_ram",
2484                                "disk_template", "ip", "mac", "bridge",
2485                                "sda_size", "sdb_size", "vcpus"],
2486                        dynamic=self.dynamic_fields,
2487                        selected=self.op.output_fields)
2488
2489     self.wanted = _GetWantedInstances(self, self.op.names)
2490
2491   def Exec(self, feedback_fn):
2492     """Computes the list of nodes and their attributes.
2493
2494     """
2495     instance_names = self.wanted
2496     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2497                      in instance_names]
2498
2499     # begin data gathering
2500
2501     nodes = frozenset([inst.primary_node for inst in instance_list])
2502
2503     bad_nodes = []
2504     if self.dynamic_fields.intersection(self.op.output_fields):
2505       live_data = {}
2506       node_data = rpc.call_all_instances_info(nodes)
2507       for name in nodes:
2508         result = node_data[name]
2509         if result:
2510           live_data.update(result)
2511         elif result == False:
2512           bad_nodes.append(name)
2513         # else no instance is alive
2514     else:
2515       live_data = dict([(name, {}) for name in instance_names])
2516
2517     # end data gathering
2518
2519     output = []
2520     for instance in instance_list:
2521       iout = []
2522       for field in self.op.output_fields:
2523         if field == "name":
2524           val = instance.name
2525         elif field == "os":
2526           val = instance.os
2527         elif field == "pnode":
2528           val = instance.primary_node
2529         elif field == "snodes":
2530           val = list(instance.secondary_nodes)
2531         elif field == "admin_state":
2532           val = (instance.status != "down")
2533         elif field == "oper_state":
2534           if instance.primary_node in bad_nodes:
2535             val = None
2536           else:
2537             val = bool(live_data.get(instance.name))
2538         elif field == "status":
2539           if instance.primary_node in bad_nodes:
2540             val = "ERROR_nodedown"
2541           else:
2542             running = bool(live_data.get(instance.name))
2543             if running:
2544               if instance.status != "down":
2545                 val = "running"
2546               else:
2547                 val = "ERROR_up"
2548             else:
2549               if instance.status != "down":
2550                 val = "ERROR_down"
2551               else:
2552                 val = "ADMIN_down"
2553         elif field == "admin_ram":
2554           val = instance.memory
2555         elif field == "oper_ram":
2556           if instance.primary_node in bad_nodes:
2557             val = None
2558           elif instance.name in live_data:
2559             val = live_data[instance.name].get("memory", "?")
2560           else:
2561             val = "-"
2562         elif field == "disk_template":
2563           val = instance.disk_template
2564         elif field == "ip":
2565           val = instance.nics[0].ip
2566         elif field == "bridge":
2567           val = instance.nics[0].bridge
2568         elif field == "mac":
2569           val = instance.nics[0].mac
2570         elif field == "sda_size" or field == "sdb_size":
2571           disk = instance.FindDisk(field[:3])
2572           if disk is None:
2573             val = None
2574           else:
2575             val = disk.size
2576         elif field == "vcpus":
2577           val = instance.vcpus
2578         else:
2579           raise errors.ParameterError(field)
2580         iout.append(val)
2581       output.append(iout)
2582
2583     return output
2584
2585
2586 class LUFailoverInstance(LogicalUnit):
2587   """Failover an instance.
2588
2589   """
2590   HPATH = "instance-failover"
2591   HTYPE = constants.HTYPE_INSTANCE
2592   _OP_REQP = ["instance_name", "ignore_consistency"]
2593
2594   def BuildHooksEnv(self):
2595     """Build hooks env.
2596
2597     This runs on master, primary and secondary nodes of the instance.
2598
2599     """
2600     env = {
2601       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2602       }
2603     env.update(_BuildInstanceHookEnvByObject(self.instance))
2604     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2605     return env, nl, nl
2606
2607   def CheckPrereq(self):
2608     """Check prerequisites.
2609
2610     This checks that the instance is in the cluster.
2611
2612     """
2613     instance = self.cfg.GetInstanceInfo(
2614       self.cfg.ExpandInstanceName(self.op.instance_name))
2615     if instance is None:
2616       raise errors.OpPrereqError("Instance '%s' not known" %
2617                                  self.op.instance_name)
2618
2619     if instance.disk_template not in constants.DTS_NET_MIRROR:
2620       raise errors.OpPrereqError("Instance's disk layout is not"
2621                                  " network mirrored, cannot failover.")
2622
2623     secondary_nodes = instance.secondary_nodes
2624     if not secondary_nodes:
2625       raise errors.ProgrammerError("no secondary node but using "
2626                                    "a mirrored disk template")
2627
2628     target_node = secondary_nodes[0]
2629     # check memory requirements on the secondary node
2630     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2631                          instance.name, instance.memory)
2632
2633     # check bridge existance
2634     brlist = [nic.bridge for nic in instance.nics]
2635     if not rpc.call_bridges_exist(target_node, brlist):
2636       raise errors.OpPrereqError("One or more target bridges %s does not"
2637                                  " exist on destination node '%s'" %
2638                                  (brlist, target_node))
2639
2640     self.instance = instance
2641
2642   def Exec(self, feedback_fn):
2643     """Failover an instance.
2644
2645     The failover is done by shutting it down on its present node and
2646     starting it on the secondary.
2647
2648     """
2649     instance = self.instance
2650
2651     source_node = instance.primary_node
2652     target_node = instance.secondary_nodes[0]
2653
2654     feedback_fn("* checking disk consistency between source and target")
2655     for dev in instance.disks:
2656       # for drbd, these are drbd over lvm
2657       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2658         if instance.status == "up" and not self.op.ignore_consistency:
2659           raise errors.OpExecError("Disk %s is degraded on target node,"
2660                                    " aborting failover." % dev.iv_name)
2661
2662     feedback_fn("* shutting down instance on source node")
2663     logger.Info("Shutting down instance %s on node %s" %
2664                 (instance.name, source_node))
2665
2666     if not rpc.call_instance_shutdown(source_node, instance):
2667       if self.op.ignore_consistency:
2668         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2669                      " anyway. Please make sure node %s is down"  %
2670                      (instance.name, source_node, source_node))
2671       else:
2672         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2673                                  (instance.name, source_node))
2674
2675     feedback_fn("* deactivating the instance's disks on source node")
2676     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2677       raise errors.OpExecError("Can't shut down the instance's disks.")
2678
2679     instance.primary_node = target_node
2680     # distribute new instance config to the other nodes
2681     self.cfg.AddInstance(instance)
2682
2683     # Only start the instance if it's marked as up
2684     if instance.status == "up":
2685       feedback_fn("* activating the instance's disks on target node")
2686       logger.Info("Starting instance %s on node %s" %
2687                   (instance.name, target_node))
2688
2689       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2690                                                ignore_secondaries=True)
2691       if not disks_ok:
2692         _ShutdownInstanceDisks(instance, self.cfg)
2693         raise errors.OpExecError("Can't activate the instance's disks")
2694
2695       feedback_fn("* starting the instance on the target node")
2696       if not rpc.call_instance_start(target_node, instance, None):
2697         _ShutdownInstanceDisks(instance, self.cfg)
2698         raise errors.OpExecError("Could not start instance %s on node %s." %
2699                                  (instance.name, target_node))
2700
2701
2702 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2703   """Create a tree of block devices on the primary node.
2704
2705   This always creates all devices.
2706
2707   """
2708   if device.children:
2709     for child in device.children:
2710       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2711         return False
2712
2713   cfg.SetDiskID(device, node)
2714   new_id = rpc.call_blockdev_create(node, device, device.size,
2715                                     instance.name, True, info)
2716   if not new_id:
2717     return False
2718   if device.physical_id is None:
2719     device.physical_id = new_id
2720   return True
2721
2722
2723 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2724   """Create a tree of block devices on a secondary node.
2725
2726   If this device type has to be created on secondaries, create it and
2727   all its children.
2728
2729   If not, just recurse to children keeping the same 'force' value.
2730
2731   """
2732   if device.CreateOnSecondary():
2733     force = True
2734   if device.children:
2735     for child in device.children:
2736       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2737                                         child, force, info):
2738         return False
2739
2740   if not force:
2741     return True
2742   cfg.SetDiskID(device, node)
2743   new_id = rpc.call_blockdev_create(node, device, device.size,
2744                                     instance.name, False, info)
2745   if not new_id:
2746     return False
2747   if device.physical_id is None:
2748     device.physical_id = new_id
2749   return True
2750
2751
2752 def _GenerateUniqueNames(cfg, exts):
2753   """Generate a suitable LV name.
2754
2755   This will generate a logical volume name for the given instance.
2756
2757   """
2758   results = []
2759   for val in exts:
2760     new_id = cfg.GenerateUniqueID()
2761     results.append("%s%s" % (new_id, val))
2762   return results
2763
2764
2765 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2766   """Generate a drbd device complete with its children.
2767
2768   """
2769   port = cfg.AllocatePort()
2770   vgname = cfg.GetVGName()
2771   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2772                           logical_id=(vgname, names[0]))
2773   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2774                           logical_id=(vgname, names[1]))
2775   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2776                           logical_id = (primary, secondary, port),
2777                           children = [dev_data, dev_meta])
2778   return drbd_dev
2779
2780
2781 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2782   """Generate a drbd8 device complete with its children.
2783
2784   """
2785   port = cfg.AllocatePort()
2786   vgname = cfg.GetVGName()
2787   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2788                           logical_id=(vgname, names[0]))
2789   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2790                           logical_id=(vgname, names[1]))
2791   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2792                           logical_id = (primary, secondary, port),
2793                           children = [dev_data, dev_meta],
2794                           iv_name=iv_name)
2795   return drbd_dev
2796
2797
2798 def _GenerateDiskTemplate(cfg, template_name,
2799                           instance_name, primary_node,
2800                           secondary_nodes, disk_sz, swap_sz,
2801                           file_storage_dir, file_driver):
2802   """Generate the entire disk layout for a given template type.
2803
2804   """
2805   #TODO: compute space requirements
2806
2807   vgname = cfg.GetVGName()
2808   if template_name == constants.DT_DISKLESS:
2809     disks = []
2810   elif template_name == constants.DT_PLAIN:
2811     if len(secondary_nodes) != 0:
2812       raise errors.ProgrammerError("Wrong template configuration")
2813
2814     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2815     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2816                            logical_id=(vgname, names[0]),
2817                            iv_name = "sda")
2818     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2819                            logical_id=(vgname, names[1]),
2820                            iv_name = "sdb")
2821     disks = [sda_dev, sdb_dev]
2822   elif template_name == constants.DT_DRBD8:
2823     if len(secondary_nodes) != 1:
2824       raise errors.ProgrammerError("Wrong template configuration")
2825     remote_node = secondary_nodes[0]
2826     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2827                                        ".sdb_data", ".sdb_meta"])
2828     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2829                                          disk_sz, names[0:2], "sda")
2830     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2831                                          swap_sz, names[2:4], "sdb")
2832     disks = [drbd_sda_dev, drbd_sdb_dev]
2833   elif template_name == constants.DT_FILE:
2834     if len(secondary_nodes) != 0:
2835       raise errors.ProgrammerError("Wrong template configuration")
2836
2837     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2838                                 iv_name="sda", logical_id=(file_driver,
2839                                 "%s/sda" % file_storage_dir))
2840     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2841                                 iv_name="sdb", logical_id=(file_driver,
2842                                 "%s/sdb" % file_storage_dir))
2843     disks = [file_sda_dev, file_sdb_dev]
2844   else:
2845     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2846   return disks
2847
2848
2849 def _GetInstanceInfoText(instance):
2850   """Compute that text that should be added to the disk's metadata.
2851
2852   """
2853   return "originstname+%s" % instance.name
2854
2855
2856 def _CreateDisks(cfg, instance):
2857   """Create all disks for an instance.
2858
2859   This abstracts away some work from AddInstance.
2860
2861   Args:
2862     instance: the instance object
2863
2864   Returns:
2865     True or False showing the success of the creation process
2866
2867   """
2868   info = _GetInstanceInfoText(instance)
2869
2870   if instance.disk_template == constants.DT_FILE:
2871     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2872     result = rpc.call_file_storage_dir_create(instance.primary_node,
2873                                               file_storage_dir)
2874
2875     if not result:
2876       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2877       return False
2878
2879     if not result[0]:
2880       logger.Error("failed to create directory '%s'" % file_storage_dir)
2881       return False
2882
2883   for device in instance.disks:
2884     logger.Info("creating volume %s for instance %s" %
2885                 (device.iv_name, instance.name))
2886     #HARDCODE
2887     for secondary_node in instance.secondary_nodes:
2888       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2889                                         device, False, info):
2890         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2891                      (device.iv_name, device, secondary_node))
2892         return False
2893     #HARDCODE
2894     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2895                                     instance, device, info):
2896       logger.Error("failed to create volume %s on primary!" %
2897                    device.iv_name)
2898       return False
2899
2900   return True
2901
2902
2903 def _RemoveDisks(instance, cfg):
2904   """Remove all disks for an instance.
2905
2906   This abstracts away some work from `AddInstance()` and
2907   `RemoveInstance()`. Note that in case some of the devices couldn't
2908   be removed, the removal will continue with the other ones (compare
2909   with `_CreateDisks()`).
2910
2911   Args:
2912     instance: the instance object
2913
2914   Returns:
2915     True or False showing the success of the removal proces
2916
2917   """
2918   logger.Info("removing block devices for instance %s" % instance.name)
2919
2920   result = True
2921   for device in instance.disks:
2922     for node, disk in device.ComputeNodeTree(instance.primary_node):
2923       cfg.SetDiskID(disk, node)
2924       if not rpc.call_blockdev_remove(node, disk):
2925         logger.Error("could not remove block device %s on node %s,"
2926                      " continuing anyway" %
2927                      (device.iv_name, node))
2928         result = False
2929
2930   if instance.disk_template == constants.DT_FILE:
2931     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2932     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2933                                             file_storage_dir):
2934       logger.Error("could not remove directory '%s'" % file_storage_dir)
2935       result = False
2936
2937   return result
2938
2939
2940 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2941   """Compute disk size requirements in the volume group
2942
2943   This is currently hard-coded for the two-drive layout.
2944
2945   """
2946   # Required free disk space as a function of disk and swap space
2947   req_size_dict = {
2948     constants.DT_DISKLESS: None,
2949     constants.DT_PLAIN: disk_size + swap_size,
2950     # 256 MB are added for drbd metadata, 128MB for each drbd device
2951     constants.DT_DRBD8: disk_size + swap_size + 256,
2952     constants.DT_FILE: None,
2953   }
2954
2955   if disk_template not in req_size_dict:
2956     raise errors.ProgrammerError("Disk template '%s' size requirement"
2957                                  " is unknown" %  disk_template)
2958
2959   return req_size_dict[disk_template]
2960
2961
2962 class LUCreateInstance(LogicalUnit):
2963   """Create an instance.
2964
2965   """
2966   HPATH = "instance-add"
2967   HTYPE = constants.HTYPE_INSTANCE
2968   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2969               "disk_template", "swap_size", "mode", "start", "vcpus",
2970               "wait_for_sync", "ip_check", "mac"]
2971
2972   def _RunAllocator(self):
2973     """Run the allocator based on input opcode.
2974
2975     """
2976     disks = [{"size": self.op.disk_size, "mode": "w"},
2977              {"size": self.op.swap_size, "mode": "w"}]
2978     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2979              "bridge": self.op.bridge}]
2980     ial = IAllocator(self.cfg, self.sstore,
2981                      mode=constants.IALLOCATOR_MODE_ALLOC,
2982                      name=self.op.instance_name,
2983                      disk_template=self.op.disk_template,
2984                      tags=[],
2985                      os=self.op.os_type,
2986                      vcpus=self.op.vcpus,
2987                      mem_size=self.op.mem_size,
2988                      disks=disks,
2989                      nics=nics,
2990                      )
2991
2992     ial.Run(self.op.iallocator)
2993
2994     if not ial.success:
2995       raise errors.OpPrereqError("Can't compute nodes using"
2996                                  " iallocator '%s': %s" % (self.op.iallocator,
2997                                                            ial.info))
2998     if len(ial.nodes) != ial.required_nodes:
2999       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3000                                  " of nodes (%s), required %s" %
3001                                  (len(ial.nodes), ial.required_nodes))
3002     self.op.pnode = ial.nodes[0]
3003     logger.ToStdout("Selected nodes for the instance: %s" %
3004                     (", ".join(ial.nodes),))
3005     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3006                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3007     if ial.required_nodes == 2:
3008       self.op.snode = ial.nodes[1]
3009
3010   def BuildHooksEnv(self):
3011     """Build hooks env.
3012
3013     This runs on master, primary and secondary nodes of the instance.
3014
3015     """
3016     env = {
3017       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3018       "INSTANCE_DISK_SIZE": self.op.disk_size,
3019       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3020       "INSTANCE_ADD_MODE": self.op.mode,
3021       }
3022     if self.op.mode == constants.INSTANCE_IMPORT:
3023       env["INSTANCE_SRC_NODE"] = self.op.src_node
3024       env["INSTANCE_SRC_PATH"] = self.op.src_path
3025       env["INSTANCE_SRC_IMAGE"] = self.src_image
3026
3027     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3028       primary_node=self.op.pnode,
3029       secondary_nodes=self.secondaries,
3030       status=self.instance_status,
3031       os_type=self.op.os_type,
3032       memory=self.op.mem_size,
3033       vcpus=self.op.vcpus,
3034       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3035     ))
3036
3037     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3038           self.secondaries)
3039     return env, nl, nl
3040
3041
3042   def CheckPrereq(self):
3043     """Check prerequisites.
3044
3045     """
3046     # set optional parameters to none if they don't exist
3047     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3048                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3049                  "vnc_bind_address"]:
3050       if not hasattr(self.op, attr):
3051         setattr(self.op, attr, None)
3052
3053     if self.op.mode not in (constants.INSTANCE_CREATE,
3054                             constants.INSTANCE_IMPORT):
3055       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3056                                  self.op.mode)
3057
3058     if (not self.cfg.GetVGName() and
3059         self.op.disk_template not in constants.DTS_NOT_LVM):
3060       raise errors.OpPrereqError("Cluster does not support lvm-based"
3061                                  " instances")
3062
3063     if self.op.mode == constants.INSTANCE_IMPORT:
3064       src_node = getattr(self.op, "src_node", None)
3065       src_path = getattr(self.op, "src_path", None)
3066       if src_node is None or src_path is None:
3067         raise errors.OpPrereqError("Importing an instance requires source"
3068                                    " node and path options")
3069       src_node_full = self.cfg.ExpandNodeName(src_node)
3070       if src_node_full is None:
3071         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3072       self.op.src_node = src_node = src_node_full
3073
3074       if not os.path.isabs(src_path):
3075         raise errors.OpPrereqError("The source path must be absolute")
3076
3077       export_info = rpc.call_export_info(src_node, src_path)
3078
3079       if not export_info:
3080         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3081
3082       if not export_info.has_section(constants.INISECT_EXP):
3083         raise errors.ProgrammerError("Corrupted export config")
3084
3085       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3086       if (int(ei_version) != constants.EXPORT_VERSION):
3087         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3088                                    (ei_version, constants.EXPORT_VERSION))
3089
3090       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3091         raise errors.OpPrereqError("Can't import instance with more than"
3092                                    " one data disk")
3093
3094       # FIXME: are the old os-es, disk sizes, etc. useful?
3095       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3096       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3097                                                          'disk0_dump'))
3098       self.src_image = diskimage
3099     else: # INSTANCE_CREATE
3100       if getattr(self.op, "os_type", None) is None:
3101         raise errors.OpPrereqError("No guest OS specified")
3102
3103     #### instance parameters check
3104
3105     # disk template and mirror node verification
3106     if self.op.disk_template not in constants.DISK_TEMPLATES:
3107       raise errors.OpPrereqError("Invalid disk template name")
3108
3109     # instance name verification
3110     hostname1 = utils.HostInfo(self.op.instance_name)
3111
3112     self.op.instance_name = instance_name = hostname1.name
3113     instance_list = self.cfg.GetInstanceList()
3114     if instance_name in instance_list:
3115       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3116                                  instance_name)
3117
3118     # ip validity checks
3119     ip = getattr(self.op, "ip", None)
3120     if ip is None or ip.lower() == "none":
3121       inst_ip = None
3122     elif ip.lower() == "auto":
3123       inst_ip = hostname1.ip
3124     else:
3125       if not utils.IsValidIP(ip):
3126         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3127                                    " like a valid IP" % ip)
3128       inst_ip = ip
3129     self.inst_ip = self.op.ip = inst_ip
3130
3131     if self.op.start and not self.op.ip_check:
3132       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3133                                  " adding an instance in start mode")
3134
3135     if self.op.ip_check:
3136       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3137         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3138                                    (hostname1.ip, instance_name))
3139
3140     # MAC address verification
3141     if self.op.mac != "auto":
3142       if not utils.IsValidMac(self.op.mac.lower()):
3143         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3144                                    self.op.mac)
3145
3146     # bridge verification
3147     bridge = getattr(self.op, "bridge", None)
3148     if bridge is None:
3149       self.op.bridge = self.cfg.GetDefBridge()
3150     else:
3151       self.op.bridge = bridge
3152
3153     # boot order verification
3154     if self.op.hvm_boot_order is not None:
3155       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3156         raise errors.OpPrereqError("invalid boot order specified,"
3157                                    " must be one or more of [acdn]")
3158     # file storage checks
3159     if (self.op.file_driver and
3160         not self.op.file_driver in constants.FILE_DRIVER):
3161       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3162                                  self.op.file_driver)
3163
3164     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3165       raise errors.OpPrereqError("File storage directory not a relative"
3166                                  " path")
3167     #### allocator run
3168
3169     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3170       raise errors.OpPrereqError("One and only one of iallocator and primary"
3171                                  " node must be given")
3172
3173     if self.op.iallocator is not None:
3174       self._RunAllocator()
3175
3176     #### node related checks
3177
3178     # check primary node
3179     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3180     if pnode is None:
3181       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3182                                  self.op.pnode)
3183     self.op.pnode = pnode.name
3184     self.pnode = pnode
3185     self.secondaries = []
3186
3187     # mirror node verification
3188     if self.op.disk_template in constants.DTS_NET_MIRROR:
3189       if getattr(self.op, "snode", None) is None:
3190         raise errors.OpPrereqError("The networked disk templates need"
3191                                    " a mirror node")
3192
3193       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3194       if snode_name is None:
3195         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3196                                    self.op.snode)
3197       elif snode_name == pnode.name:
3198         raise errors.OpPrereqError("The secondary node cannot be"
3199                                    " the primary node.")
3200       self.secondaries.append(snode_name)
3201
3202     req_size = _ComputeDiskSize(self.op.disk_template,
3203                                 self.op.disk_size, self.op.swap_size)
3204
3205     # Check lv size requirements
3206     if req_size is not None:
3207       nodenames = [pnode.name] + self.secondaries
3208       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3209       for node in nodenames:
3210         info = nodeinfo.get(node, None)
3211         if not info:
3212           raise errors.OpPrereqError("Cannot get current information"
3213                                      " from node '%s'" % nodeinfo)
3214         vg_free = info.get('vg_free', None)
3215         if not isinstance(vg_free, int):
3216           raise errors.OpPrereqError("Can't compute free disk space on"
3217                                      " node %s" % node)
3218         if req_size > info['vg_free']:
3219           raise errors.OpPrereqError("Not enough disk space on target node %s."
3220                                      " %d MB available, %d MB required" %
3221                                      (node, info['vg_free'], req_size))
3222
3223     # os verification
3224     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3225     if not os_obj:
3226       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3227                                  " primary node"  % self.op.os_type)
3228
3229     if self.op.kernel_path == constants.VALUE_NONE:
3230       raise errors.OpPrereqError("Can't set instance kernel to none")
3231
3232
3233     # bridge check on primary node
3234     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3235       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3236                                  " destination node '%s'" %
3237                                  (self.op.bridge, pnode.name))
3238
3239     # memory check on primary node
3240     if self.op.start:
3241       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3242                            "creating instance %s" % self.op.instance_name,
3243                            self.op.mem_size)
3244
3245     # hvm_cdrom_image_path verification
3246     if self.op.hvm_cdrom_image_path is not None:
3247       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3248         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3249                                    " be an absolute path or None, not %s" %
3250                                    self.op.hvm_cdrom_image_path)
3251       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3252         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3253                                    " regular file or a symlink pointing to"
3254                                    " an existing regular file, not %s" %
3255                                    self.op.hvm_cdrom_image_path)
3256
3257     # vnc_bind_address verification
3258     if self.op.vnc_bind_address is not None:
3259       if not utils.IsValidIP(self.op.vnc_bind_address):
3260         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3261                                    " like a valid IP address" %
3262                                    self.op.vnc_bind_address)
3263
3264     if self.op.start:
3265       self.instance_status = 'up'
3266     else:
3267       self.instance_status = 'down'
3268
3269   def Exec(self, feedback_fn):
3270     """Create and add the instance to the cluster.
3271
3272     """
3273     instance = self.op.instance_name
3274     pnode_name = self.pnode.name
3275
3276     if self.op.mac == "auto":
3277       mac_address = self.cfg.GenerateMAC()
3278     else:
3279       mac_address = self.op.mac
3280
3281     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3282     if self.inst_ip is not None:
3283       nic.ip = self.inst_ip
3284
3285     ht_kind = self.sstore.GetHypervisorType()
3286     if ht_kind in constants.HTS_REQ_PORT:
3287       network_port = self.cfg.AllocatePort()
3288     else:
3289       network_port = None
3290
3291     if self.op.vnc_bind_address is None:
3292       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3293
3294     # this is needed because os.path.join does not accept None arguments
3295     if self.op.file_storage_dir is None:
3296       string_file_storage_dir = ""
3297     else:
3298       string_file_storage_dir = self.op.file_storage_dir
3299
3300     # build the full file storage dir path
3301     file_storage_dir = os.path.normpath(os.path.join(
3302                                         self.sstore.GetFileStorageDir(),
3303                                         string_file_storage_dir, instance))
3304
3305
3306     disks = _GenerateDiskTemplate(self.cfg,
3307                                   self.op.disk_template,
3308                                   instance, pnode_name,
3309                                   self.secondaries, self.op.disk_size,
3310                                   self.op.swap_size,
3311                                   file_storage_dir,
3312                                   self.op.file_driver)
3313
3314     iobj = objects.Instance(name=instance, os=self.op.os_type,
3315                             primary_node=pnode_name,
3316                             memory=self.op.mem_size,
3317                             vcpus=self.op.vcpus,
3318                             nics=[nic], disks=disks,
3319                             disk_template=self.op.disk_template,
3320                             status=self.instance_status,
3321                             network_port=network_port,
3322                             kernel_path=self.op.kernel_path,
3323                             initrd_path=self.op.initrd_path,
3324                             hvm_boot_order=self.op.hvm_boot_order,
3325                             hvm_acpi=self.op.hvm_acpi,
3326                             hvm_pae=self.op.hvm_pae,
3327                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3328                             vnc_bind_address=self.op.vnc_bind_address,
3329                             )
3330
3331     feedback_fn("* creating instance disks...")
3332     if not _CreateDisks(self.cfg, iobj):
3333       _RemoveDisks(iobj, self.cfg)
3334       raise errors.OpExecError("Device creation failed, reverting...")
3335
3336     feedback_fn("adding instance %s to cluster config" % instance)
3337
3338     self.cfg.AddInstance(iobj)
3339
3340     if self.op.wait_for_sync:
3341       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3342     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3343       # make sure the disks are not degraded (still sync-ing is ok)
3344       time.sleep(15)
3345       feedback_fn("* checking mirrors status")
3346       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3347     else:
3348       disk_abort = False
3349
3350     if disk_abort:
3351       _RemoveDisks(iobj, self.cfg)
3352       self.cfg.RemoveInstance(iobj.name)
3353       raise errors.OpExecError("There are some degraded disks for"
3354                                " this instance")
3355
3356     feedback_fn("creating os for instance %s on node %s" %
3357                 (instance, pnode_name))
3358
3359     if iobj.disk_template != constants.DT_DISKLESS:
3360       if self.op.mode == constants.INSTANCE_CREATE:
3361         feedback_fn("* running the instance OS create scripts...")
3362         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3363           raise errors.OpExecError("could not add os for instance %s"
3364                                    " on node %s" %
3365                                    (instance, pnode_name))
3366
3367       elif self.op.mode == constants.INSTANCE_IMPORT:
3368         feedback_fn("* running the instance OS import scripts...")
3369         src_node = self.op.src_node
3370         src_image = self.src_image
3371         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3372                                                 src_node, src_image):
3373           raise errors.OpExecError("Could not import os for instance"
3374                                    " %s on node %s" %
3375                                    (instance, pnode_name))
3376       else:
3377         # also checked in the prereq part
3378         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3379                                      % self.op.mode)
3380
3381     if self.op.start:
3382       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3383       feedback_fn("* starting instance...")
3384       if not rpc.call_instance_start(pnode_name, iobj, None):
3385         raise errors.OpExecError("Could not start instance")
3386
3387
3388 class LUConnectConsole(NoHooksLU):
3389   """Connect to an instance's console.
3390
3391   This is somewhat special in that it returns the command line that
3392   you need to run on the master node in order to connect to the
3393   console.
3394
3395   """
3396   _OP_REQP = ["instance_name"]
3397
3398   def CheckPrereq(self):
3399     """Check prerequisites.
3400
3401     This checks that the instance is in the cluster.
3402
3403     """
3404     instance = self.cfg.GetInstanceInfo(
3405       self.cfg.ExpandInstanceName(self.op.instance_name))
3406     if instance is None:
3407       raise errors.OpPrereqError("Instance '%s' not known" %
3408                                  self.op.instance_name)
3409     self.instance = instance
3410
3411   def Exec(self, feedback_fn):
3412     """Connect to the console of an instance
3413
3414     """
3415     instance = self.instance
3416     node = instance.primary_node
3417
3418     node_insts = rpc.call_instance_list([node])[node]
3419     if node_insts is False:
3420       raise errors.OpExecError("Can't connect to node %s." % node)
3421
3422     if instance.name not in node_insts:
3423       raise errors.OpExecError("Instance %s is not running." % instance.name)
3424
3425     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3426
3427     hyper = hypervisor.GetHypervisor()
3428     console_cmd = hyper.GetShellCommandForConsole(instance)
3429
3430     # build ssh cmdline
3431     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3432
3433
3434 class LUReplaceDisks(LogicalUnit):
3435   """Replace the disks of an instance.
3436
3437   """
3438   HPATH = "mirrors-replace"
3439   HTYPE = constants.HTYPE_INSTANCE
3440   _OP_REQP = ["instance_name", "mode", "disks"]
3441
3442   def _RunAllocator(self):
3443     """Compute a new secondary node using an IAllocator.
3444
3445     """
3446     ial = IAllocator(self.cfg, self.sstore,
3447                      mode=constants.IALLOCATOR_MODE_RELOC,
3448                      name=self.op.instance_name,
3449                      relocate_from=[self.sec_node])
3450
3451     ial.Run(self.op.iallocator)
3452
3453     if not ial.success:
3454       raise errors.OpPrereqError("Can't compute nodes using"
3455                                  " iallocator '%s': %s" % (self.op.iallocator,
3456                                                            ial.info))
3457     if len(ial.nodes) != ial.required_nodes:
3458       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3459                                  " of nodes (%s), required %s" %
3460                                  (len(ial.nodes), ial.required_nodes))
3461     self.op.remote_node = ial.nodes[0]
3462     logger.ToStdout("Selected new secondary for the instance: %s" %
3463                     self.op.remote_node)
3464
3465   def BuildHooksEnv(self):
3466     """Build hooks env.
3467
3468     This runs on the master, the primary and all the secondaries.
3469
3470     """
3471     env = {
3472       "MODE": self.op.mode,
3473       "NEW_SECONDARY": self.op.remote_node,
3474       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3475       }
3476     env.update(_BuildInstanceHookEnvByObject(self.instance))
3477     nl = [
3478       self.sstore.GetMasterNode(),
3479       self.instance.primary_node,
3480       ]
3481     if self.op.remote_node is not None:
3482       nl.append(self.op.remote_node)
3483     return env, nl, nl
3484
3485   def CheckPrereq(self):
3486     """Check prerequisites.
3487
3488     This checks that the instance is in the cluster.
3489
3490     """
3491     if not hasattr(self.op, "remote_node"):
3492       self.op.remote_node = None
3493
3494     instance = self.cfg.GetInstanceInfo(
3495       self.cfg.ExpandInstanceName(self.op.instance_name))
3496     if instance is None:
3497       raise errors.OpPrereqError("Instance '%s' not known" %
3498                                  self.op.instance_name)
3499     self.instance = instance
3500     self.op.instance_name = instance.name
3501
3502     if instance.disk_template not in constants.DTS_NET_MIRROR:
3503       raise errors.OpPrereqError("Instance's disk layout is not"
3504                                  " network mirrored.")
3505
3506     if len(instance.secondary_nodes) != 1:
3507       raise errors.OpPrereqError("The instance has a strange layout,"
3508                                  " expected one secondary but found %d" %
3509                                  len(instance.secondary_nodes))
3510
3511     self.sec_node = instance.secondary_nodes[0]
3512
3513     ia_name = getattr(self.op, "iallocator", None)
3514     if ia_name is not None:
3515       if self.op.remote_node is not None:
3516         raise errors.OpPrereqError("Give either the iallocator or the new"
3517                                    " secondary, not both")
3518       self.op.remote_node = self._RunAllocator()
3519
3520     remote_node = self.op.remote_node
3521     if remote_node is not None:
3522       remote_node = self.cfg.ExpandNodeName(remote_node)
3523       if remote_node is None:
3524         raise errors.OpPrereqError("Node '%s' not known" %
3525                                    self.op.remote_node)
3526       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3527     else:
3528       self.remote_node_info = None
3529     if remote_node == instance.primary_node:
3530       raise errors.OpPrereqError("The specified node is the primary node of"
3531                                  " the instance.")
3532     elif remote_node == self.sec_node:
3533       if self.op.mode == constants.REPLACE_DISK_SEC:
3534         # this is for DRBD8, where we can't execute the same mode of
3535         # replacement as for drbd7 (no different port allocated)
3536         raise errors.OpPrereqError("Same secondary given, cannot execute"
3537                                    " replacement")
3538     if instance.disk_template == constants.DT_DRBD8:
3539       if (self.op.mode == constants.REPLACE_DISK_ALL and
3540           remote_node is not None):
3541         # switch to replace secondary mode
3542         self.op.mode = constants.REPLACE_DISK_SEC
3543
3544       if self.op.mode == constants.REPLACE_DISK_ALL:
3545         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3546                                    " secondary disk replacement, not"
3547                                    " both at once")
3548       elif self.op.mode == constants.REPLACE_DISK_PRI:
3549         if remote_node is not None:
3550           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3551                                      " the secondary while doing a primary"
3552                                      " node disk replacement")
3553         self.tgt_node = instance.primary_node
3554         self.oth_node = instance.secondary_nodes[0]
3555       elif self.op.mode == constants.REPLACE_DISK_SEC:
3556         self.new_node = remote_node # this can be None, in which case
3557                                     # we don't change the secondary
3558         self.tgt_node = instance.secondary_nodes[0]
3559         self.oth_node = instance.primary_node
3560       else:
3561         raise errors.ProgrammerError("Unhandled disk replace mode")
3562
3563     for name in self.op.disks:
3564       if instance.FindDisk(name) is None:
3565         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3566                                    (name, instance.name))
3567     self.op.remote_node = remote_node
3568
3569   def _ExecD8DiskOnly(self, feedback_fn):
3570     """Replace a disk on the primary or secondary for dbrd8.
3571
3572     The algorithm for replace is quite complicated:
3573       - for each disk to be replaced:
3574         - create new LVs on the target node with unique names
3575         - detach old LVs from the drbd device
3576         - rename old LVs to name_replaced.<time_t>
3577         - rename new LVs to old LVs
3578         - attach the new LVs (with the old names now) to the drbd device
3579       - wait for sync across all devices
3580       - for each modified disk:
3581         - remove old LVs (which have the name name_replaces.<time_t>)
3582
3583     Failures are not very well handled.
3584
3585     """
3586     steps_total = 6
3587     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3588     instance = self.instance
3589     iv_names = {}
3590     vgname = self.cfg.GetVGName()
3591     # start of work
3592     cfg = self.cfg
3593     tgt_node = self.tgt_node
3594     oth_node = self.oth_node
3595
3596     # Step: check device activation
3597     self.proc.LogStep(1, steps_total, "check device existence")
3598     info("checking volume groups")
3599     my_vg = cfg.GetVGName()
3600     results = rpc.call_vg_list([oth_node, tgt_node])
3601     if not results:
3602       raise errors.OpExecError("Can't list volume groups on the nodes")
3603     for node in oth_node, tgt_node:
3604       res = results.get(node, False)
3605       if not res or my_vg not in res:
3606         raise errors.OpExecError("Volume group '%s' not found on %s" %
3607                                  (my_vg, node))
3608     for dev in instance.disks:
3609       if not dev.iv_name in self.op.disks:
3610         continue
3611       for node in tgt_node, oth_node:
3612         info("checking %s on %s" % (dev.iv_name, node))
3613         cfg.SetDiskID(dev, node)
3614         if not rpc.call_blockdev_find(node, dev):
3615           raise errors.OpExecError("Can't find device %s on node %s" %
3616                                    (dev.iv_name, node))
3617
3618     # Step: check other node consistency
3619     self.proc.LogStep(2, steps_total, "check peer consistency")
3620     for dev in instance.disks:
3621       if not dev.iv_name in self.op.disks:
3622         continue
3623       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3624       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3625                                    oth_node==instance.primary_node):
3626         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3627                                  " to replace disks on this node (%s)" %
3628                                  (oth_node, tgt_node))
3629
3630     # Step: create new storage
3631     self.proc.LogStep(3, steps_total, "allocate new storage")
3632     for dev in instance.disks:
3633       if not dev.iv_name in self.op.disks:
3634         continue
3635       size = dev.size
3636       cfg.SetDiskID(dev, tgt_node)
3637       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3638       names = _GenerateUniqueNames(cfg, lv_names)
3639       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3640                              logical_id=(vgname, names[0]))
3641       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3642                              logical_id=(vgname, names[1]))
3643       new_lvs = [lv_data, lv_meta]
3644       old_lvs = dev.children
3645       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3646       info("creating new local storage on %s for %s" %
3647            (tgt_node, dev.iv_name))
3648       # since we *always* want to create this LV, we use the
3649       # _Create...OnPrimary (which forces the creation), even if we
3650       # are talking about the secondary node
3651       for new_lv in new_lvs:
3652         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3653                                         _GetInstanceInfoText(instance)):
3654           raise errors.OpExecError("Failed to create new LV named '%s' on"
3655                                    " node '%s'" %
3656                                    (new_lv.logical_id[1], tgt_node))
3657
3658     # Step: for each lv, detach+rename*2+attach
3659     self.proc.LogStep(4, steps_total, "change drbd configuration")
3660     for dev, old_lvs, new_lvs in iv_names.itervalues():
3661       info("detaching %s drbd from local storage" % dev.iv_name)
3662       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3663         raise errors.OpExecError("Can't detach drbd from local storage on node"
3664                                  " %s for device %s" % (tgt_node, dev.iv_name))
3665       #dev.children = []
3666       #cfg.Update(instance)
3667
3668       # ok, we created the new LVs, so now we know we have the needed
3669       # storage; as such, we proceed on the target node to rename
3670       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3671       # using the assumption that logical_id == physical_id (which in
3672       # turn is the unique_id on that node)
3673
3674       # FIXME(iustin): use a better name for the replaced LVs
3675       temp_suffix = int(time.time())
3676       ren_fn = lambda d, suff: (d.physical_id[0],
3677                                 d.physical_id[1] + "_replaced-%s" % suff)
3678       # build the rename list based on what LVs exist on the node
3679       rlist = []
3680       for to_ren in old_lvs:
3681         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3682         if find_res is not None: # device exists
3683           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3684
3685       info("renaming the old LVs on the target node")
3686       if not rpc.call_blockdev_rename(tgt_node, rlist):
3687         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3688       # now we rename the new LVs to the old LVs
3689       info("renaming the new LVs on the target node")
3690       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3691       if not rpc.call_blockdev_rename(tgt_node, rlist):
3692         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3693
3694       for old, new in zip(old_lvs, new_lvs):
3695         new.logical_id = old.logical_id
3696         cfg.SetDiskID(new, tgt_node)
3697
3698       for disk in old_lvs:
3699         disk.logical_id = ren_fn(disk, temp_suffix)
3700         cfg.SetDiskID(disk, tgt_node)
3701
3702       # now that the new lvs have the old name, we can add them to the device
3703       info("adding new mirror component on %s" % tgt_node)
3704       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3705         for new_lv in new_lvs:
3706           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3707             warning("Can't rollback device %s", hint="manually cleanup unused"
3708                     " logical volumes")
3709         raise errors.OpExecError("Can't add local storage to drbd")
3710
3711       dev.children = new_lvs
3712       cfg.Update(instance)
3713
3714     # Step: wait for sync
3715
3716     # this can fail as the old devices are degraded and _WaitForSync
3717     # does a combined result over all disks, so we don't check its
3718     # return value
3719     self.proc.LogStep(5, steps_total, "sync devices")
3720     _WaitForSync(cfg, instance, self.proc, unlock=True)
3721
3722     # so check manually all the devices
3723     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3724       cfg.SetDiskID(dev, instance.primary_node)
3725       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3726       if is_degr:
3727         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3728
3729     # Step: remove old storage
3730     self.proc.LogStep(6, steps_total, "removing old storage")
3731     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3732       info("remove logical volumes for %s" % name)
3733       for lv in old_lvs:
3734         cfg.SetDiskID(lv, tgt_node)
3735         if not rpc.call_blockdev_remove(tgt_node, lv):
3736           warning("Can't remove old LV", hint="manually remove unused LVs")
3737           continue
3738
3739   def _ExecD8Secondary(self, feedback_fn):
3740     """Replace the secondary node for drbd8.
3741
3742     The algorithm for replace is quite complicated:
3743       - for all disks of the instance:
3744         - create new LVs on the new node with same names
3745         - shutdown the drbd device on the old secondary
3746         - disconnect the drbd network on the primary
3747         - create the drbd device on the new secondary
3748         - network attach the drbd on the primary, using an artifice:
3749           the drbd code for Attach() will connect to the network if it
3750           finds a device which is connected to the good local disks but
3751           not network enabled
3752       - wait for sync across all devices
3753       - remove all disks from the old secondary
3754
3755     Failures are not very well handled.
3756
3757     """
3758     steps_total = 6
3759     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3760     instance = self.instance
3761     iv_names = {}
3762     vgname = self.cfg.GetVGName()
3763     # start of work
3764     cfg = self.cfg
3765     old_node = self.tgt_node
3766     new_node = self.new_node
3767     pri_node = instance.primary_node
3768
3769     # Step: check device activation
3770     self.proc.LogStep(1, steps_total, "check device existence")
3771     info("checking volume groups")
3772     my_vg = cfg.GetVGName()
3773     results = rpc.call_vg_list([pri_node, new_node])
3774     if not results:
3775       raise errors.OpExecError("Can't list volume groups on the nodes")
3776     for node in pri_node, new_node:
3777       res = results.get(node, False)
3778       if not res or my_vg not in res:
3779         raise errors.OpExecError("Volume group '%s' not found on %s" %
3780                                  (my_vg, node))
3781     for dev in instance.disks:
3782       if not dev.iv_name in self.op.disks:
3783         continue
3784       info("checking %s on %s" % (dev.iv_name, pri_node))
3785       cfg.SetDiskID(dev, pri_node)
3786       if not rpc.call_blockdev_find(pri_node, dev):
3787         raise errors.OpExecError("Can't find device %s on node %s" %
3788                                  (dev.iv_name, pri_node))
3789
3790     # Step: check other node consistency
3791     self.proc.LogStep(2, steps_total, "check peer consistency")
3792     for dev in instance.disks:
3793       if not dev.iv_name in self.op.disks:
3794         continue
3795       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3796       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3797         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3798                                  " unsafe to replace the secondary" %
3799                                  pri_node)
3800
3801     # Step: create new storage
3802     self.proc.LogStep(3, steps_total, "allocate new storage")
3803     for dev in instance.disks:
3804       size = dev.size
3805       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3806       # since we *always* want to create this LV, we use the
3807       # _Create...OnPrimary (which forces the creation), even if we
3808       # are talking about the secondary node
3809       for new_lv in dev.children:
3810         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3811                                         _GetInstanceInfoText(instance)):
3812           raise errors.OpExecError("Failed to create new LV named '%s' on"
3813                                    " node '%s'" %
3814                                    (new_lv.logical_id[1], new_node))
3815
3816       iv_names[dev.iv_name] = (dev, dev.children)
3817
3818     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3819     for dev in instance.disks:
3820       size = dev.size
3821       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3822       # create new devices on new_node
3823       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3824                               logical_id=(pri_node, new_node,
3825                                           dev.logical_id[2]),
3826                               children=dev.children)
3827       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3828                                         new_drbd, False,
3829                                       _GetInstanceInfoText(instance)):
3830         raise errors.OpExecError("Failed to create new DRBD on"
3831                                  " node '%s'" % new_node)
3832
3833     for dev in instance.disks:
3834       # we have new devices, shutdown the drbd on the old secondary
3835       info("shutting down drbd for %s on old node" % dev.iv_name)
3836       cfg.SetDiskID(dev, old_node)
3837       if not rpc.call_blockdev_shutdown(old_node, dev):
3838         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3839                 hint="Please cleanup this device manually as soon as possible")
3840
3841     info("detaching primary drbds from the network (=> standalone)")
3842     done = 0
3843     for dev in instance.disks:
3844       cfg.SetDiskID(dev, pri_node)
3845       # set the physical (unique in bdev terms) id to None, meaning
3846       # detach from network
3847       dev.physical_id = (None,) * len(dev.physical_id)
3848       # and 'find' the device, which will 'fix' it to match the
3849       # standalone state
3850       if rpc.call_blockdev_find(pri_node, dev):
3851         done += 1
3852       else:
3853         warning("Failed to detach drbd %s from network, unusual case" %
3854                 dev.iv_name)
3855
3856     if not done:
3857       # no detaches succeeded (very unlikely)
3858       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3859
3860     # if we managed to detach at least one, we update all the disks of
3861     # the instance to point to the new secondary
3862     info("updating instance configuration")
3863     for dev in instance.disks:
3864       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3865       cfg.SetDiskID(dev, pri_node)
3866     cfg.Update(instance)
3867
3868     # and now perform the drbd attach
3869     info("attaching primary drbds to new secondary (standalone => connected)")
3870     failures = []
3871     for dev in instance.disks:
3872       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3873       # since the attach is smart, it's enough to 'find' the device,
3874       # it will automatically activate the network, if the physical_id
3875       # is correct
3876       cfg.SetDiskID(dev, pri_node)
3877       if not rpc.call_blockdev_find(pri_node, dev):
3878         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3879                 "please do a gnt-instance info to see the status of disks")
3880
3881     # this can fail as the old devices are degraded and _WaitForSync
3882     # does a combined result over all disks, so we don't check its
3883     # return value
3884     self.proc.LogStep(5, steps_total, "sync devices")
3885     _WaitForSync(cfg, instance, self.proc, unlock=True)
3886
3887     # so check manually all the devices
3888     for name, (dev, old_lvs) in iv_names.iteritems():
3889       cfg.SetDiskID(dev, pri_node)
3890       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3891       if is_degr:
3892         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3893
3894     self.proc.LogStep(6, steps_total, "removing old storage")
3895     for name, (dev, old_lvs) in iv_names.iteritems():
3896       info("remove logical volumes for %s" % name)
3897       for lv in old_lvs:
3898         cfg.SetDiskID(lv, old_node)
3899         if not rpc.call_blockdev_remove(old_node, lv):
3900           warning("Can't remove LV on old secondary",
3901                   hint="Cleanup stale volumes by hand")
3902
3903   def Exec(self, feedback_fn):
3904     """Execute disk replacement.
3905
3906     This dispatches the disk replacement to the appropriate handler.
3907
3908     """
3909     instance = self.instance
3910     if instance.disk_template == constants.DT_DRBD8:
3911       if self.op.remote_node is None:
3912         fn = self._ExecD8DiskOnly
3913       else:
3914         fn = self._ExecD8Secondary
3915     else:
3916       raise errors.ProgrammerError("Unhandled disk replacement case")
3917     return fn(feedback_fn)
3918
3919
3920 class LUQueryInstanceData(NoHooksLU):
3921   """Query runtime instance data.
3922
3923   """
3924   _OP_REQP = ["instances"]
3925
3926   def CheckPrereq(self):
3927     """Check prerequisites.
3928
3929     This only checks the optional instance list against the existing names.
3930
3931     """
3932     if not isinstance(self.op.instances, list):
3933       raise errors.OpPrereqError("Invalid argument type 'instances'")
3934     if self.op.instances:
3935       self.wanted_instances = []
3936       names = self.op.instances
3937       for name in names:
3938         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3939         if instance is None:
3940           raise errors.OpPrereqError("No such instance name '%s'" % name)
3941         self.wanted_instances.append(instance)
3942     else:
3943       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3944                                in self.cfg.GetInstanceList()]
3945     return
3946
3947
3948   def _ComputeDiskStatus(self, instance, snode, dev):
3949     """Compute block device status.
3950
3951     """
3952     self.cfg.SetDiskID(dev, instance.primary_node)
3953     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3954     if dev.dev_type in constants.LDS_DRBD:
3955       # we change the snode then (otherwise we use the one passed in)
3956       if dev.logical_id[0] == instance.primary_node:
3957         snode = dev.logical_id[1]
3958       else:
3959         snode = dev.logical_id[0]
3960
3961     if snode:
3962       self.cfg.SetDiskID(dev, snode)
3963       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3964     else:
3965       dev_sstatus = None
3966
3967     if dev.children:
3968       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3969                       for child in dev.children]
3970     else:
3971       dev_children = []
3972
3973     data = {
3974       "iv_name": dev.iv_name,
3975       "dev_type": dev.dev_type,
3976       "logical_id": dev.logical_id,
3977       "physical_id": dev.physical_id,
3978       "pstatus": dev_pstatus,
3979       "sstatus": dev_sstatus,
3980       "children": dev_children,
3981       }
3982
3983     return data
3984
3985   def Exec(self, feedback_fn):
3986     """Gather and return data"""
3987     result = {}
3988     for instance in self.wanted_instances:
3989       remote_info = rpc.call_instance_info(instance.primary_node,
3990                                                 instance.name)
3991       if remote_info and "state" in remote_info:
3992         remote_state = "up"
3993       else:
3994         remote_state = "down"
3995       if instance.status == "down":
3996         config_state = "down"
3997       else:
3998         config_state = "up"
3999
4000       disks = [self._ComputeDiskStatus(instance, None, device)
4001                for device in instance.disks]
4002
4003       idict = {
4004         "name": instance.name,
4005         "config_state": config_state,
4006         "run_state": remote_state,
4007         "pnode": instance.primary_node,
4008         "snodes": instance.secondary_nodes,
4009         "os": instance.os,
4010         "memory": instance.memory,
4011         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4012         "disks": disks,
4013         "vcpus": instance.vcpus,
4014         }
4015
4016       htkind = self.sstore.GetHypervisorType()
4017       if htkind == constants.HT_XEN_PVM30:
4018         idict["kernel_path"] = instance.kernel_path
4019         idict["initrd_path"] = instance.initrd_path
4020
4021       if htkind == constants.HT_XEN_HVM31:
4022         idict["hvm_boot_order"] = instance.hvm_boot_order
4023         idict["hvm_acpi"] = instance.hvm_acpi
4024         idict["hvm_pae"] = instance.hvm_pae
4025         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4026
4027       if htkind in constants.HTS_REQ_PORT:
4028         idict["vnc_bind_address"] = instance.vnc_bind_address
4029         idict["network_port"] = instance.network_port
4030
4031       result[instance.name] = idict
4032
4033     return result
4034
4035
4036 class LUSetInstanceParams(LogicalUnit):
4037   """Modifies an instances's parameters.
4038
4039   """
4040   HPATH = "instance-modify"
4041   HTYPE = constants.HTYPE_INSTANCE
4042   _OP_REQP = ["instance_name"]
4043
4044   def BuildHooksEnv(self):
4045     """Build hooks env.
4046
4047     This runs on the master, primary and secondaries.
4048
4049     """
4050     args = dict()
4051     if self.mem:
4052       args['memory'] = self.mem
4053     if self.vcpus:
4054       args['vcpus'] = self.vcpus
4055     if self.do_ip or self.do_bridge or self.mac:
4056       if self.do_ip:
4057         ip = self.ip
4058       else:
4059         ip = self.instance.nics[0].ip
4060       if self.bridge:
4061         bridge = self.bridge
4062       else:
4063         bridge = self.instance.nics[0].bridge
4064       if self.mac:
4065         mac = self.mac
4066       else:
4067         mac = self.instance.nics[0].mac
4068       args['nics'] = [(ip, bridge, mac)]
4069     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4070     nl = [self.sstore.GetMasterNode(),
4071           self.instance.primary_node] + list(self.instance.secondary_nodes)
4072     return env, nl, nl
4073
4074   def CheckPrereq(self):
4075     """Check prerequisites.
4076
4077     This only checks the instance list against the existing names.
4078
4079     """
4080     self.mem = getattr(self.op, "mem", None)
4081     self.vcpus = getattr(self.op, "vcpus", None)
4082     self.ip = getattr(self.op, "ip", None)
4083     self.mac = getattr(self.op, "mac", None)
4084     self.bridge = getattr(self.op, "bridge", None)
4085     self.kernel_path = getattr(self.op, "kernel_path", None)
4086     self.initrd_path = getattr(self.op, "initrd_path", None)
4087     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4088     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4089     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4090     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4091     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4092     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4093                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4094                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4095                  self.vnc_bind_address]
4096     if all_parms.count(None) == len(all_parms):
4097       raise errors.OpPrereqError("No changes submitted")
4098     if self.mem is not None:
4099       try:
4100         self.mem = int(self.mem)
4101       except ValueError, err:
4102         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4103     if self.vcpus is not None:
4104       try:
4105         self.vcpus = int(self.vcpus)
4106       except ValueError, err:
4107         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4108     if self.ip is not None:
4109       self.do_ip = True
4110       if self.ip.lower() == "none":
4111         self.ip = None
4112       else:
4113         if not utils.IsValidIP(self.ip):
4114           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4115     else:
4116       self.do_ip = False
4117     self.do_bridge = (self.bridge is not None)
4118     if self.mac is not None:
4119       if self.cfg.IsMacInUse(self.mac):
4120         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4121                                    self.mac)
4122       if not utils.IsValidMac(self.mac):
4123         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4124
4125     if self.kernel_path is not None:
4126       self.do_kernel_path = True
4127       if self.kernel_path == constants.VALUE_NONE:
4128         raise errors.OpPrereqError("Can't set instance to no kernel")
4129
4130       if self.kernel_path != constants.VALUE_DEFAULT:
4131         if not os.path.isabs(self.kernel_path):
4132           raise errors.OpPrereqError("The kernel path must be an absolute"
4133                                     " filename")
4134     else:
4135       self.do_kernel_path = False
4136
4137     if self.initrd_path is not None:
4138       self.do_initrd_path = True
4139       if self.initrd_path not in (constants.VALUE_NONE,
4140                                   constants.VALUE_DEFAULT):
4141         if not os.path.isabs(self.initrd_path):
4142           raise errors.OpPrereqError("The initrd path must be an absolute"
4143                                     " filename")
4144     else:
4145       self.do_initrd_path = False
4146
4147     # boot order verification
4148     if self.hvm_boot_order is not None:
4149       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4150         if len(self.hvm_boot_order.strip("acdn")) != 0:
4151           raise errors.OpPrereqError("invalid boot order specified,"
4152                                      " must be one or more of [acdn]"
4153                                      " or 'default'")
4154
4155     # hvm_cdrom_image_path verification
4156     if self.op.hvm_cdrom_image_path is not None:
4157       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4158         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4159                                    " be an absolute path or None, not %s" %
4160                                    self.op.hvm_cdrom_image_path)
4161       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4162         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4163                                    " regular file or a symlink pointing to"
4164                                    " an existing regular file, not %s" %
4165                                    self.op.hvm_cdrom_image_path)
4166
4167     # vnc_bind_address verification
4168     if self.op.vnc_bind_address is not None:
4169       if not utils.IsValidIP(self.op.vnc_bind_address):
4170         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4171                                    " like a valid IP address" %
4172                                    self.op.vnc_bind_address)
4173
4174     instance = self.cfg.GetInstanceInfo(
4175       self.cfg.ExpandInstanceName(self.op.instance_name))
4176     if instance is None:
4177       raise errors.OpPrereqError("No such instance name '%s'" %
4178                                  self.op.instance_name)
4179     self.op.instance_name = instance.name
4180     self.instance = instance
4181     return
4182
4183   def Exec(self, feedback_fn):
4184     """Modifies an instance.
4185
4186     All parameters take effect only at the next restart of the instance.
4187     """
4188     result = []
4189     instance = self.instance
4190     if self.mem:
4191       instance.memory = self.mem
4192       result.append(("mem", self.mem))
4193     if self.vcpus:
4194       instance.vcpus = self.vcpus
4195       result.append(("vcpus",  self.vcpus))
4196     if self.do_ip:
4197       instance.nics[0].ip = self.ip
4198       result.append(("ip", self.ip))
4199     if self.bridge:
4200       instance.nics[0].bridge = self.bridge
4201       result.append(("bridge", self.bridge))
4202     if self.mac:
4203       instance.nics[0].mac = self.mac
4204       result.append(("mac", self.mac))
4205     if self.do_kernel_path:
4206       instance.kernel_path = self.kernel_path
4207       result.append(("kernel_path", self.kernel_path))
4208     if self.do_initrd_path:
4209       instance.initrd_path = self.initrd_path
4210       result.append(("initrd_path", self.initrd_path))
4211     if self.hvm_boot_order:
4212       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4213         instance.hvm_boot_order = None
4214       else:
4215         instance.hvm_boot_order = self.hvm_boot_order
4216       result.append(("hvm_boot_order", self.hvm_boot_order))
4217     if self.hvm_acpi:
4218       instance.hvm_acpi = self.hvm_acpi
4219       result.append(("hvm_acpi", self.hvm_acpi))
4220     if self.hvm_pae:
4221       instance.hvm_pae = self.hvm_pae
4222       result.append(("hvm_pae", self.hvm_pae))
4223     if self.hvm_cdrom_image_path:
4224       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4225       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4226     if self.vnc_bind_address:
4227       instance.vnc_bind_address = self.vnc_bind_address
4228       result.append(("vnc_bind_address", self.vnc_bind_address))
4229
4230     self.cfg.AddInstance(instance)
4231
4232     return result
4233
4234
4235 class LUQueryExports(NoHooksLU):
4236   """Query the exports list
4237
4238   """
4239   _OP_REQP = []
4240
4241   def CheckPrereq(self):
4242     """Check that the nodelist contains only existing nodes.
4243
4244     """
4245     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4246
4247   def Exec(self, feedback_fn):
4248     """Compute the list of all the exported system images.
4249
4250     Returns:
4251       a dictionary with the structure node->(export-list)
4252       where export-list is a list of the instances exported on
4253       that node.
4254
4255     """
4256     return rpc.call_export_list(self.nodes)
4257
4258
4259 class LUExportInstance(LogicalUnit):
4260   """Export an instance to an image in the cluster.
4261
4262   """
4263   HPATH = "instance-export"
4264   HTYPE = constants.HTYPE_INSTANCE
4265   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4266
4267   def BuildHooksEnv(self):
4268     """Build hooks env.
4269
4270     This will run on the master, primary node and target node.
4271
4272     """
4273     env = {
4274       "EXPORT_NODE": self.op.target_node,
4275       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4276       }
4277     env.update(_BuildInstanceHookEnvByObject(self.instance))
4278     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4279           self.op.target_node]
4280     return env, nl, nl
4281
4282   def CheckPrereq(self):
4283     """Check prerequisites.
4284
4285     This checks that the instance and node names are valid.
4286
4287     """
4288     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4289     self.instance = self.cfg.GetInstanceInfo(instance_name)
4290     if self.instance is None:
4291       raise errors.OpPrereqError("Instance '%s' not found" %
4292                                  self.op.instance_name)
4293
4294     # node verification
4295     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4296     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4297
4298     if self.dst_node is None:
4299       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4300                                  self.op.target_node)
4301     self.op.target_node = self.dst_node.name
4302
4303     # instance disk type verification
4304     for disk in self.instance.disks:
4305       if disk.dev_type == constants.LD_FILE:
4306         raise errors.OpPrereqError("Export not supported for instances with"
4307                                    " file-based disks")
4308
4309   def Exec(self, feedback_fn):
4310     """Export an instance to an image in the cluster.
4311
4312     """
4313     instance = self.instance
4314     dst_node = self.dst_node
4315     src_node = instance.primary_node
4316     if self.op.shutdown:
4317       # shutdown the instance, but not the disks
4318       if not rpc.call_instance_shutdown(src_node, instance):
4319          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4320                                   (instance.name, src_node))
4321
4322     vgname = self.cfg.GetVGName()
4323
4324     snap_disks = []
4325
4326     try:
4327       for disk in instance.disks:
4328         if disk.iv_name == "sda":
4329           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4330           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4331
4332           if not new_dev_name:
4333             logger.Error("could not snapshot block device %s on node %s" %
4334                          (disk.logical_id[1], src_node))
4335           else:
4336             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4337                                       logical_id=(vgname, new_dev_name),
4338                                       physical_id=(vgname, new_dev_name),
4339                                       iv_name=disk.iv_name)
4340             snap_disks.append(new_dev)
4341
4342     finally:
4343       if self.op.shutdown and instance.status == "up":
4344         if not rpc.call_instance_start(src_node, instance, None):
4345           _ShutdownInstanceDisks(instance, self.cfg)
4346           raise errors.OpExecError("Could not start instance")
4347
4348     # TODO: check for size
4349
4350     for dev in snap_disks:
4351       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4352         logger.Error("could not export block device %s from node %s to node %s"
4353                      % (dev.logical_id[1], src_node, dst_node.name))
4354       if not rpc.call_blockdev_remove(src_node, dev):
4355         logger.Error("could not remove snapshot block device %s from node %s" %
4356                      (dev.logical_id[1], src_node))
4357
4358     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4359       logger.Error("could not finalize export for instance %s on node %s" %
4360                    (instance.name, dst_node.name))
4361
4362     nodelist = self.cfg.GetNodeList()
4363     nodelist.remove(dst_node.name)
4364
4365     # on one-node clusters nodelist will be empty after the removal
4366     # if we proceed the backup would be removed because OpQueryExports
4367     # substitutes an empty list with the full cluster node list.
4368     if nodelist:
4369       op = opcodes.OpQueryExports(nodes=nodelist)
4370       exportlist = self.proc.ChainOpCode(op)
4371       for node in exportlist:
4372         if instance.name in exportlist[node]:
4373           if not rpc.call_export_remove(node, instance.name):
4374             logger.Error("could not remove older export for instance %s"
4375                          " on node %s" % (instance.name, node))
4376
4377
4378 class LURemoveExport(NoHooksLU):
4379   """Remove exports related to the named instance.
4380
4381   """
4382   _OP_REQP = ["instance_name"]
4383
4384   def CheckPrereq(self):
4385     """Check prerequisites.
4386     """
4387     pass
4388
4389   def Exec(self, feedback_fn):
4390     """Remove any export.
4391
4392     """
4393     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4394     # If the instance was not found we'll try with the name that was passed in.
4395     # This will only work if it was an FQDN, though.
4396     fqdn_warn = False
4397     if not instance_name:
4398       fqdn_warn = True
4399       instance_name = self.op.instance_name
4400
4401     op = opcodes.OpQueryExports(nodes=[])
4402     exportlist = self.proc.ChainOpCode(op)
4403     found = False
4404     for node in exportlist:
4405       if instance_name in exportlist[node]:
4406         found = True
4407         if not rpc.call_export_remove(node, instance_name):
4408           logger.Error("could not remove export for instance %s"
4409                        " on node %s" % (instance_name, node))
4410
4411     if fqdn_warn and not found:
4412       feedback_fn("Export not found. If trying to remove an export belonging"
4413                   " to a deleted instance please use its Fully Qualified"
4414                   " Domain Name.")
4415
4416
4417 class TagsLU(NoHooksLU):
4418   """Generic tags LU.
4419
4420   This is an abstract class which is the parent of all the other tags LUs.
4421
4422   """
4423   def CheckPrereq(self):
4424     """Check prerequisites.
4425
4426     """
4427     if self.op.kind == constants.TAG_CLUSTER:
4428       self.target = self.cfg.GetClusterInfo()
4429     elif self.op.kind == constants.TAG_NODE:
4430       name = self.cfg.ExpandNodeName(self.op.name)
4431       if name is None:
4432         raise errors.OpPrereqError("Invalid node name (%s)" %
4433                                    (self.op.name,))
4434       self.op.name = name
4435       self.target = self.cfg.GetNodeInfo(name)
4436     elif self.op.kind == constants.TAG_INSTANCE:
4437       name = self.cfg.ExpandInstanceName(self.op.name)
4438       if name is None:
4439         raise errors.OpPrereqError("Invalid instance name (%s)" %
4440                                    (self.op.name,))
4441       self.op.name = name
4442       self.target = self.cfg.GetInstanceInfo(name)
4443     else:
4444       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4445                                  str(self.op.kind))
4446
4447
4448 class LUGetTags(TagsLU):
4449   """Returns the tags of a given object.
4450
4451   """
4452   _OP_REQP = ["kind", "name"]
4453
4454   def Exec(self, feedback_fn):
4455     """Returns the tag list.
4456
4457     """
4458     return self.target.GetTags()
4459
4460
4461 class LUSearchTags(NoHooksLU):
4462   """Searches the tags for a given pattern.
4463
4464   """
4465   _OP_REQP = ["pattern"]
4466
4467   def CheckPrereq(self):
4468     """Check prerequisites.
4469
4470     This checks the pattern passed for validity by compiling it.
4471
4472     """
4473     try:
4474       self.re = re.compile(self.op.pattern)
4475     except re.error, err:
4476       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4477                                  (self.op.pattern, err))
4478
4479   def Exec(self, feedback_fn):
4480     """Returns the tag list.
4481
4482     """
4483     cfg = self.cfg
4484     tgts = [("/cluster", cfg.GetClusterInfo())]
4485     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4486     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4487     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4488     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4489     results = []
4490     for path, target in tgts:
4491       for tag in target.GetTags():
4492         if self.re.search(tag):
4493           results.append((path, tag))
4494     return results
4495
4496
4497 class LUAddTags(TagsLU):
4498   """Sets a tag on a given object.
4499
4500   """
4501   _OP_REQP = ["kind", "name", "tags"]
4502
4503   def CheckPrereq(self):
4504     """Check prerequisites.
4505
4506     This checks the type and length of the tag name and value.
4507
4508     """
4509     TagsLU.CheckPrereq(self)
4510     for tag in self.op.tags:
4511       objects.TaggableObject.ValidateTag(tag)
4512
4513   def Exec(self, feedback_fn):
4514     """Sets the tag.
4515
4516     """
4517     try:
4518       for tag in self.op.tags:
4519         self.target.AddTag(tag)
4520     except errors.TagError, err:
4521       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4522     try:
4523       self.cfg.Update(self.target)
4524     except errors.ConfigurationError:
4525       raise errors.OpRetryError("There has been a modification to the"
4526                                 " config file and the operation has been"
4527                                 " aborted. Please retry.")
4528
4529
4530 class LUDelTags(TagsLU):
4531   """Delete a list of tags from a given object.
4532
4533   """
4534   _OP_REQP = ["kind", "name", "tags"]
4535
4536   def CheckPrereq(self):
4537     """Check prerequisites.
4538
4539     This checks that we have the given tag.
4540
4541     """
4542     TagsLU.CheckPrereq(self)
4543     for tag in self.op.tags:
4544       objects.TaggableObject.ValidateTag(tag)
4545     del_tags = frozenset(self.op.tags)
4546     cur_tags = self.target.GetTags()
4547     if not del_tags <= cur_tags:
4548       diff_tags = del_tags - cur_tags
4549       diff_names = ["'%s'" % tag for tag in diff_tags]
4550       diff_names.sort()
4551       raise errors.OpPrereqError("Tag(s) %s not found" %
4552                                  (",".join(diff_names)))
4553
4554   def Exec(self, feedback_fn):
4555     """Remove the tag from the object.
4556
4557     """
4558     for tag in self.op.tags:
4559       self.target.RemoveTag(tag)
4560     try:
4561       self.cfg.Update(self.target)
4562     except errors.ConfigurationError:
4563       raise errors.OpRetryError("There has been a modification to the"
4564                                 " config file and the operation has been"
4565                                 " aborted. Please retry.")
4566
4567 class LUTestDelay(NoHooksLU):
4568   """Sleep for a specified amount of time.
4569
4570   This LU sleeps on the master and/or nodes for a specified amoutn of
4571   time.
4572
4573   """
4574   _OP_REQP = ["duration", "on_master", "on_nodes"]
4575
4576   def CheckPrereq(self):
4577     """Check prerequisites.
4578
4579     This checks that we have a good list of nodes and/or the duration
4580     is valid.
4581
4582     """
4583
4584     if self.op.on_nodes:
4585       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4586
4587   def Exec(self, feedback_fn):
4588     """Do the actual sleep.
4589
4590     """
4591     if self.op.on_master:
4592       if not utils.TestDelay(self.op.duration):
4593         raise errors.OpExecError("Error during master delay test")
4594     if self.op.on_nodes:
4595       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4596       if not result:
4597         raise errors.OpExecError("Complete failure from rpc call")
4598       for node, node_result in result.items():
4599         if not node_result:
4600           raise errors.OpExecError("Failure during rpc call to node %s,"
4601                                    " result: %s" % (node, node_result))
4602
4603
4604 class IAllocator(object):
4605   """IAllocator framework.
4606
4607   An IAllocator instance has three sets of attributes:
4608     - cfg/sstore that are needed to query the cluster
4609     - input data (all members of the _KEYS class attribute are required)
4610     - four buffer attributes (in|out_data|text), that represent the
4611       input (to the external script) in text and data structure format,
4612       and the output from it, again in two formats
4613     - the result variables from the script (success, info, nodes) for
4614       easy usage
4615
4616   """
4617   _ALLO_KEYS = [
4618     "mem_size", "disks", "disk_template",
4619     "os", "tags", "nics", "vcpus",
4620     ]
4621   _RELO_KEYS = [
4622     "relocate_from",
4623     ]
4624
4625   def __init__(self, cfg, sstore, mode, name, **kwargs):
4626     self.cfg = cfg
4627     self.sstore = sstore
4628     # init buffer variables
4629     self.in_text = self.out_text = self.in_data = self.out_data = None
4630     # init all input fields so that pylint is happy
4631     self.mode = mode
4632     self.name = name
4633     self.mem_size = self.disks = self.disk_template = None
4634     self.os = self.tags = self.nics = self.vcpus = None
4635     self.relocate_from = None
4636     # computed fields
4637     self.required_nodes = None
4638     # init result fields
4639     self.success = self.info = self.nodes = None
4640     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4641       keyset = self._ALLO_KEYS
4642     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4643       keyset = self._RELO_KEYS
4644     else:
4645       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4646                                    " IAllocator" % self.mode)
4647     for key in kwargs:
4648       if key not in keyset:
4649         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4650                                      " IAllocator" % key)
4651       setattr(self, key, kwargs[key])
4652     for key in keyset:
4653       if key not in kwargs:
4654         raise errors.ProgrammerError("Missing input parameter '%s' to"
4655                                      " IAllocator" % key)
4656     self._BuildInputData()
4657
4658   def _ComputeClusterData(self):
4659     """Compute the generic allocator input data.
4660
4661     This is the data that is independent of the actual operation.
4662
4663     """
4664     cfg = self.cfg
4665     # cluster data
4666     data = {
4667       "version": 1,
4668       "cluster_name": self.sstore.GetClusterName(),
4669       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4670       "hypervisor_type": self.sstore.GetHypervisorType(),
4671       # we don't have job IDs
4672       }
4673
4674     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4675
4676     # node data
4677     node_results = {}
4678     node_list = cfg.GetNodeList()
4679     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4680     for nname in node_list:
4681       ninfo = cfg.GetNodeInfo(nname)
4682       if nname not in node_data or not isinstance(node_data[nname], dict):
4683         raise errors.OpExecError("Can't get data for node %s" % nname)
4684       remote_info = node_data[nname]
4685       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4686                    'vg_size', 'vg_free', 'cpu_total']:
4687         if attr not in remote_info:
4688           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4689                                    (nname, attr))
4690         try:
4691           remote_info[attr] = int(remote_info[attr])
4692         except ValueError, err:
4693           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4694                                    " %s" % (nname, attr, str(err)))
4695       # compute memory used by primary instances
4696       i_p_mem = i_p_up_mem = 0
4697       for iinfo in i_list:
4698         if iinfo.primary_node == nname:
4699           i_p_mem += iinfo.memory
4700           if iinfo.status == "up":
4701             i_p_up_mem += iinfo.memory
4702
4703       # compute memory used by instances
4704       pnr = {
4705         "tags": list(ninfo.GetTags()),
4706         "total_memory": remote_info['memory_total'],
4707         "reserved_memory": remote_info['memory_dom0'],
4708         "free_memory": remote_info['memory_free'],
4709         "i_pri_memory": i_p_mem,
4710         "i_pri_up_memory": i_p_up_mem,
4711         "total_disk": remote_info['vg_size'],
4712         "free_disk": remote_info['vg_free'],
4713         "primary_ip": ninfo.primary_ip,
4714         "secondary_ip": ninfo.secondary_ip,
4715         "total_cpus": remote_info['cpu_total'],
4716         }
4717       node_results[nname] = pnr
4718     data["nodes"] = node_results
4719
4720     # instance data
4721     instance_data = {}
4722     for iinfo in i_list:
4723       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4724                   for n in iinfo.nics]
4725       pir = {
4726         "tags": list(iinfo.GetTags()),
4727         "should_run": iinfo.status == "up",
4728         "vcpus": iinfo.vcpus,
4729         "memory": iinfo.memory,
4730         "os": iinfo.os,
4731         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4732         "nics": nic_data,
4733         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4734         "disk_template": iinfo.disk_template,
4735         }
4736       instance_data[iinfo.name] = pir
4737
4738     data["instances"] = instance_data
4739
4740     self.in_data = data
4741
4742   def _AddNewInstance(self):
4743     """Add new instance data to allocator structure.
4744
4745     This in combination with _AllocatorGetClusterData will create the
4746     correct structure needed as input for the allocator.
4747
4748     The checks for the completeness of the opcode must have already been
4749     done.
4750
4751     """
4752     data = self.in_data
4753     if len(self.disks) != 2:
4754       raise errors.OpExecError("Only two-disk configurations supported")
4755
4756     disk_space = _ComputeDiskSize(self.disk_template,
4757                                   self.disks[0]["size"], self.disks[1]["size"])
4758
4759     if self.disk_template in constants.DTS_NET_MIRROR:
4760       self.required_nodes = 2
4761     else:
4762       self.required_nodes = 1
4763     request = {
4764       "type": "allocate",
4765       "name": self.name,
4766       "disk_template": self.disk_template,
4767       "tags": self.tags,
4768       "os": self.os,
4769       "vcpus": self.vcpus,
4770       "memory": self.mem_size,
4771       "disks": self.disks,
4772       "disk_space_total": disk_space,
4773       "nics": self.nics,
4774       "required_nodes": self.required_nodes,
4775       }
4776     data["request"] = request
4777
4778   def _AddRelocateInstance(self):
4779     """Add relocate instance data to allocator structure.
4780
4781     This in combination with _IAllocatorGetClusterData will create the
4782     correct structure needed as input for the allocator.
4783
4784     The checks for the completeness of the opcode must have already been
4785     done.
4786
4787     """
4788     instance = self.cfg.GetInstanceInfo(self.name)
4789     if instance is None:
4790       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4791                                    " IAllocator" % self.name)
4792
4793     if instance.disk_template not in constants.DTS_NET_MIRROR:
4794       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4795
4796     if len(instance.secondary_nodes) != 1:
4797       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4798
4799     self.required_nodes = 1
4800
4801     disk_space = _ComputeDiskSize(instance.disk_template,
4802                                   instance.disks[0].size,
4803                                   instance.disks[1].size)
4804
4805     request = {
4806       "type": "relocate",
4807       "name": self.name,
4808       "disk_space_total": disk_space,
4809       "required_nodes": self.required_nodes,
4810       "relocate_from": self.relocate_from,
4811       }
4812     self.in_data["request"] = request
4813
4814   def _BuildInputData(self):
4815     """Build input data structures.
4816
4817     """
4818     self._ComputeClusterData()
4819
4820     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4821       self._AddNewInstance()
4822     else:
4823       self._AddRelocateInstance()
4824
4825     self.in_text = serializer.Dump(self.in_data)
4826
4827   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4828     """Run an instance allocator and return the results.
4829
4830     """
4831     data = self.in_text
4832
4833     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4834
4835     if not isinstance(result, tuple) or len(result) != 4:
4836       raise errors.OpExecError("Invalid result from master iallocator runner")
4837
4838     rcode, stdout, stderr, fail = result
4839
4840     if rcode == constants.IARUN_NOTFOUND:
4841       raise errors.OpExecError("Can't find allocator '%s'" % name)
4842     elif rcode == constants.IARUN_FAILURE:
4843         raise errors.OpExecError("Instance allocator call failed: %s,"
4844                                  " output: %s" %
4845                                  (fail, stdout+stderr))
4846     self.out_text = stdout
4847     if validate:
4848       self._ValidateResult()
4849
4850   def _ValidateResult(self):
4851     """Process the allocator results.
4852
4853     This will process and if successful save the result in
4854     self.out_data and the other parameters.
4855
4856     """
4857     try:
4858       rdict = serializer.Load(self.out_text)
4859     except Exception, err:
4860       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4861
4862     if not isinstance(rdict, dict):
4863       raise errors.OpExecError("Can't parse iallocator results: not a dict")
4864
4865     for key in "success", "info", "nodes":
4866       if key not in rdict:
4867         raise errors.OpExecError("Can't parse iallocator results:"
4868                                  " missing key '%s'" % key)
4869       setattr(self, key, rdict[key])
4870
4871     if not isinstance(rdict["nodes"], list):
4872       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4873                                " is not a list")
4874     self.out_data = rdict
4875
4876
4877 class LUTestAllocator(NoHooksLU):
4878   """Run allocator tests.
4879
4880   This LU runs the allocator tests
4881
4882   """
4883   _OP_REQP = ["direction", "mode", "name"]
4884
4885   def CheckPrereq(self):
4886     """Check prerequisites.
4887
4888     This checks the opcode parameters depending on the director and mode test.
4889
4890     """
4891     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4892       for attr in ["name", "mem_size", "disks", "disk_template",
4893                    "os", "tags", "nics", "vcpus"]:
4894         if not hasattr(self.op, attr):
4895           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4896                                      attr)
4897       iname = self.cfg.ExpandInstanceName(self.op.name)
4898       if iname is not None:
4899         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4900                                    iname)
4901       if not isinstance(self.op.nics, list):
4902         raise errors.OpPrereqError("Invalid parameter 'nics'")
4903       for row in self.op.nics:
4904         if (not isinstance(row, dict) or
4905             "mac" not in row or
4906             "ip" not in row or
4907             "bridge" not in row):
4908           raise errors.OpPrereqError("Invalid contents of the"
4909                                      " 'nics' parameter")
4910       if not isinstance(self.op.disks, list):
4911         raise errors.OpPrereqError("Invalid parameter 'disks'")
4912       if len(self.op.disks) != 2:
4913         raise errors.OpPrereqError("Only two-disk configurations supported")
4914       for row in self.op.disks:
4915         if (not isinstance(row, dict) or
4916             "size" not in row or
4917             not isinstance(row["size"], int) or
4918             "mode" not in row or
4919             row["mode"] not in ['r', 'w']):
4920           raise errors.OpPrereqError("Invalid contents of the"
4921                                      " 'disks' parameter")
4922     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4923       if not hasattr(self.op, "name"):
4924         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4925       fname = self.cfg.ExpandInstanceName(self.op.name)
4926       if fname is None:
4927         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4928                                    self.op.name)
4929       self.op.name = fname
4930       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4931     else:
4932       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4933                                  self.op.mode)
4934
4935     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4936       if not hasattr(self.op, "allocator") or self.op.allocator is None:
4937         raise errors.OpPrereqError("Missing allocator name")
4938     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4939       raise errors.OpPrereqError("Wrong allocator test '%s'" %
4940                                  self.op.direction)
4941
4942   def Exec(self, feedback_fn):
4943     """Run the allocator test.
4944
4945     """
4946     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4947       ial = IAllocator(self.cfg, self.sstore,
4948                        mode=self.op.mode,
4949                        name=self.op.name,
4950                        mem_size=self.op.mem_size,
4951                        disks=self.op.disks,
4952                        disk_template=self.op.disk_template,
4953                        os=self.op.os,
4954                        tags=self.op.tags,
4955                        nics=self.op.nics,
4956                        vcpus=self.op.vcpus,
4957                        )
4958     else:
4959       ial = IAllocator(self.cfg, self.sstore,
4960                        mode=self.op.mode,
4961                        name=self.op.name,
4962                        relocate_from=list(self.relocate_from),
4963                        )
4964
4965     if self.op.direction == constants.IALLOCATOR_DIR_IN:
4966       result = ial.in_text
4967     else:
4968       ial.Run(self.op.allocator, validate=False)
4969       result = ial.out_text
4970     return result