Add method to update a disk object size
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_WSSTORE: the LU needs a writable SimpleStore
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_WSSTORE = False
69
70   def __init__(self, processor, op, cfg, sstore):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = cfg
80     self.sstore = sstore
81     self.__ssh = None
82
83     for attr_name in self._OP_REQP:
84       attr_val = getattr(op, attr_name, None)
85       if attr_val is None:
86         raise errors.OpPrereqError("Required parameter '%s' missing" %
87                                    attr_name)
88
89     if not cfg.IsCluster():
90       raise errors.OpPrereqError("Cluster not initialized yet,"
91                                  " use 'gnt-cluster init' first.")
92     if self.REQ_MASTER:
93       master = sstore.GetMasterNode()
94       if master != utils.HostInfo().name:
95         raise errors.OpPrereqError("Commands must be run on the master"
96                                    " node %s" % master)
97
98   def __GetSSH(self):
99     """Returns the SshRunner object
100
101     """
102     if not self.__ssh:
103       self.__ssh = ssh.SshRunner(self.sstore)
104     return self.__ssh
105
106   ssh = property(fget=__GetSSH)
107
108   def CheckPrereq(self):
109     """Check prerequisites for this LU.
110
111     This method should check that the prerequisites for the execution
112     of this LU are fulfilled. It can do internode communication, but
113     it should be idempotent - no cluster or system changes are
114     allowed.
115
116     The method should raise errors.OpPrereqError in case something is
117     not fulfilled. Its return value is ignored.
118
119     This method should also update all the parameters of the opcode to
120     their canonical form; e.g. a short node name must be fully
121     expanded after this method has successfully completed (so that
122     hooks, logging, etc. work correctly).
123
124     """
125     raise NotImplementedError
126
127   def Exec(self, feedback_fn):
128     """Execute the LU.
129
130     This method should implement the actual work. It should raise
131     errors.OpExecError for failures that are somewhat dealt with in
132     code, or expected.
133
134     """
135     raise NotImplementedError
136
137   def BuildHooksEnv(self):
138     """Build hooks environment for this LU.
139
140     This method should return a three-node tuple consisting of: a dict
141     containing the environment that will be used for running the
142     specific hook for this LU, a list of node names on which the hook
143     should run before the execution, and a list of node names on which
144     the hook should run after the execution.
145
146     The keys of the dict must not have 'GANETI_' prefixed as this will
147     be handled in the hooks runner. Also note additional keys will be
148     added by the hooks runner. If the LU doesn't define any
149     environment, an empty dict (and not None) should be returned.
150
151     No nodes should be returned as an empty list (and not None).
152
153     Note that if the HPATH for a LU class is None, this function will
154     not be called.
155
156     """
157     raise NotImplementedError
158
159   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
160     """Notify the LU about the results of its hooks.
161
162     This method is called every time a hooks phase is executed, and notifies
163     the Logical Unit about the hooks' result. The LU can then use it to alter
164     its result based on the hooks.  By default the method does nothing and the
165     previous result is passed back unchanged but any LU can define it if it
166     wants to use the local cluster hook-scripts somehow.
167
168     Args:
169       phase: the hooks phase that has just been run
170       hooks_results: the results of the multi-node hooks rpc call
171       feedback_fn: function to send feedback back to the caller
172       lu_result: the previous result this LU had, or None in the PRE phase.
173
174     """
175     return lu_result
176
177
178 class NoHooksLU(LogicalUnit):
179   """Simple LU which runs no hooks.
180
181   This LU is intended as a parent for other LogicalUnits which will
182   run no hooks, in order to reduce duplicate code.
183
184   """
185   HPATH = None
186   HTYPE = None
187
188
189 def _GetWantedNodes(lu, nodes):
190   """Returns list of checked and expanded node names.
191
192   Args:
193     nodes: List of nodes (strings) or None for all
194
195   """
196   if not isinstance(nodes, list):
197     raise errors.OpPrereqError("Invalid argument type 'nodes'")
198
199   if nodes:
200     wanted = []
201
202     for name in nodes:
203       node = lu.cfg.ExpandNodeName(name)
204       if node is None:
205         raise errors.OpPrereqError("No such node name '%s'" % name)
206       wanted.append(node)
207
208   else:
209     wanted = lu.cfg.GetNodeList()
210   return utils.NiceSort(wanted)
211
212
213 def _GetWantedInstances(lu, instances):
214   """Returns list of checked and expanded instance names.
215
216   Args:
217     instances: List of instances (strings) or None for all
218
219   """
220   if not isinstance(instances, list):
221     raise errors.OpPrereqError("Invalid argument type 'instances'")
222
223   if instances:
224     wanted = []
225
226     for name in instances:
227       instance = lu.cfg.ExpandInstanceName(name)
228       if instance is None:
229         raise errors.OpPrereqError("No such instance name '%s'" % name)
230       wanted.append(instance)
231
232   else:
233     wanted = lu.cfg.GetInstanceList()
234   return utils.NiceSort(wanted)
235
236
237 def _CheckOutputFields(static, dynamic, selected):
238   """Checks whether all selected fields are valid.
239
240   Args:
241     static: Static fields
242     dynamic: Dynamic fields
243
244   """
245   static_fields = frozenset(static)
246   dynamic_fields = frozenset(dynamic)
247
248   all_fields = static_fields | dynamic_fields
249
250   if not all_fields.issuperset(selected):
251     raise errors.OpPrereqError("Unknown output fields selected: %s"
252                                % ",".join(frozenset(selected).
253                                           difference(all_fields)))
254
255
256 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
257                           memory, vcpus, nics):
258   """Builds instance related env variables for hooks from single variables.
259
260   Args:
261     secondary_nodes: List of secondary nodes as strings
262   """
263   env = {
264     "OP_TARGET": name,
265     "INSTANCE_NAME": name,
266     "INSTANCE_PRIMARY": primary_node,
267     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
268     "INSTANCE_OS_TYPE": os_type,
269     "INSTANCE_STATUS": status,
270     "INSTANCE_MEMORY": memory,
271     "INSTANCE_VCPUS": vcpus,
272   }
273
274   if nics:
275     nic_count = len(nics)
276     for idx, (ip, bridge, mac) in enumerate(nics):
277       if ip is None:
278         ip = ""
279       env["INSTANCE_NIC%d_IP" % idx] = ip
280       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
281       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
282   else:
283     nic_count = 0
284
285   env["INSTANCE_NIC_COUNT"] = nic_count
286
287   return env
288
289
290 def _BuildInstanceHookEnvByObject(instance, override=None):
291   """Builds instance related env variables for hooks from an object.
292
293   Args:
294     instance: objects.Instance object of instance
295     override: dict of values to override
296   """
297   args = {
298     'name': instance.name,
299     'primary_node': instance.primary_node,
300     'secondary_nodes': instance.secondary_nodes,
301     'os_type': instance.os,
302     'status': instance.os,
303     'memory': instance.memory,
304     'vcpus': instance.vcpus,
305     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
306   }
307   if override:
308     args.update(override)
309   return _BuildInstanceHookEnv(**args)
310
311
312 def _CheckInstanceBridgesExist(instance):
313   """Check that the brigdes needed by an instance exist.
314
315   """
316   # check bridges existance
317   brlist = [nic.bridge for nic in instance.nics]
318   if not rpc.call_bridges_exist(instance.primary_node, brlist):
319     raise errors.OpPrereqError("one or more target bridges %s does not"
320                                " exist on destination node '%s'" %
321                                (brlist, instance.primary_node))
322
323
324 class LUDestroyCluster(NoHooksLU):
325   """Logical unit for destroying the cluster.
326
327   """
328   _OP_REQP = []
329
330   def CheckPrereq(self):
331     """Check prerequisites.
332
333     This checks whether the cluster is empty.
334
335     Any errors are signalled by raising errors.OpPrereqError.
336
337     """
338     master = self.sstore.GetMasterNode()
339
340     nodelist = self.cfg.GetNodeList()
341     if len(nodelist) != 1 or nodelist[0] != master:
342       raise errors.OpPrereqError("There are still %d node(s) in"
343                                  " this cluster." % (len(nodelist) - 1))
344     instancelist = self.cfg.GetInstanceList()
345     if instancelist:
346       raise errors.OpPrereqError("There are still %d instance(s) in"
347                                  " this cluster." % len(instancelist))
348
349   def Exec(self, feedback_fn):
350     """Destroys the cluster.
351
352     """
353     master = self.sstore.GetMasterNode()
354     if not rpc.call_node_stop_master(master):
355       raise errors.OpExecError("Could not disable the master role")
356     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
357     utils.CreateBackup(priv_key)
358     utils.CreateBackup(pub_key)
359     rpc.call_node_leave_cluster(master)
360
361
362 class LUVerifyCluster(LogicalUnit):
363   """Verifies the cluster status.
364
365   """
366   HPATH = "cluster-verify"
367   HTYPE = constants.HTYPE_CLUSTER
368   _OP_REQP = ["skip_checks"]
369
370   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
371                   remote_version, feedback_fn):
372     """Run multiple tests against a node.
373
374     Test list:
375       - compares ganeti version
376       - checks vg existance and size > 20G
377       - checks config file checksum
378       - checks ssh to other nodes
379
380     Args:
381       node: name of the node to check
382       file_list: required list of files
383       local_cksum: dictionary of local files and their checksums
384
385     """
386     # compares ganeti version
387     local_version = constants.PROTOCOL_VERSION
388     if not remote_version:
389       feedback_fn("  - ERROR: connection to %s failed" % (node))
390       return True
391
392     if local_version != remote_version:
393       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
394                       (local_version, node, remote_version))
395       return True
396
397     # checks vg existance and size > 20G
398
399     bad = False
400     if not vglist:
401       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
402                       (node,))
403       bad = True
404     else:
405       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
406                                             constants.MIN_VG_SIZE)
407       if vgstatus:
408         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
409         bad = True
410
411     # checks config file checksum
412     # checks ssh to any
413
414     if 'filelist' not in node_result:
415       bad = True
416       feedback_fn("  - ERROR: node hasn't returned file checksum data")
417     else:
418       remote_cksum = node_result['filelist']
419       for file_name in file_list:
420         if file_name not in remote_cksum:
421           bad = True
422           feedback_fn("  - ERROR: file '%s' missing" % file_name)
423         elif remote_cksum[file_name] != local_cksum[file_name]:
424           bad = True
425           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
426
427     if 'nodelist' not in node_result:
428       bad = True
429       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
430     else:
431       if node_result['nodelist']:
432         bad = True
433         for node in node_result['nodelist']:
434           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
435                           (node, node_result['nodelist'][node]))
436     if 'node-net-test' not in node_result:
437       bad = True
438       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
439     else:
440       if node_result['node-net-test']:
441         bad = True
442         nlist = utils.NiceSort(node_result['node-net-test'].keys())
443         for node in nlist:
444           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
445                           (node, node_result['node-net-test'][node]))
446
447     hyp_result = node_result.get('hypervisor', None)
448     if hyp_result is not None:
449       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
450     return bad
451
452   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
453                       node_instance, feedback_fn):
454     """Verify an instance.
455
456     This function checks to see if the required block devices are
457     available on the instance's node.
458
459     """
460     bad = False
461
462     node_current = instanceconfig.primary_node
463
464     node_vol_should = {}
465     instanceconfig.MapLVsByNode(node_vol_should)
466
467     for node in node_vol_should:
468       for volume in node_vol_should[node]:
469         if node not in node_vol_is or volume not in node_vol_is[node]:
470           feedback_fn("  - ERROR: volume %s missing on node %s" %
471                           (volume, node))
472           bad = True
473
474     if not instanceconfig.status == 'down':
475       if (node_current not in node_instance or
476           not instance in node_instance[node_current]):
477         feedback_fn("  - ERROR: instance %s not running on node %s" %
478                         (instance, node_current))
479         bad = True
480
481     for node in node_instance:
482       if (not node == node_current):
483         if instance in node_instance[node]:
484           feedback_fn("  - ERROR: instance %s should not run on node %s" %
485                           (instance, node))
486           bad = True
487
488     return bad
489
490   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
491     """Verify if there are any unknown volumes in the cluster.
492
493     The .os, .swap and backup volumes are ignored. All other volumes are
494     reported as unknown.
495
496     """
497     bad = False
498
499     for node in node_vol_is:
500       for volume in node_vol_is[node]:
501         if node not in node_vol_should or volume not in node_vol_should[node]:
502           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
503                       (volume, node))
504           bad = True
505     return bad
506
507   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
508     """Verify the list of running instances.
509
510     This checks what instances are running but unknown to the cluster.
511
512     """
513     bad = False
514     for node in node_instance:
515       for runninginstance in node_instance[node]:
516         if runninginstance not in instancelist:
517           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
518                           (runninginstance, node))
519           bad = True
520     return bad
521
522   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
523     """Verify N+1 Memory Resilience.
524
525     Check that if one single node dies we can still start all the instances it
526     was primary for.
527
528     """
529     bad = False
530
531     for node, nodeinfo in node_info.iteritems():
532       # This code checks that every node which is now listed as secondary has
533       # enough memory to host all instances it is supposed to should a single
534       # other node in the cluster fail.
535       # FIXME: not ready for failover to an arbitrary node
536       # FIXME: does not support file-backed instances
537       # WARNING: we currently take into account down instances as well as up
538       # ones, considering that even if they're down someone might want to start
539       # them even in the event of a node failure.
540       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
541         needed_mem = 0
542         for instance in instances:
543           needed_mem += instance_cfg[instance].memory
544         if nodeinfo['mfree'] < needed_mem:
545           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
546                       " failovers should node %s fail" % (node, prinode))
547           bad = True
548     return bad
549
550   def CheckPrereq(self):
551     """Check prerequisites.
552
553     Transform the list of checks we're going to skip into a set and check that
554     all its members are valid.
555
556     """
557     self.skip_set = frozenset(self.op.skip_checks)
558     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
559       raise errors.OpPrereqError("Invalid checks to be skipped specified")
560
561   def BuildHooksEnv(self):
562     """Build hooks env.
563
564     Cluster-Verify hooks just rone in the post phase and their failure makes
565     the output be logged in the verify output and the verification to fail.
566
567     """
568     all_nodes = self.cfg.GetNodeList()
569     # TODO: populate the environment with useful information for verify hooks
570     env = {}
571     return env, [], all_nodes
572
573   def Exec(self, feedback_fn):
574     """Verify integrity of cluster, performing various test on nodes.
575
576     """
577     bad = False
578     feedback_fn("* Verifying global settings")
579     for msg in self.cfg.VerifyConfig():
580       feedback_fn("  - ERROR: %s" % msg)
581
582     vg_name = self.cfg.GetVGName()
583     nodelist = utils.NiceSort(self.cfg.GetNodeList())
584     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
585     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
586     i_non_redundant = [] # Non redundant instances
587     node_volume = {}
588     node_instance = {}
589     node_info = {}
590     instance_cfg = {}
591
592     # FIXME: verify OS list
593     # do local checksums
594     file_names = list(self.sstore.GetFileList())
595     file_names.append(constants.SSL_CERT_FILE)
596     file_names.append(constants.CLUSTER_CONF_FILE)
597     local_checksums = utils.FingerprintFiles(file_names)
598
599     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
600     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
601     all_instanceinfo = rpc.call_instance_list(nodelist)
602     all_vglist = rpc.call_vg_list(nodelist)
603     node_verify_param = {
604       'filelist': file_names,
605       'nodelist': nodelist,
606       'hypervisor': None,
607       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
608                         for node in nodeinfo]
609       }
610     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
611     all_rversion = rpc.call_version(nodelist)
612     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
613
614     for node in nodelist:
615       feedback_fn("* Verifying node %s" % node)
616       result = self._VerifyNode(node, file_names, local_checksums,
617                                 all_vglist[node], all_nvinfo[node],
618                                 all_rversion[node], feedback_fn)
619       bad = bad or result
620
621       # node_volume
622       volumeinfo = all_volumeinfo[node]
623
624       if isinstance(volumeinfo, basestring):
625         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
626                     (node, volumeinfo[-400:].encode('string_escape')))
627         bad = True
628         node_volume[node] = {}
629       elif not isinstance(volumeinfo, dict):
630         feedback_fn("  - ERROR: connection to %s failed" % (node,))
631         bad = True
632         continue
633       else:
634         node_volume[node] = volumeinfo
635
636       # node_instance
637       nodeinstance = all_instanceinfo[node]
638       if type(nodeinstance) != list:
639         feedback_fn("  - ERROR: connection to %s failed" % (node,))
640         bad = True
641         continue
642
643       node_instance[node] = nodeinstance
644
645       # node_info
646       nodeinfo = all_ninfo[node]
647       if not isinstance(nodeinfo, dict):
648         feedback_fn("  - ERROR: connection to %s failed" % (node,))
649         bad = True
650         continue
651
652       try:
653         node_info[node] = {
654           "mfree": int(nodeinfo['memory_free']),
655           "dfree": int(nodeinfo['vg_free']),
656           "pinst": [],
657           "sinst": [],
658           # dictionary holding all instances this node is secondary for,
659           # grouped by their primary node. Each key is a cluster node, and each
660           # value is a list of instances which have the key as primary and the
661           # current node as secondary.  this is handy to calculate N+1 memory
662           # availability if you can only failover from a primary to its
663           # secondary.
664           "sinst-by-pnode": {},
665         }
666       except ValueError:
667         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
668         bad = True
669         continue
670
671     node_vol_should = {}
672
673     for instance in instancelist:
674       feedback_fn("* Verifying instance %s" % instance)
675       inst_config = self.cfg.GetInstanceInfo(instance)
676       result =  self._VerifyInstance(instance, inst_config, node_volume,
677                                      node_instance, feedback_fn)
678       bad = bad or result
679
680       inst_config.MapLVsByNode(node_vol_should)
681
682       instance_cfg[instance] = inst_config
683
684       pnode = inst_config.primary_node
685       if pnode in node_info:
686         node_info[pnode]['pinst'].append(instance)
687       else:
688         feedback_fn("  - ERROR: instance %s, connection to primary node"
689                     " %s failed" % (instance, pnode))
690         bad = True
691
692       # If the instance is non-redundant we cannot survive losing its primary
693       # node, so we are not N+1 compliant. On the other hand we have no disk
694       # templates with more than one secondary so that situation is not well
695       # supported either.
696       # FIXME: does not support file-backed instances
697       if len(inst_config.secondary_nodes) == 0:
698         i_non_redundant.append(instance)
699       elif len(inst_config.secondary_nodes) > 1:
700         feedback_fn("  - WARNING: multiple secondaries for instance %s"
701                     % instance)
702
703       for snode in inst_config.secondary_nodes:
704         if snode in node_info:
705           node_info[snode]['sinst'].append(instance)
706           if pnode not in node_info[snode]['sinst-by-pnode']:
707             node_info[snode]['sinst-by-pnode'][pnode] = []
708           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
709         else:
710           feedback_fn("  - ERROR: instance %s, connection to secondary node"
711                       " %s failed" % (instance, snode))
712
713     feedback_fn("* Verifying orphan volumes")
714     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
715                                        feedback_fn)
716     bad = bad or result
717
718     feedback_fn("* Verifying remaining instances")
719     result = self._VerifyOrphanInstances(instancelist, node_instance,
720                                          feedback_fn)
721     bad = bad or result
722
723     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
724       feedback_fn("* Verifying N+1 Memory redundancy")
725       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
726       bad = bad or result
727
728     feedback_fn("* Other Notes")
729     if i_non_redundant:
730       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
731                   % len(i_non_redundant))
732
733     return int(bad)
734
735   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
736     """Analize the post-hooks' result, handle it, and send some
737     nicely-formatted feedback back to the user.
738
739     Args:
740       phase: the hooks phase that has just been run
741       hooks_results: the results of the multi-node hooks rpc call
742       feedback_fn: function to send feedback back to the caller
743       lu_result: previous Exec result
744
745     """
746     # We only really run POST phase hooks, and are only interested in their results
747     if phase == constants.HOOKS_PHASE_POST:
748       # Used to change hooks' output to proper indentation
749       indent_re = re.compile('^', re.M)
750       feedback_fn("* Hooks Results")
751       if not hooks_results:
752         feedback_fn("  - ERROR: general communication failure")
753         lu_result = 1
754       else:
755         for node_name in hooks_results:
756           show_node_header = True
757           res = hooks_results[node_name]
758           if res is False or not isinstance(res, list):
759             feedback_fn("    Communication failure")
760             lu_result = 1
761             continue
762           for script, hkr, output in res:
763             if hkr == constants.HKR_FAIL:
764               # The node header is only shown once, if there are
765               # failing hooks on that node
766               if show_node_header:
767                 feedback_fn("  Node %s:" % node_name)
768                 show_node_header = False
769               feedback_fn("    ERROR: Script %s failed, output:" % script)
770               output = indent_re.sub('      ', output)
771               feedback_fn("%s" % output)
772               lu_result = 1
773
774       return lu_result
775
776
777 class LUVerifyDisks(NoHooksLU):
778   """Verifies the cluster disks status.
779
780   """
781   _OP_REQP = []
782
783   def CheckPrereq(self):
784     """Check prerequisites.
785
786     This has no prerequisites.
787
788     """
789     pass
790
791   def Exec(self, feedback_fn):
792     """Verify integrity of cluster disks.
793
794     """
795     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
796
797     vg_name = self.cfg.GetVGName()
798     nodes = utils.NiceSort(self.cfg.GetNodeList())
799     instances = [self.cfg.GetInstanceInfo(name)
800                  for name in self.cfg.GetInstanceList()]
801
802     nv_dict = {}
803     for inst in instances:
804       inst_lvs = {}
805       if (inst.status != "up" or
806           inst.disk_template not in constants.DTS_NET_MIRROR):
807         continue
808       inst.MapLVsByNode(inst_lvs)
809       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
810       for node, vol_list in inst_lvs.iteritems():
811         for vol in vol_list:
812           nv_dict[(node, vol)] = inst
813
814     if not nv_dict:
815       return result
816
817     node_lvs = rpc.call_volume_list(nodes, vg_name)
818
819     to_act = set()
820     for node in nodes:
821       # node_volume
822       lvs = node_lvs[node]
823
824       if isinstance(lvs, basestring):
825         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
826         res_nlvm[node] = lvs
827       elif not isinstance(lvs, dict):
828         logger.Info("connection to node %s failed or invalid data returned" %
829                     (node,))
830         res_nodes.append(node)
831         continue
832
833       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
834         inst = nv_dict.pop((node, lv_name), None)
835         if (not lv_online and inst is not None
836             and inst.name not in res_instances):
837           res_instances.append(inst.name)
838
839     # any leftover items in nv_dict are missing LVs, let's arrange the
840     # data better
841     for key, inst in nv_dict.iteritems():
842       if inst.name not in res_missing:
843         res_missing[inst.name] = []
844       res_missing[inst.name].append(key)
845
846     return result
847
848
849 class LURenameCluster(LogicalUnit):
850   """Rename the cluster.
851
852   """
853   HPATH = "cluster-rename"
854   HTYPE = constants.HTYPE_CLUSTER
855   _OP_REQP = ["name"]
856   REQ_WSSTORE = True
857
858   def BuildHooksEnv(self):
859     """Build hooks env.
860
861     """
862     env = {
863       "OP_TARGET": self.sstore.GetClusterName(),
864       "NEW_NAME": self.op.name,
865       }
866     mn = self.sstore.GetMasterNode()
867     return env, [mn], [mn]
868
869   def CheckPrereq(self):
870     """Verify that the passed name is a valid one.
871
872     """
873     hostname = utils.HostInfo(self.op.name)
874
875     new_name = hostname.name
876     self.ip = new_ip = hostname.ip
877     old_name = self.sstore.GetClusterName()
878     old_ip = self.sstore.GetMasterIP()
879     if new_name == old_name and new_ip == old_ip:
880       raise errors.OpPrereqError("Neither the name nor the IP address of the"
881                                  " cluster has changed")
882     if new_ip != old_ip:
883       result = utils.RunCmd(["fping", "-q", new_ip])
884       if not result.failed:
885         raise errors.OpPrereqError("The given cluster IP address (%s) is"
886                                    " reachable on the network. Aborting." %
887                                    new_ip)
888
889     self.op.name = new_name
890
891   def Exec(self, feedback_fn):
892     """Rename the cluster.
893
894     """
895     clustername = self.op.name
896     ip = self.ip
897     ss = self.sstore
898
899     # shutdown the master IP
900     master = ss.GetMasterNode()
901     if not rpc.call_node_stop_master(master):
902       raise errors.OpExecError("Could not disable the master role")
903
904     try:
905       # modify the sstore
906       ss.SetKey(ss.SS_MASTER_IP, ip)
907       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
908
909       # Distribute updated ss config to all nodes
910       myself = self.cfg.GetNodeInfo(master)
911       dist_nodes = self.cfg.GetNodeList()
912       if myself.name in dist_nodes:
913         dist_nodes.remove(myself.name)
914
915       logger.Debug("Copying updated ssconf data to all nodes")
916       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
917         fname = ss.KeyToFilename(keyname)
918         result = rpc.call_upload_file(dist_nodes, fname)
919         for to_node in dist_nodes:
920           if not result[to_node]:
921             logger.Error("copy of file %s to node %s failed" %
922                          (fname, to_node))
923     finally:
924       if not rpc.call_node_start_master(master):
925         logger.Error("Could not re-enable the master role on the master,"
926                      " please restart manually.")
927
928
929 def _RecursiveCheckIfLVMBased(disk):
930   """Check if the given disk or its children are lvm-based.
931
932   Args:
933     disk: ganeti.objects.Disk object
934
935   Returns:
936     boolean indicating whether a LD_LV dev_type was found or not
937
938   """
939   if disk.children:
940     for chdisk in disk.children:
941       if _RecursiveCheckIfLVMBased(chdisk):
942         return True
943   return disk.dev_type == constants.LD_LV
944
945
946 class LUSetClusterParams(LogicalUnit):
947   """Change the parameters of the cluster.
948
949   """
950   HPATH = "cluster-modify"
951   HTYPE = constants.HTYPE_CLUSTER
952   _OP_REQP = []
953
954   def BuildHooksEnv(self):
955     """Build hooks env.
956
957     """
958     env = {
959       "OP_TARGET": self.sstore.GetClusterName(),
960       "NEW_VG_NAME": self.op.vg_name,
961       }
962     mn = self.sstore.GetMasterNode()
963     return env, [mn], [mn]
964
965   def CheckPrereq(self):
966     """Check prerequisites.
967
968     This checks whether the given params don't conflict and
969     if the given volume group is valid.
970
971     """
972     if not self.op.vg_name:
973       instances = [self.cfg.GetInstanceInfo(name)
974                    for name in self.cfg.GetInstanceList()]
975       for inst in instances:
976         for disk in inst.disks:
977           if _RecursiveCheckIfLVMBased(disk):
978             raise errors.OpPrereqError("Cannot disable lvm storage while"
979                                        " lvm-based instances exist")
980
981     # if vg_name not None, checks given volume group on all nodes
982     if self.op.vg_name:
983       node_list = self.cfg.GetNodeList()
984       vglist = rpc.call_vg_list(node_list)
985       for node in node_list:
986         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
987                                               constants.MIN_VG_SIZE)
988         if vgstatus:
989           raise errors.OpPrereqError("Error on node '%s': %s" %
990                                      (node, vgstatus))
991
992   def Exec(self, feedback_fn):
993     """Change the parameters of the cluster.
994
995     """
996     if self.op.vg_name != self.cfg.GetVGName():
997       self.cfg.SetVGName(self.op.vg_name)
998     else:
999       feedback_fn("Cluster LVM configuration already in desired"
1000                   " state, not changing")
1001
1002
1003 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1004   """Sleep and poll for an instance's disk to sync.
1005
1006   """
1007   if not instance.disks:
1008     return True
1009
1010   if not oneshot:
1011     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1012
1013   node = instance.primary_node
1014
1015   for dev in instance.disks:
1016     cfgw.SetDiskID(dev, node)
1017
1018   retries = 0
1019   while True:
1020     max_time = 0
1021     done = True
1022     cumul_degraded = False
1023     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1024     if not rstats:
1025       proc.LogWarning("Can't get any data from node %s" % node)
1026       retries += 1
1027       if retries >= 10:
1028         raise errors.RemoteError("Can't contact node %s for mirror data,"
1029                                  " aborting." % node)
1030       time.sleep(6)
1031       continue
1032     retries = 0
1033     for i in range(len(rstats)):
1034       mstat = rstats[i]
1035       if mstat is None:
1036         proc.LogWarning("Can't compute data for node %s/%s" %
1037                         (node, instance.disks[i].iv_name))
1038         continue
1039       # we ignore the ldisk parameter
1040       perc_done, est_time, is_degraded, _ = mstat
1041       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1042       if perc_done is not None:
1043         done = False
1044         if est_time is not None:
1045           rem_time = "%d estimated seconds remaining" % est_time
1046           max_time = est_time
1047         else:
1048           rem_time = "no time estimate"
1049         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1050                      (instance.disks[i].iv_name, perc_done, rem_time))
1051     if done or oneshot:
1052       break
1053
1054     if unlock:
1055       #utils.Unlock('cmd')
1056       pass
1057     try:
1058       time.sleep(min(60, max_time))
1059     finally:
1060       if unlock:
1061         #utils.Lock('cmd')
1062         pass
1063
1064   if done:
1065     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1066   return not cumul_degraded
1067
1068
1069 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1070   """Check that mirrors are not degraded.
1071
1072   The ldisk parameter, if True, will change the test from the
1073   is_degraded attribute (which represents overall non-ok status for
1074   the device(s)) to the ldisk (representing the local storage status).
1075
1076   """
1077   cfgw.SetDiskID(dev, node)
1078   if ldisk:
1079     idx = 6
1080   else:
1081     idx = 5
1082
1083   result = True
1084   if on_primary or dev.AssembleOnSecondary():
1085     rstats = rpc.call_blockdev_find(node, dev)
1086     if not rstats:
1087       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1088       result = False
1089     else:
1090       result = result and (not rstats[idx])
1091   if dev.children:
1092     for child in dev.children:
1093       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1094
1095   return result
1096
1097
1098 class LUDiagnoseOS(NoHooksLU):
1099   """Logical unit for OS diagnose/query.
1100
1101   """
1102   _OP_REQP = ["output_fields", "names"]
1103
1104   def CheckPrereq(self):
1105     """Check prerequisites.
1106
1107     This always succeeds, since this is a pure query LU.
1108
1109     """
1110     if self.op.names:
1111       raise errors.OpPrereqError("Selective OS query not supported")
1112
1113     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1114     _CheckOutputFields(static=[],
1115                        dynamic=self.dynamic_fields,
1116                        selected=self.op.output_fields)
1117
1118   @staticmethod
1119   def _DiagnoseByOS(node_list, rlist):
1120     """Remaps a per-node return list into an a per-os per-node dictionary
1121
1122       Args:
1123         node_list: a list with the names of all nodes
1124         rlist: a map with node names as keys and OS objects as values
1125
1126       Returns:
1127         map: a map with osnames as keys and as value another map, with
1128              nodes as
1129              keys and list of OS objects as values
1130              e.g. {"debian-etch": {"node1": [<object>,...],
1131                                    "node2": [<object>,]}
1132                   }
1133
1134     """
1135     all_os = {}
1136     for node_name, nr in rlist.iteritems():
1137       if not nr:
1138         continue
1139       for os_obj in nr:
1140         if os_obj.name not in all_os:
1141           # build a list of nodes for this os containing empty lists
1142           # for each node in node_list
1143           all_os[os_obj.name] = {}
1144           for nname in node_list:
1145             all_os[os_obj.name][nname] = []
1146         all_os[os_obj.name][node_name].append(os_obj)
1147     return all_os
1148
1149   def Exec(self, feedback_fn):
1150     """Compute the list of OSes.
1151
1152     """
1153     node_list = self.cfg.GetNodeList()
1154     node_data = rpc.call_os_diagnose(node_list)
1155     if node_data == False:
1156       raise errors.OpExecError("Can't gather the list of OSes")
1157     pol = self._DiagnoseByOS(node_list, node_data)
1158     output = []
1159     for os_name, os_data in pol.iteritems():
1160       row = []
1161       for field in self.op.output_fields:
1162         if field == "name":
1163           val = os_name
1164         elif field == "valid":
1165           val = utils.all([osl and osl[0] for osl in os_data.values()])
1166         elif field == "node_status":
1167           val = {}
1168           for node_name, nos_list in os_data.iteritems():
1169             val[node_name] = [(v.status, v.path) for v in nos_list]
1170         else:
1171           raise errors.ParameterError(field)
1172         row.append(val)
1173       output.append(row)
1174
1175     return output
1176
1177
1178 class LURemoveNode(LogicalUnit):
1179   """Logical unit for removing a node.
1180
1181   """
1182   HPATH = "node-remove"
1183   HTYPE = constants.HTYPE_NODE
1184   _OP_REQP = ["node_name"]
1185
1186   def BuildHooksEnv(self):
1187     """Build hooks env.
1188
1189     This doesn't run on the target node in the pre phase as a failed
1190     node would not allows itself to run.
1191
1192     """
1193     env = {
1194       "OP_TARGET": self.op.node_name,
1195       "NODE_NAME": self.op.node_name,
1196       }
1197     all_nodes = self.cfg.GetNodeList()
1198     all_nodes.remove(self.op.node_name)
1199     return env, all_nodes, all_nodes
1200
1201   def CheckPrereq(self):
1202     """Check prerequisites.
1203
1204     This checks:
1205      - the node exists in the configuration
1206      - it does not have primary or secondary instances
1207      - it's not the master
1208
1209     Any errors are signalled by raising errors.OpPrereqError.
1210
1211     """
1212     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1213     if node is None:
1214       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1215
1216     instance_list = self.cfg.GetInstanceList()
1217
1218     masternode = self.sstore.GetMasterNode()
1219     if node.name == masternode:
1220       raise errors.OpPrereqError("Node is the master node,"
1221                                  " you need to failover first.")
1222
1223     for instance_name in instance_list:
1224       instance = self.cfg.GetInstanceInfo(instance_name)
1225       if node.name == instance.primary_node:
1226         raise errors.OpPrereqError("Instance %s still running on the node,"
1227                                    " please remove first." % instance_name)
1228       if node.name in instance.secondary_nodes:
1229         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1230                                    " please remove first." % instance_name)
1231     self.op.node_name = node.name
1232     self.node = node
1233
1234   def Exec(self, feedback_fn):
1235     """Removes the node from the cluster.
1236
1237     """
1238     node = self.node
1239     logger.Info("stopping the node daemon and removing configs from node %s" %
1240                 node.name)
1241
1242     rpc.call_node_leave_cluster(node.name)
1243
1244     self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1245
1246     logger.Info("Removing node %s from config" % node.name)
1247
1248     self.cfg.RemoveNode(node.name)
1249
1250     utils.RemoveHostFromEtcHosts(node.name)
1251
1252
1253 class LUQueryNodes(NoHooksLU):
1254   """Logical unit for querying nodes.
1255
1256   """
1257   _OP_REQP = ["output_fields", "names"]
1258
1259   def CheckPrereq(self):
1260     """Check prerequisites.
1261
1262     This checks that the fields required are valid output fields.
1263
1264     """
1265     self.dynamic_fields = frozenset([
1266       "dtotal", "dfree",
1267       "mtotal", "mnode", "mfree",
1268       "bootid",
1269       "ctotal",
1270       ])
1271
1272     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1273                                "pinst_list", "sinst_list",
1274                                "pip", "sip"],
1275                        dynamic=self.dynamic_fields,
1276                        selected=self.op.output_fields)
1277
1278     self.wanted = _GetWantedNodes(self, self.op.names)
1279
1280   def Exec(self, feedback_fn):
1281     """Computes the list of nodes and their attributes.
1282
1283     """
1284     nodenames = self.wanted
1285     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1286
1287     # begin data gathering
1288
1289     if self.dynamic_fields.intersection(self.op.output_fields):
1290       live_data = {}
1291       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1292       for name in nodenames:
1293         nodeinfo = node_data.get(name, None)
1294         if nodeinfo:
1295           live_data[name] = {
1296             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1297             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1298             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1299             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1300             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1301             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1302             "bootid": nodeinfo['bootid'],
1303             }
1304         else:
1305           live_data[name] = {}
1306     else:
1307       live_data = dict.fromkeys(nodenames, {})
1308
1309     node_to_primary = dict([(name, set()) for name in nodenames])
1310     node_to_secondary = dict([(name, set()) for name in nodenames])
1311
1312     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1313                              "sinst_cnt", "sinst_list"))
1314     if inst_fields & frozenset(self.op.output_fields):
1315       instancelist = self.cfg.GetInstanceList()
1316
1317       for instance_name in instancelist:
1318         inst = self.cfg.GetInstanceInfo(instance_name)
1319         if inst.primary_node in node_to_primary:
1320           node_to_primary[inst.primary_node].add(inst.name)
1321         for secnode in inst.secondary_nodes:
1322           if secnode in node_to_secondary:
1323             node_to_secondary[secnode].add(inst.name)
1324
1325     # end data gathering
1326
1327     output = []
1328     for node in nodelist:
1329       node_output = []
1330       for field in self.op.output_fields:
1331         if field == "name":
1332           val = node.name
1333         elif field == "pinst_list":
1334           val = list(node_to_primary[node.name])
1335         elif field == "sinst_list":
1336           val = list(node_to_secondary[node.name])
1337         elif field == "pinst_cnt":
1338           val = len(node_to_primary[node.name])
1339         elif field == "sinst_cnt":
1340           val = len(node_to_secondary[node.name])
1341         elif field == "pip":
1342           val = node.primary_ip
1343         elif field == "sip":
1344           val = node.secondary_ip
1345         elif field in self.dynamic_fields:
1346           val = live_data[node.name].get(field, None)
1347         else:
1348           raise errors.ParameterError(field)
1349         node_output.append(val)
1350       output.append(node_output)
1351
1352     return output
1353
1354
1355 class LUQueryNodeVolumes(NoHooksLU):
1356   """Logical unit for getting volumes on node(s).
1357
1358   """
1359   _OP_REQP = ["nodes", "output_fields"]
1360
1361   def CheckPrereq(self):
1362     """Check prerequisites.
1363
1364     This checks that the fields required are valid output fields.
1365
1366     """
1367     self.nodes = _GetWantedNodes(self, self.op.nodes)
1368
1369     _CheckOutputFields(static=["node"],
1370                        dynamic=["phys", "vg", "name", "size", "instance"],
1371                        selected=self.op.output_fields)
1372
1373
1374   def Exec(self, feedback_fn):
1375     """Computes the list of nodes and their attributes.
1376
1377     """
1378     nodenames = self.nodes
1379     volumes = rpc.call_node_volumes(nodenames)
1380
1381     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1382              in self.cfg.GetInstanceList()]
1383
1384     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1385
1386     output = []
1387     for node in nodenames:
1388       if node not in volumes or not volumes[node]:
1389         continue
1390
1391       node_vols = volumes[node][:]
1392       node_vols.sort(key=lambda vol: vol['dev'])
1393
1394       for vol in node_vols:
1395         node_output = []
1396         for field in self.op.output_fields:
1397           if field == "node":
1398             val = node
1399           elif field == "phys":
1400             val = vol['dev']
1401           elif field == "vg":
1402             val = vol['vg']
1403           elif field == "name":
1404             val = vol['name']
1405           elif field == "size":
1406             val = int(float(vol['size']))
1407           elif field == "instance":
1408             for inst in ilist:
1409               if node not in lv_by_node[inst]:
1410                 continue
1411               if vol['name'] in lv_by_node[inst][node]:
1412                 val = inst.name
1413                 break
1414             else:
1415               val = '-'
1416           else:
1417             raise errors.ParameterError(field)
1418           node_output.append(str(val))
1419
1420         output.append(node_output)
1421
1422     return output
1423
1424
1425 class LUAddNode(LogicalUnit):
1426   """Logical unit for adding node to the cluster.
1427
1428   """
1429   HPATH = "node-add"
1430   HTYPE = constants.HTYPE_NODE
1431   _OP_REQP = ["node_name"]
1432
1433   def BuildHooksEnv(self):
1434     """Build hooks env.
1435
1436     This will run on all nodes before, and on all nodes + the new node after.
1437
1438     """
1439     env = {
1440       "OP_TARGET": self.op.node_name,
1441       "NODE_NAME": self.op.node_name,
1442       "NODE_PIP": self.op.primary_ip,
1443       "NODE_SIP": self.op.secondary_ip,
1444       }
1445     nodes_0 = self.cfg.GetNodeList()
1446     nodes_1 = nodes_0 + [self.op.node_name, ]
1447     return env, nodes_0, nodes_1
1448
1449   def CheckPrereq(self):
1450     """Check prerequisites.
1451
1452     This checks:
1453      - the new node is not already in the config
1454      - it is resolvable
1455      - its parameters (single/dual homed) matches the cluster
1456
1457     Any errors are signalled by raising errors.OpPrereqError.
1458
1459     """
1460     node_name = self.op.node_name
1461     cfg = self.cfg
1462
1463     dns_data = utils.HostInfo(node_name)
1464
1465     node = dns_data.name
1466     primary_ip = self.op.primary_ip = dns_data.ip
1467     secondary_ip = getattr(self.op, "secondary_ip", None)
1468     if secondary_ip is None:
1469       secondary_ip = primary_ip
1470     if not utils.IsValidIP(secondary_ip):
1471       raise errors.OpPrereqError("Invalid secondary IP given")
1472     self.op.secondary_ip = secondary_ip
1473
1474     node_list = cfg.GetNodeList()
1475     if not self.op.readd and node in node_list:
1476       raise errors.OpPrereqError("Node %s is already in the configuration" %
1477                                  node)
1478     elif self.op.readd and node not in node_list:
1479       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1480
1481     for existing_node_name in node_list:
1482       existing_node = cfg.GetNodeInfo(existing_node_name)
1483
1484       if self.op.readd and node == existing_node_name:
1485         if (existing_node.primary_ip != primary_ip or
1486             existing_node.secondary_ip != secondary_ip):
1487           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1488                                      " address configuration as before")
1489         continue
1490
1491       if (existing_node.primary_ip == primary_ip or
1492           existing_node.secondary_ip == primary_ip or
1493           existing_node.primary_ip == secondary_ip or
1494           existing_node.secondary_ip == secondary_ip):
1495         raise errors.OpPrereqError("New node ip address(es) conflict with"
1496                                    " existing node %s" % existing_node.name)
1497
1498     # check that the type of the node (single versus dual homed) is the
1499     # same as for the master
1500     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1501     master_singlehomed = myself.secondary_ip == myself.primary_ip
1502     newbie_singlehomed = secondary_ip == primary_ip
1503     if master_singlehomed != newbie_singlehomed:
1504       if master_singlehomed:
1505         raise errors.OpPrereqError("The master has no private ip but the"
1506                                    " new node has one")
1507       else:
1508         raise errors.OpPrereqError("The master has a private ip but the"
1509                                    " new node doesn't have one")
1510
1511     # checks reachablity
1512     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1513       raise errors.OpPrereqError("Node not reachable by ping")
1514
1515     if not newbie_singlehomed:
1516       # check reachability from my secondary ip to newbie's secondary ip
1517       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1518                            source=myself.secondary_ip):
1519         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1520                                    " based ping to noded port")
1521
1522     self.new_node = objects.Node(name=node,
1523                                  primary_ip=primary_ip,
1524                                  secondary_ip=secondary_ip)
1525
1526     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1527       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1528         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1529                                    constants.VNC_PASSWORD_FILE)
1530
1531   def Exec(self, feedback_fn):
1532     """Adds the new node to the cluster.
1533
1534     """
1535     new_node = self.new_node
1536     node = new_node.name
1537
1538     # set up inter-node password and certificate and restarts the node daemon
1539     gntpass = self.sstore.GetNodeDaemonPassword()
1540     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1541       raise errors.OpExecError("ganeti password corruption detected")
1542     f = open(constants.SSL_CERT_FILE)
1543     try:
1544       gntpem = f.read(8192)
1545     finally:
1546       f.close()
1547     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1548     # so we use this to detect an invalid certificate; as long as the
1549     # cert doesn't contain this, the here-document will be correctly
1550     # parsed by the shell sequence below
1551     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1552       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1553     if not gntpem.endswith("\n"):
1554       raise errors.OpExecError("PEM must end with newline")
1555     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1556
1557     # and then connect with ssh to set password and start ganeti-noded
1558     # note that all the below variables are sanitized at this point,
1559     # either by being constants or by the checks above
1560     ss = self.sstore
1561     mycommand = ("umask 077 && "
1562                  "echo '%s' > '%s' && "
1563                  "cat > '%s' << '!EOF.' && \n"
1564                  "%s!EOF.\n%s restart" %
1565                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1566                   constants.SSL_CERT_FILE, gntpem,
1567                   constants.NODE_INITD_SCRIPT))
1568
1569     result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1570     if result.failed:
1571       raise errors.OpExecError("Remote command on node %s, error: %s,"
1572                                " output: %s" %
1573                                (node, result.fail_reason, result.output))
1574
1575     # check connectivity
1576     time.sleep(4)
1577
1578     result = rpc.call_version([node])[node]
1579     if result:
1580       if constants.PROTOCOL_VERSION == result:
1581         logger.Info("communication to node %s fine, sw version %s match" %
1582                     (node, result))
1583       else:
1584         raise errors.OpExecError("Version mismatch master version %s,"
1585                                  " node version %s" %
1586                                  (constants.PROTOCOL_VERSION, result))
1587     else:
1588       raise errors.OpExecError("Cannot get version from the new node")
1589
1590     # setup ssh on node
1591     logger.Info("copy ssh key to node %s" % node)
1592     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1593     keyarray = []
1594     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1595                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1596                 priv_key, pub_key]
1597
1598     for i in keyfiles:
1599       f = open(i, 'r')
1600       try:
1601         keyarray.append(f.read())
1602       finally:
1603         f.close()
1604
1605     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1606                                keyarray[3], keyarray[4], keyarray[5])
1607
1608     if not result:
1609       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1610
1611     # Add node to our /etc/hosts, and add key to known_hosts
1612     utils.AddHostToEtcHosts(new_node.name)
1613
1614     if new_node.secondary_ip != new_node.primary_ip:
1615       if not rpc.call_node_tcp_ping(new_node.name,
1616                                     constants.LOCALHOST_IP_ADDRESS,
1617                                     new_node.secondary_ip,
1618                                     constants.DEFAULT_NODED_PORT,
1619                                     10, False):
1620         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1621                                  " you gave (%s). Please fix and re-run this"
1622                                  " command." % new_node.secondary_ip)
1623
1624     success, msg = self.ssh.VerifyNodeHostname(node)
1625     if not success:
1626       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1627                                " than the one the resolver gives: %s."
1628                                " Please fix and re-run this command." %
1629                                (node, msg))
1630
1631     # Distribute updated /etc/hosts and known_hosts to all nodes,
1632     # including the node just added
1633     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1634     dist_nodes = self.cfg.GetNodeList()
1635     if not self.op.readd:
1636       dist_nodes.append(node)
1637     if myself.name in dist_nodes:
1638       dist_nodes.remove(myself.name)
1639
1640     logger.Debug("Copying hosts and known_hosts to all nodes")
1641     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1642       result = rpc.call_upload_file(dist_nodes, fname)
1643       for to_node in dist_nodes:
1644         if not result[to_node]:
1645           logger.Error("copy of file %s to node %s failed" %
1646                        (fname, to_node))
1647
1648     to_copy = ss.GetFileList()
1649     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1650       to_copy.append(constants.VNC_PASSWORD_FILE)
1651     for fname in to_copy:
1652       if not self.ssh.CopyFileToNode(node, fname):
1653         logger.Error("could not copy file %s to node %s" % (fname, node))
1654
1655     if not self.op.readd:
1656       logger.Info("adding node %s to cluster.conf" % node)
1657       self.cfg.AddNode(new_node)
1658
1659
1660 class LUMasterFailover(LogicalUnit):
1661   """Failover the master node to the current node.
1662
1663   This is a special LU in that it must run on a non-master node.
1664
1665   """
1666   HPATH = "master-failover"
1667   HTYPE = constants.HTYPE_CLUSTER
1668   REQ_MASTER = False
1669   REQ_WSSTORE = True
1670   _OP_REQP = []
1671
1672   def BuildHooksEnv(self):
1673     """Build hooks env.
1674
1675     This will run on the new master only in the pre phase, and on all
1676     the nodes in the post phase.
1677
1678     """
1679     env = {
1680       "OP_TARGET": self.new_master,
1681       "NEW_MASTER": self.new_master,
1682       "OLD_MASTER": self.old_master,
1683       }
1684     return env, [self.new_master], self.cfg.GetNodeList()
1685
1686   def CheckPrereq(self):
1687     """Check prerequisites.
1688
1689     This checks that we are not already the master.
1690
1691     """
1692     self.new_master = utils.HostInfo().name
1693     self.old_master = self.sstore.GetMasterNode()
1694
1695     if self.old_master == self.new_master:
1696       raise errors.OpPrereqError("This commands must be run on the node"
1697                                  " where you want the new master to be."
1698                                  " %s is already the master" %
1699                                  self.old_master)
1700
1701   def Exec(self, feedback_fn):
1702     """Failover the master node.
1703
1704     This command, when run on a non-master node, will cause the current
1705     master to cease being master, and the non-master to become new
1706     master.
1707
1708     """
1709     #TODO: do not rely on gethostname returning the FQDN
1710     logger.Info("setting master to %s, old master: %s" %
1711                 (self.new_master, self.old_master))
1712
1713     if not rpc.call_node_stop_master(self.old_master):
1714       logger.Error("could disable the master role on the old master"
1715                    " %s, please disable manually" % self.old_master)
1716
1717     ss = self.sstore
1718     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1719     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1720                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1721       logger.Error("could not distribute the new simple store master file"
1722                    " to the other nodes, please check.")
1723
1724     if not rpc.call_node_start_master(self.new_master):
1725       logger.Error("could not start the master role on the new master"
1726                    " %s, please check" % self.new_master)
1727       feedback_fn("Error in activating the master IP on the new master,"
1728                   " please fix manually.")
1729
1730
1731
1732 class LUQueryClusterInfo(NoHooksLU):
1733   """Query cluster configuration.
1734
1735   """
1736   _OP_REQP = []
1737   REQ_MASTER = False
1738
1739   def CheckPrereq(self):
1740     """No prerequsites needed for this LU.
1741
1742     """
1743     pass
1744
1745   def Exec(self, feedback_fn):
1746     """Return cluster config.
1747
1748     """
1749     result = {
1750       "name": self.sstore.GetClusterName(),
1751       "software_version": constants.RELEASE_VERSION,
1752       "protocol_version": constants.PROTOCOL_VERSION,
1753       "config_version": constants.CONFIG_VERSION,
1754       "os_api_version": constants.OS_API_VERSION,
1755       "export_version": constants.EXPORT_VERSION,
1756       "master": self.sstore.GetMasterNode(),
1757       "architecture": (platform.architecture()[0], platform.machine()),
1758       "hypervisor_type": self.sstore.GetHypervisorType(),
1759       }
1760
1761     return result
1762
1763
1764 class LUClusterCopyFile(NoHooksLU):
1765   """Copy file to cluster.
1766
1767   """
1768   _OP_REQP = ["nodes", "filename"]
1769
1770   def CheckPrereq(self):
1771     """Check prerequisites.
1772
1773     It should check that the named file exists and that the given list
1774     of nodes is valid.
1775
1776     """
1777     if not os.path.exists(self.op.filename):
1778       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1779
1780     self.nodes = _GetWantedNodes(self, self.op.nodes)
1781
1782   def Exec(self, feedback_fn):
1783     """Copy a file from master to some nodes.
1784
1785     Args:
1786       opts - class with options as members
1787       args - list containing a single element, the file name
1788     Opts used:
1789       nodes - list containing the name of target nodes; if empty, all nodes
1790
1791     """
1792     filename = self.op.filename
1793
1794     myname = utils.HostInfo().name
1795
1796     for node in self.nodes:
1797       if node == myname:
1798         continue
1799       if not self.ssh.CopyFileToNode(node, filename):
1800         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1801
1802
1803 class LUDumpClusterConfig(NoHooksLU):
1804   """Return a text-representation of the cluster-config.
1805
1806   """
1807   _OP_REQP = []
1808
1809   def CheckPrereq(self):
1810     """No prerequisites.
1811
1812     """
1813     pass
1814
1815   def Exec(self, feedback_fn):
1816     """Dump a representation of the cluster config to the standard output.
1817
1818     """
1819     return self.cfg.DumpConfig()
1820
1821
1822 class LURunClusterCommand(NoHooksLU):
1823   """Run a command on some nodes.
1824
1825   """
1826   _OP_REQP = ["command", "nodes"]
1827
1828   def CheckPrereq(self):
1829     """Check prerequisites.
1830
1831     It checks that the given list of nodes is valid.
1832
1833     """
1834     self.nodes = _GetWantedNodes(self, self.op.nodes)
1835
1836   def Exec(self, feedback_fn):
1837     """Run a command on some nodes.
1838
1839     """
1840     # put the master at the end of the nodes list
1841     master_node = self.sstore.GetMasterNode()
1842     if master_node in self.nodes:
1843       self.nodes.remove(master_node)
1844       self.nodes.append(master_node)
1845
1846     data = []
1847     for node in self.nodes:
1848       result = self.ssh.Run(node, "root", self.op.command)
1849       data.append((node, result.output, result.exit_code))
1850
1851     return data
1852
1853
1854 class LUActivateInstanceDisks(NoHooksLU):
1855   """Bring up an instance's disks.
1856
1857   """
1858   _OP_REQP = ["instance_name"]
1859
1860   def CheckPrereq(self):
1861     """Check prerequisites.
1862
1863     This checks that the instance is in the cluster.
1864
1865     """
1866     instance = self.cfg.GetInstanceInfo(
1867       self.cfg.ExpandInstanceName(self.op.instance_name))
1868     if instance is None:
1869       raise errors.OpPrereqError("Instance '%s' not known" %
1870                                  self.op.instance_name)
1871     self.instance = instance
1872
1873
1874   def Exec(self, feedback_fn):
1875     """Activate the disks.
1876
1877     """
1878     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1879     if not disks_ok:
1880       raise errors.OpExecError("Cannot activate block devices")
1881
1882     return disks_info
1883
1884
1885 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1886   """Prepare the block devices for an instance.
1887
1888   This sets up the block devices on all nodes.
1889
1890   Args:
1891     instance: a ganeti.objects.Instance object
1892     ignore_secondaries: if true, errors on secondary nodes won't result
1893                         in an error return from the function
1894
1895   Returns:
1896     false if the operation failed
1897     list of (host, instance_visible_name, node_visible_name) if the operation
1898          suceeded with the mapping from node devices to instance devices
1899   """
1900   device_info = []
1901   disks_ok = True
1902   iname = instance.name
1903   # With the two passes mechanism we try to reduce the window of
1904   # opportunity for the race condition of switching DRBD to primary
1905   # before handshaking occured, but we do not eliminate it
1906
1907   # The proper fix would be to wait (with some limits) until the
1908   # connection has been made and drbd transitions from WFConnection
1909   # into any other network-connected state (Connected, SyncTarget,
1910   # SyncSource, etc.)
1911
1912   # 1st pass, assemble on all nodes in secondary mode
1913   for inst_disk in instance.disks:
1914     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1915       cfg.SetDiskID(node_disk, node)
1916       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1917       if not result:
1918         logger.Error("could not prepare block device %s on node %s"
1919                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1920         if not ignore_secondaries:
1921           disks_ok = False
1922
1923   # FIXME: race condition on drbd migration to primary
1924
1925   # 2nd pass, do only the primary node
1926   for inst_disk in instance.disks:
1927     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1928       if node != instance.primary_node:
1929         continue
1930       cfg.SetDiskID(node_disk, node)
1931       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1932       if not result:
1933         logger.Error("could not prepare block device %s on node %s"
1934                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1935         disks_ok = False
1936     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1937
1938   # leave the disks configured for the primary node
1939   # this is a workaround that would be fixed better by
1940   # improving the logical/physical id handling
1941   for disk in instance.disks:
1942     cfg.SetDiskID(disk, instance.primary_node)
1943
1944   return disks_ok, device_info
1945
1946
1947 def _StartInstanceDisks(cfg, instance, force):
1948   """Start the disks of an instance.
1949
1950   """
1951   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1952                                            ignore_secondaries=force)
1953   if not disks_ok:
1954     _ShutdownInstanceDisks(instance, cfg)
1955     if force is not None and not force:
1956       logger.Error("If the message above refers to a secondary node,"
1957                    " you can retry the operation using '--force'.")
1958     raise errors.OpExecError("Disk consistency error")
1959
1960
1961 class LUDeactivateInstanceDisks(NoHooksLU):
1962   """Shutdown an instance's disks.
1963
1964   """
1965   _OP_REQP = ["instance_name"]
1966
1967   def CheckPrereq(self):
1968     """Check prerequisites.
1969
1970     This checks that the instance is in the cluster.
1971
1972     """
1973     instance = self.cfg.GetInstanceInfo(
1974       self.cfg.ExpandInstanceName(self.op.instance_name))
1975     if instance is None:
1976       raise errors.OpPrereqError("Instance '%s' not known" %
1977                                  self.op.instance_name)
1978     self.instance = instance
1979
1980   def Exec(self, feedback_fn):
1981     """Deactivate the disks
1982
1983     """
1984     instance = self.instance
1985     ins_l = rpc.call_instance_list([instance.primary_node])
1986     ins_l = ins_l[instance.primary_node]
1987     if not type(ins_l) is list:
1988       raise errors.OpExecError("Can't contact node '%s'" %
1989                                instance.primary_node)
1990
1991     if self.instance.name in ins_l:
1992       raise errors.OpExecError("Instance is running, can't shutdown"
1993                                " block devices.")
1994
1995     _ShutdownInstanceDisks(instance, self.cfg)
1996
1997
1998 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1999   """Shutdown block devices of an instance.
2000
2001   This does the shutdown on all nodes of the instance.
2002
2003   If the ignore_primary is false, errors on the primary node are
2004   ignored.
2005
2006   """
2007   result = True
2008   for disk in instance.disks:
2009     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2010       cfg.SetDiskID(top_disk, node)
2011       if not rpc.call_blockdev_shutdown(node, top_disk):
2012         logger.Error("could not shutdown block device %s on node %s" %
2013                      (disk.iv_name, node))
2014         if not ignore_primary or node != instance.primary_node:
2015           result = False
2016   return result
2017
2018
2019 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2020   """Checks if a node has enough free memory.
2021
2022   This function check if a given node has the needed amount of free
2023   memory. In case the node has less memory or we cannot get the
2024   information from the node, this function raise an OpPrereqError
2025   exception.
2026
2027   Args:
2028     - cfg: a ConfigWriter instance
2029     - node: the node name
2030     - reason: string to use in the error message
2031     - requested: the amount of memory in MiB
2032
2033   """
2034   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2035   if not nodeinfo or not isinstance(nodeinfo, dict):
2036     raise errors.OpPrereqError("Could not contact node %s for resource"
2037                              " information" % (node,))
2038
2039   free_mem = nodeinfo[node].get('memory_free')
2040   if not isinstance(free_mem, int):
2041     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2042                              " was '%s'" % (node, free_mem))
2043   if requested > free_mem:
2044     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2045                              " needed %s MiB, available %s MiB" %
2046                              (node, reason, requested, free_mem))
2047
2048
2049 class LUStartupInstance(LogicalUnit):
2050   """Starts an instance.
2051
2052   """
2053   HPATH = "instance-start"
2054   HTYPE = constants.HTYPE_INSTANCE
2055   _OP_REQP = ["instance_name", "force"]
2056
2057   def BuildHooksEnv(self):
2058     """Build hooks env.
2059
2060     This runs on master, primary and secondary nodes of the instance.
2061
2062     """
2063     env = {
2064       "FORCE": self.op.force,
2065       }
2066     env.update(_BuildInstanceHookEnvByObject(self.instance))
2067     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2068           list(self.instance.secondary_nodes))
2069     return env, nl, nl
2070
2071   def CheckPrereq(self):
2072     """Check prerequisites.
2073
2074     This checks that the instance is in the cluster.
2075
2076     """
2077     instance = self.cfg.GetInstanceInfo(
2078       self.cfg.ExpandInstanceName(self.op.instance_name))
2079     if instance is None:
2080       raise errors.OpPrereqError("Instance '%s' not known" %
2081                                  self.op.instance_name)
2082
2083     # check bridges existance
2084     _CheckInstanceBridgesExist(instance)
2085
2086     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2087                          "starting instance %s" % instance.name,
2088                          instance.memory)
2089
2090     self.instance = instance
2091     self.op.instance_name = instance.name
2092
2093   def Exec(self, feedback_fn):
2094     """Start the instance.
2095
2096     """
2097     instance = self.instance
2098     force = self.op.force
2099     extra_args = getattr(self.op, "extra_args", "")
2100
2101     self.cfg.MarkInstanceUp(instance.name)
2102
2103     node_current = instance.primary_node
2104
2105     _StartInstanceDisks(self.cfg, instance, force)
2106
2107     if not rpc.call_instance_start(node_current, instance, extra_args):
2108       _ShutdownInstanceDisks(instance, self.cfg)
2109       raise errors.OpExecError("Could not start instance")
2110
2111
2112 class LURebootInstance(LogicalUnit):
2113   """Reboot an instance.
2114
2115   """
2116   HPATH = "instance-reboot"
2117   HTYPE = constants.HTYPE_INSTANCE
2118   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2119
2120   def BuildHooksEnv(self):
2121     """Build hooks env.
2122
2123     This runs on master, primary and secondary nodes of the instance.
2124
2125     """
2126     env = {
2127       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2128       }
2129     env.update(_BuildInstanceHookEnvByObject(self.instance))
2130     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2131           list(self.instance.secondary_nodes))
2132     return env, nl, nl
2133
2134   def CheckPrereq(self):
2135     """Check prerequisites.
2136
2137     This checks that the instance is in the cluster.
2138
2139     """
2140     instance = self.cfg.GetInstanceInfo(
2141       self.cfg.ExpandInstanceName(self.op.instance_name))
2142     if instance is None:
2143       raise errors.OpPrereqError("Instance '%s' not known" %
2144                                  self.op.instance_name)
2145
2146     # check bridges existance
2147     _CheckInstanceBridgesExist(instance)
2148
2149     self.instance = instance
2150     self.op.instance_name = instance.name
2151
2152   def Exec(self, feedback_fn):
2153     """Reboot the instance.
2154
2155     """
2156     instance = self.instance
2157     ignore_secondaries = self.op.ignore_secondaries
2158     reboot_type = self.op.reboot_type
2159     extra_args = getattr(self.op, "extra_args", "")
2160
2161     node_current = instance.primary_node
2162
2163     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2164                            constants.INSTANCE_REBOOT_HARD,
2165                            constants.INSTANCE_REBOOT_FULL]:
2166       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2167                                   (constants.INSTANCE_REBOOT_SOFT,
2168                                    constants.INSTANCE_REBOOT_HARD,
2169                                    constants.INSTANCE_REBOOT_FULL))
2170
2171     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2172                        constants.INSTANCE_REBOOT_HARD]:
2173       if not rpc.call_instance_reboot(node_current, instance,
2174                                       reboot_type, extra_args):
2175         raise errors.OpExecError("Could not reboot instance")
2176     else:
2177       if not rpc.call_instance_shutdown(node_current, instance):
2178         raise errors.OpExecError("could not shutdown instance for full reboot")
2179       _ShutdownInstanceDisks(instance, self.cfg)
2180       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2181       if not rpc.call_instance_start(node_current, instance, extra_args):
2182         _ShutdownInstanceDisks(instance, self.cfg)
2183         raise errors.OpExecError("Could not start instance for full reboot")
2184
2185     self.cfg.MarkInstanceUp(instance.name)
2186
2187
2188 class LUShutdownInstance(LogicalUnit):
2189   """Shutdown an instance.
2190
2191   """
2192   HPATH = "instance-stop"
2193   HTYPE = constants.HTYPE_INSTANCE
2194   _OP_REQP = ["instance_name"]
2195
2196   def BuildHooksEnv(self):
2197     """Build hooks env.
2198
2199     This runs on master, primary and secondary nodes of the instance.
2200
2201     """
2202     env = _BuildInstanceHookEnvByObject(self.instance)
2203     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2204           list(self.instance.secondary_nodes))
2205     return env, nl, nl
2206
2207   def CheckPrereq(self):
2208     """Check prerequisites.
2209
2210     This checks that the instance is in the cluster.
2211
2212     """
2213     instance = self.cfg.GetInstanceInfo(
2214       self.cfg.ExpandInstanceName(self.op.instance_name))
2215     if instance is None:
2216       raise errors.OpPrereqError("Instance '%s' not known" %
2217                                  self.op.instance_name)
2218     self.instance = instance
2219
2220   def Exec(self, feedback_fn):
2221     """Shutdown the instance.
2222
2223     """
2224     instance = self.instance
2225     node_current = instance.primary_node
2226     self.cfg.MarkInstanceDown(instance.name)
2227     if not rpc.call_instance_shutdown(node_current, instance):
2228       logger.Error("could not shutdown instance")
2229
2230     _ShutdownInstanceDisks(instance, self.cfg)
2231
2232
2233 class LUReinstallInstance(LogicalUnit):
2234   """Reinstall an instance.
2235
2236   """
2237   HPATH = "instance-reinstall"
2238   HTYPE = constants.HTYPE_INSTANCE
2239   _OP_REQP = ["instance_name"]
2240
2241   def BuildHooksEnv(self):
2242     """Build hooks env.
2243
2244     This runs on master, primary and secondary nodes of the instance.
2245
2246     """
2247     env = _BuildInstanceHookEnvByObject(self.instance)
2248     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2249           list(self.instance.secondary_nodes))
2250     return env, nl, nl
2251
2252   def CheckPrereq(self):
2253     """Check prerequisites.
2254
2255     This checks that the instance is in the cluster and is not running.
2256
2257     """
2258     instance = self.cfg.GetInstanceInfo(
2259       self.cfg.ExpandInstanceName(self.op.instance_name))
2260     if instance is None:
2261       raise errors.OpPrereqError("Instance '%s' not known" %
2262                                  self.op.instance_name)
2263     if instance.disk_template == constants.DT_DISKLESS:
2264       raise errors.OpPrereqError("Instance '%s' has no disks" %
2265                                  self.op.instance_name)
2266     if instance.status != "down":
2267       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2268                                  self.op.instance_name)
2269     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2270     if remote_info:
2271       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2272                                  (self.op.instance_name,
2273                                   instance.primary_node))
2274
2275     self.op.os_type = getattr(self.op, "os_type", None)
2276     if self.op.os_type is not None:
2277       # OS verification
2278       pnode = self.cfg.GetNodeInfo(
2279         self.cfg.ExpandNodeName(instance.primary_node))
2280       if pnode is None:
2281         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2282                                    self.op.pnode)
2283       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2284       if not os_obj:
2285         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2286                                    " primary node"  % self.op.os_type)
2287
2288     self.instance = instance
2289
2290   def Exec(self, feedback_fn):
2291     """Reinstall the instance.
2292
2293     """
2294     inst = self.instance
2295
2296     if self.op.os_type is not None:
2297       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2298       inst.os = self.op.os_type
2299       self.cfg.AddInstance(inst)
2300
2301     _StartInstanceDisks(self.cfg, inst, None)
2302     try:
2303       feedback_fn("Running the instance OS create scripts...")
2304       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2305         raise errors.OpExecError("Could not install OS for instance %s"
2306                                  " on node %s" %
2307                                  (inst.name, inst.primary_node))
2308     finally:
2309       _ShutdownInstanceDisks(inst, self.cfg)
2310
2311
2312 class LURenameInstance(LogicalUnit):
2313   """Rename an instance.
2314
2315   """
2316   HPATH = "instance-rename"
2317   HTYPE = constants.HTYPE_INSTANCE
2318   _OP_REQP = ["instance_name", "new_name"]
2319
2320   def BuildHooksEnv(self):
2321     """Build hooks env.
2322
2323     This runs on master, primary and secondary nodes of the instance.
2324
2325     """
2326     env = _BuildInstanceHookEnvByObject(self.instance)
2327     env["INSTANCE_NEW_NAME"] = self.op.new_name
2328     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2329           list(self.instance.secondary_nodes))
2330     return env, nl, nl
2331
2332   def CheckPrereq(self):
2333     """Check prerequisites.
2334
2335     This checks that the instance is in the cluster and is not running.
2336
2337     """
2338     instance = self.cfg.GetInstanceInfo(
2339       self.cfg.ExpandInstanceName(self.op.instance_name))
2340     if instance is None:
2341       raise errors.OpPrereqError("Instance '%s' not known" %
2342                                  self.op.instance_name)
2343     if instance.status != "down":
2344       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2345                                  self.op.instance_name)
2346     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2347     if remote_info:
2348       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2349                                  (self.op.instance_name,
2350                                   instance.primary_node))
2351     self.instance = instance
2352
2353     # new name verification
2354     name_info = utils.HostInfo(self.op.new_name)
2355
2356     self.op.new_name = new_name = name_info.name
2357     instance_list = self.cfg.GetInstanceList()
2358     if new_name in instance_list:
2359       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2360                                  new_name)
2361
2362     if not getattr(self.op, "ignore_ip", False):
2363       command = ["fping", "-q", name_info.ip]
2364       result = utils.RunCmd(command)
2365       if not result.failed:
2366         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2367                                    (name_info.ip, new_name))
2368
2369
2370   def Exec(self, feedback_fn):
2371     """Reinstall the instance.
2372
2373     """
2374     inst = self.instance
2375     old_name = inst.name
2376
2377     if inst.disk_template == constants.DT_FILE:
2378       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2379
2380     self.cfg.RenameInstance(inst.name, self.op.new_name)
2381
2382     # re-read the instance from the configuration after rename
2383     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2384
2385     if inst.disk_template == constants.DT_FILE:
2386       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2387       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2388                                                 old_file_storage_dir,
2389                                                 new_file_storage_dir)
2390
2391       if not result:
2392         raise errors.OpExecError("Could not connect to node '%s' to rename"
2393                                  " directory '%s' to '%s' (but the instance"
2394                                  " has been renamed in Ganeti)" % (
2395                                  inst.primary_node, old_file_storage_dir,
2396                                  new_file_storage_dir))
2397
2398       if not result[0]:
2399         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2400                                  " (but the instance has been renamed in"
2401                                  " Ganeti)" % (old_file_storage_dir,
2402                                                new_file_storage_dir))
2403
2404     _StartInstanceDisks(self.cfg, inst, None)
2405     try:
2406       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2407                                           "sda", "sdb"):
2408         msg = ("Could run OS rename script for instance %s on node %s (but the"
2409                " instance has been renamed in Ganeti)" %
2410                (inst.name, inst.primary_node))
2411         logger.Error(msg)
2412     finally:
2413       _ShutdownInstanceDisks(inst, self.cfg)
2414
2415
2416 class LURemoveInstance(LogicalUnit):
2417   """Remove an instance.
2418
2419   """
2420   HPATH = "instance-remove"
2421   HTYPE = constants.HTYPE_INSTANCE
2422   _OP_REQP = ["instance_name", "ignore_failures"]
2423
2424   def BuildHooksEnv(self):
2425     """Build hooks env.
2426
2427     This runs on master, primary and secondary nodes of the instance.
2428
2429     """
2430     env = _BuildInstanceHookEnvByObject(self.instance)
2431     nl = [self.sstore.GetMasterNode()]
2432     return env, nl, nl
2433
2434   def CheckPrereq(self):
2435     """Check prerequisites.
2436
2437     This checks that the instance is in the cluster.
2438
2439     """
2440     instance = self.cfg.GetInstanceInfo(
2441       self.cfg.ExpandInstanceName(self.op.instance_name))
2442     if instance is None:
2443       raise errors.OpPrereqError("Instance '%s' not known" %
2444                                  self.op.instance_name)
2445     self.instance = instance
2446
2447   def Exec(self, feedback_fn):
2448     """Remove the instance.
2449
2450     """
2451     instance = self.instance
2452     logger.Info("shutting down instance %s on node %s" %
2453                 (instance.name, instance.primary_node))
2454
2455     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2456       if self.op.ignore_failures:
2457         feedback_fn("Warning: can't shutdown instance")
2458       else:
2459         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2460                                  (instance.name, instance.primary_node))
2461
2462     logger.Info("removing block devices for instance %s" % instance.name)
2463
2464     if not _RemoveDisks(instance, self.cfg):
2465       if self.op.ignore_failures:
2466         feedback_fn("Warning: can't remove instance's disks")
2467       else:
2468         raise errors.OpExecError("Can't remove instance's disks")
2469
2470     logger.Info("removing instance %s out of cluster config" % instance.name)
2471
2472     self.cfg.RemoveInstance(instance.name)
2473
2474
2475 class LUQueryInstances(NoHooksLU):
2476   """Logical unit for querying instances.
2477
2478   """
2479   _OP_REQP = ["output_fields", "names"]
2480
2481   def CheckPrereq(self):
2482     """Check prerequisites.
2483
2484     This checks that the fields required are valid output fields.
2485
2486     """
2487     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2488     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2489                                "admin_state", "admin_ram",
2490                                "disk_template", "ip", "mac", "bridge",
2491                                "sda_size", "sdb_size", "vcpus"],
2492                        dynamic=self.dynamic_fields,
2493                        selected=self.op.output_fields)
2494
2495     self.wanted = _GetWantedInstances(self, self.op.names)
2496
2497   def Exec(self, feedback_fn):
2498     """Computes the list of nodes and their attributes.
2499
2500     """
2501     instance_names = self.wanted
2502     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2503                      in instance_names]
2504
2505     # begin data gathering
2506
2507     nodes = frozenset([inst.primary_node for inst in instance_list])
2508
2509     bad_nodes = []
2510     if self.dynamic_fields.intersection(self.op.output_fields):
2511       live_data = {}
2512       node_data = rpc.call_all_instances_info(nodes)
2513       for name in nodes:
2514         result = node_data[name]
2515         if result:
2516           live_data.update(result)
2517         elif result == False:
2518           bad_nodes.append(name)
2519         # else no instance is alive
2520     else:
2521       live_data = dict([(name, {}) for name in instance_names])
2522
2523     # end data gathering
2524
2525     output = []
2526     for instance in instance_list:
2527       iout = []
2528       for field in self.op.output_fields:
2529         if field == "name":
2530           val = instance.name
2531         elif field == "os":
2532           val = instance.os
2533         elif field == "pnode":
2534           val = instance.primary_node
2535         elif field == "snodes":
2536           val = list(instance.secondary_nodes)
2537         elif field == "admin_state":
2538           val = (instance.status != "down")
2539         elif field == "oper_state":
2540           if instance.primary_node in bad_nodes:
2541             val = None
2542           else:
2543             val = bool(live_data.get(instance.name))
2544         elif field == "status":
2545           if instance.primary_node in bad_nodes:
2546             val = "ERROR_nodedown"
2547           else:
2548             running = bool(live_data.get(instance.name))
2549             if running:
2550               if instance.status != "down":
2551                 val = "running"
2552               else:
2553                 val = "ERROR_up"
2554             else:
2555               if instance.status != "down":
2556                 val = "ERROR_down"
2557               else:
2558                 val = "ADMIN_down"
2559         elif field == "admin_ram":
2560           val = instance.memory
2561         elif field == "oper_ram":
2562           if instance.primary_node in bad_nodes:
2563             val = None
2564           elif instance.name in live_data:
2565             val = live_data[instance.name].get("memory", "?")
2566           else:
2567             val = "-"
2568         elif field == "disk_template":
2569           val = instance.disk_template
2570         elif field == "ip":
2571           val = instance.nics[0].ip
2572         elif field == "bridge":
2573           val = instance.nics[0].bridge
2574         elif field == "mac":
2575           val = instance.nics[0].mac
2576         elif field == "sda_size" or field == "sdb_size":
2577           disk = instance.FindDisk(field[:3])
2578           if disk is None:
2579             val = None
2580           else:
2581             val = disk.size
2582         elif field == "vcpus":
2583           val = instance.vcpus
2584         else:
2585           raise errors.ParameterError(field)
2586         iout.append(val)
2587       output.append(iout)
2588
2589     return output
2590
2591
2592 class LUFailoverInstance(LogicalUnit):
2593   """Failover an instance.
2594
2595   """
2596   HPATH = "instance-failover"
2597   HTYPE = constants.HTYPE_INSTANCE
2598   _OP_REQP = ["instance_name", "ignore_consistency"]
2599
2600   def BuildHooksEnv(self):
2601     """Build hooks env.
2602
2603     This runs on master, primary and secondary nodes of the instance.
2604
2605     """
2606     env = {
2607       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2608       }
2609     env.update(_BuildInstanceHookEnvByObject(self.instance))
2610     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2611     return env, nl, nl
2612
2613   def CheckPrereq(self):
2614     """Check prerequisites.
2615
2616     This checks that the instance is in the cluster.
2617
2618     """
2619     instance = self.cfg.GetInstanceInfo(
2620       self.cfg.ExpandInstanceName(self.op.instance_name))
2621     if instance is None:
2622       raise errors.OpPrereqError("Instance '%s' not known" %
2623                                  self.op.instance_name)
2624
2625     if instance.disk_template not in constants.DTS_NET_MIRROR:
2626       raise errors.OpPrereqError("Instance's disk layout is not"
2627                                  " network mirrored, cannot failover.")
2628
2629     secondary_nodes = instance.secondary_nodes
2630     if not secondary_nodes:
2631       raise errors.ProgrammerError("no secondary node but using "
2632                                    "a mirrored disk template")
2633
2634     target_node = secondary_nodes[0]
2635     # check memory requirements on the secondary node
2636     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2637                          instance.name, instance.memory)
2638
2639     # check bridge existance
2640     brlist = [nic.bridge for nic in instance.nics]
2641     if not rpc.call_bridges_exist(target_node, brlist):
2642       raise errors.OpPrereqError("One or more target bridges %s does not"
2643                                  " exist on destination node '%s'" %
2644                                  (brlist, target_node))
2645
2646     self.instance = instance
2647
2648   def Exec(self, feedback_fn):
2649     """Failover an instance.
2650
2651     The failover is done by shutting it down on its present node and
2652     starting it on the secondary.
2653
2654     """
2655     instance = self.instance
2656
2657     source_node = instance.primary_node
2658     target_node = instance.secondary_nodes[0]
2659
2660     feedback_fn("* checking disk consistency between source and target")
2661     for dev in instance.disks:
2662       # for drbd, these are drbd over lvm
2663       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2664         if instance.status == "up" and not self.op.ignore_consistency:
2665           raise errors.OpExecError("Disk %s is degraded on target node,"
2666                                    " aborting failover." % dev.iv_name)
2667
2668     feedback_fn("* shutting down instance on source node")
2669     logger.Info("Shutting down instance %s on node %s" %
2670                 (instance.name, source_node))
2671
2672     if not rpc.call_instance_shutdown(source_node, instance):
2673       if self.op.ignore_consistency:
2674         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2675                      " anyway. Please make sure node %s is down"  %
2676                      (instance.name, source_node, source_node))
2677       else:
2678         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2679                                  (instance.name, source_node))
2680
2681     feedback_fn("* deactivating the instance's disks on source node")
2682     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2683       raise errors.OpExecError("Can't shut down the instance's disks.")
2684
2685     instance.primary_node = target_node
2686     # distribute new instance config to the other nodes
2687     self.cfg.Update(instance)
2688
2689     # Only start the instance if it's marked as up
2690     if instance.status == "up":
2691       feedback_fn("* activating the instance's disks on target node")
2692       logger.Info("Starting instance %s on node %s" %
2693                   (instance.name, target_node))
2694
2695       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2696                                                ignore_secondaries=True)
2697       if not disks_ok:
2698         _ShutdownInstanceDisks(instance, self.cfg)
2699         raise errors.OpExecError("Can't activate the instance's disks")
2700
2701       feedback_fn("* starting the instance on the target node")
2702       if not rpc.call_instance_start(target_node, instance, None):
2703         _ShutdownInstanceDisks(instance, self.cfg)
2704         raise errors.OpExecError("Could not start instance %s on node %s." %
2705                                  (instance.name, target_node))
2706
2707
2708 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2709   """Create a tree of block devices on the primary node.
2710
2711   This always creates all devices.
2712
2713   """
2714   if device.children:
2715     for child in device.children:
2716       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2717         return False
2718
2719   cfg.SetDiskID(device, node)
2720   new_id = rpc.call_blockdev_create(node, device, device.size,
2721                                     instance.name, True, info)
2722   if not new_id:
2723     return False
2724   if device.physical_id is None:
2725     device.physical_id = new_id
2726   return True
2727
2728
2729 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2730   """Create a tree of block devices on a secondary node.
2731
2732   If this device type has to be created on secondaries, create it and
2733   all its children.
2734
2735   If not, just recurse to children keeping the same 'force' value.
2736
2737   """
2738   if device.CreateOnSecondary():
2739     force = True
2740   if device.children:
2741     for child in device.children:
2742       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2743                                         child, force, info):
2744         return False
2745
2746   if not force:
2747     return True
2748   cfg.SetDiskID(device, node)
2749   new_id = rpc.call_blockdev_create(node, device, device.size,
2750                                     instance.name, False, info)
2751   if not new_id:
2752     return False
2753   if device.physical_id is None:
2754     device.physical_id = new_id
2755   return True
2756
2757
2758 def _GenerateUniqueNames(cfg, exts):
2759   """Generate a suitable LV name.
2760
2761   This will generate a logical volume name for the given instance.
2762
2763   """
2764   results = []
2765   for val in exts:
2766     new_id = cfg.GenerateUniqueID()
2767     results.append("%s%s" % (new_id, val))
2768   return results
2769
2770
2771 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2772   """Generate a drbd device complete with its children.
2773
2774   """
2775   port = cfg.AllocatePort()
2776   vgname = cfg.GetVGName()
2777   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2778                           logical_id=(vgname, names[0]))
2779   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2780                           logical_id=(vgname, names[1]))
2781   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2782                           logical_id = (primary, secondary, port),
2783                           children = [dev_data, dev_meta])
2784   return drbd_dev
2785
2786
2787 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2788   """Generate a drbd8 device complete with its children.
2789
2790   """
2791   port = cfg.AllocatePort()
2792   vgname = cfg.GetVGName()
2793   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2794                           logical_id=(vgname, names[0]))
2795   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2796                           logical_id=(vgname, names[1]))
2797   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2798                           logical_id = (primary, secondary, port),
2799                           children = [dev_data, dev_meta],
2800                           iv_name=iv_name)
2801   return drbd_dev
2802
2803
2804 def _GenerateDiskTemplate(cfg, template_name,
2805                           instance_name, primary_node,
2806                           secondary_nodes, disk_sz, swap_sz,
2807                           file_storage_dir, file_driver):
2808   """Generate the entire disk layout for a given template type.
2809
2810   """
2811   #TODO: compute space requirements
2812
2813   vgname = cfg.GetVGName()
2814   if template_name == constants.DT_DISKLESS:
2815     disks = []
2816   elif template_name == constants.DT_PLAIN:
2817     if len(secondary_nodes) != 0:
2818       raise errors.ProgrammerError("Wrong template configuration")
2819
2820     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2821     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2822                            logical_id=(vgname, names[0]),
2823                            iv_name = "sda")
2824     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2825                            logical_id=(vgname, names[1]),
2826                            iv_name = "sdb")
2827     disks = [sda_dev, sdb_dev]
2828   elif template_name == constants.DT_DRBD8:
2829     if len(secondary_nodes) != 1:
2830       raise errors.ProgrammerError("Wrong template configuration")
2831     remote_node = secondary_nodes[0]
2832     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2833                                        ".sdb_data", ".sdb_meta"])
2834     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2835                                          disk_sz, names[0:2], "sda")
2836     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2837                                          swap_sz, names[2:4], "sdb")
2838     disks = [drbd_sda_dev, drbd_sdb_dev]
2839   elif template_name == constants.DT_FILE:
2840     if len(secondary_nodes) != 0:
2841       raise errors.ProgrammerError("Wrong template configuration")
2842
2843     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2844                                 iv_name="sda", logical_id=(file_driver,
2845                                 "%s/sda" % file_storage_dir))
2846     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2847                                 iv_name="sdb", logical_id=(file_driver,
2848                                 "%s/sdb" % file_storage_dir))
2849     disks = [file_sda_dev, file_sdb_dev]
2850   else:
2851     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2852   return disks
2853
2854
2855 def _GetInstanceInfoText(instance):
2856   """Compute that text that should be added to the disk's metadata.
2857
2858   """
2859   return "originstname+%s" % instance.name
2860
2861
2862 def _CreateDisks(cfg, instance):
2863   """Create all disks for an instance.
2864
2865   This abstracts away some work from AddInstance.
2866
2867   Args:
2868     instance: the instance object
2869
2870   Returns:
2871     True or False showing the success of the creation process
2872
2873   """
2874   info = _GetInstanceInfoText(instance)
2875
2876   if instance.disk_template == constants.DT_FILE:
2877     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2878     result = rpc.call_file_storage_dir_create(instance.primary_node,
2879                                               file_storage_dir)
2880
2881     if not result:
2882       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2883       return False
2884
2885     if not result[0]:
2886       logger.Error("failed to create directory '%s'" % file_storage_dir)
2887       return False
2888
2889   for device in instance.disks:
2890     logger.Info("creating volume %s for instance %s" %
2891                 (device.iv_name, instance.name))
2892     #HARDCODE
2893     for secondary_node in instance.secondary_nodes:
2894       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2895                                         device, False, info):
2896         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2897                      (device.iv_name, device, secondary_node))
2898         return False
2899     #HARDCODE
2900     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2901                                     instance, device, info):
2902       logger.Error("failed to create volume %s on primary!" %
2903                    device.iv_name)
2904       return False
2905
2906   return True
2907
2908
2909 def _RemoveDisks(instance, cfg):
2910   """Remove all disks for an instance.
2911
2912   This abstracts away some work from `AddInstance()` and
2913   `RemoveInstance()`. Note that in case some of the devices couldn't
2914   be removed, the removal will continue with the other ones (compare
2915   with `_CreateDisks()`).
2916
2917   Args:
2918     instance: the instance object
2919
2920   Returns:
2921     True or False showing the success of the removal proces
2922
2923   """
2924   logger.Info("removing block devices for instance %s" % instance.name)
2925
2926   result = True
2927   for device in instance.disks:
2928     for node, disk in device.ComputeNodeTree(instance.primary_node):
2929       cfg.SetDiskID(disk, node)
2930       if not rpc.call_blockdev_remove(node, disk):
2931         logger.Error("could not remove block device %s on node %s,"
2932                      " continuing anyway" %
2933                      (device.iv_name, node))
2934         result = False
2935
2936   if instance.disk_template == constants.DT_FILE:
2937     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2938     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2939                                             file_storage_dir):
2940       logger.Error("could not remove directory '%s'" % file_storage_dir)
2941       result = False
2942
2943   return result
2944
2945
2946 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2947   """Compute disk size requirements in the volume group
2948
2949   This is currently hard-coded for the two-drive layout.
2950
2951   """
2952   # Required free disk space as a function of disk and swap space
2953   req_size_dict = {
2954     constants.DT_DISKLESS: None,
2955     constants.DT_PLAIN: disk_size + swap_size,
2956     # 256 MB are added for drbd metadata, 128MB for each drbd device
2957     constants.DT_DRBD8: disk_size + swap_size + 256,
2958     constants.DT_FILE: None,
2959   }
2960
2961   if disk_template not in req_size_dict:
2962     raise errors.ProgrammerError("Disk template '%s' size requirement"
2963                                  " is unknown" %  disk_template)
2964
2965   return req_size_dict[disk_template]
2966
2967
2968 class LUCreateInstance(LogicalUnit):
2969   """Create an instance.
2970
2971   """
2972   HPATH = "instance-add"
2973   HTYPE = constants.HTYPE_INSTANCE
2974   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2975               "disk_template", "swap_size", "mode", "start", "vcpus",
2976               "wait_for_sync", "ip_check", "mac"]
2977
2978   def _RunAllocator(self):
2979     """Run the allocator based on input opcode.
2980
2981     """
2982     disks = [{"size": self.op.disk_size, "mode": "w"},
2983              {"size": self.op.swap_size, "mode": "w"}]
2984     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2985              "bridge": self.op.bridge}]
2986     ial = IAllocator(self.cfg, self.sstore,
2987                      mode=constants.IALLOCATOR_MODE_ALLOC,
2988                      name=self.op.instance_name,
2989                      disk_template=self.op.disk_template,
2990                      tags=[],
2991                      os=self.op.os_type,
2992                      vcpus=self.op.vcpus,
2993                      mem_size=self.op.mem_size,
2994                      disks=disks,
2995                      nics=nics,
2996                      )
2997
2998     ial.Run(self.op.iallocator)
2999
3000     if not ial.success:
3001       raise errors.OpPrereqError("Can't compute nodes using"
3002                                  " iallocator '%s': %s" % (self.op.iallocator,
3003                                                            ial.info))
3004     if len(ial.nodes) != ial.required_nodes:
3005       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3006                                  " of nodes (%s), required %s" %
3007                                  (len(ial.nodes), ial.required_nodes))
3008     self.op.pnode = ial.nodes[0]
3009     logger.ToStdout("Selected nodes for the instance: %s" %
3010                     (", ".join(ial.nodes),))
3011     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3012                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3013     if ial.required_nodes == 2:
3014       self.op.snode = ial.nodes[1]
3015
3016   def BuildHooksEnv(self):
3017     """Build hooks env.
3018
3019     This runs on master, primary and secondary nodes of the instance.
3020
3021     """
3022     env = {
3023       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3024       "INSTANCE_DISK_SIZE": self.op.disk_size,
3025       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3026       "INSTANCE_ADD_MODE": self.op.mode,
3027       }
3028     if self.op.mode == constants.INSTANCE_IMPORT:
3029       env["INSTANCE_SRC_NODE"] = self.op.src_node
3030       env["INSTANCE_SRC_PATH"] = self.op.src_path
3031       env["INSTANCE_SRC_IMAGE"] = self.src_image
3032
3033     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3034       primary_node=self.op.pnode,
3035       secondary_nodes=self.secondaries,
3036       status=self.instance_status,
3037       os_type=self.op.os_type,
3038       memory=self.op.mem_size,
3039       vcpus=self.op.vcpus,
3040       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3041     ))
3042
3043     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3044           self.secondaries)
3045     return env, nl, nl
3046
3047
3048   def CheckPrereq(self):
3049     """Check prerequisites.
3050
3051     """
3052     # set optional parameters to none if they don't exist
3053     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3054                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3055                  "vnc_bind_address"]:
3056       if not hasattr(self.op, attr):
3057         setattr(self.op, attr, None)
3058
3059     if self.op.mode not in (constants.INSTANCE_CREATE,
3060                             constants.INSTANCE_IMPORT):
3061       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3062                                  self.op.mode)
3063
3064     if (not self.cfg.GetVGName() and
3065         self.op.disk_template not in constants.DTS_NOT_LVM):
3066       raise errors.OpPrereqError("Cluster does not support lvm-based"
3067                                  " instances")
3068
3069     if self.op.mode == constants.INSTANCE_IMPORT:
3070       src_node = getattr(self.op, "src_node", None)
3071       src_path = getattr(self.op, "src_path", None)
3072       if src_node is None or src_path is None:
3073         raise errors.OpPrereqError("Importing an instance requires source"
3074                                    " node and path options")
3075       src_node_full = self.cfg.ExpandNodeName(src_node)
3076       if src_node_full is None:
3077         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3078       self.op.src_node = src_node = src_node_full
3079
3080       if not os.path.isabs(src_path):
3081         raise errors.OpPrereqError("The source path must be absolute")
3082
3083       export_info = rpc.call_export_info(src_node, src_path)
3084
3085       if not export_info:
3086         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3087
3088       if not export_info.has_section(constants.INISECT_EXP):
3089         raise errors.ProgrammerError("Corrupted export config")
3090
3091       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3092       if (int(ei_version) != constants.EXPORT_VERSION):
3093         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3094                                    (ei_version, constants.EXPORT_VERSION))
3095
3096       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3097         raise errors.OpPrereqError("Can't import instance with more than"
3098                                    " one data disk")
3099
3100       # FIXME: are the old os-es, disk sizes, etc. useful?
3101       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3102       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3103                                                          'disk0_dump'))
3104       self.src_image = diskimage
3105     else: # INSTANCE_CREATE
3106       if getattr(self.op, "os_type", None) is None:
3107         raise errors.OpPrereqError("No guest OS specified")
3108
3109     #### instance parameters check
3110
3111     # disk template and mirror node verification
3112     if self.op.disk_template not in constants.DISK_TEMPLATES:
3113       raise errors.OpPrereqError("Invalid disk template name")
3114
3115     # instance name verification
3116     hostname1 = utils.HostInfo(self.op.instance_name)
3117
3118     self.op.instance_name = instance_name = hostname1.name
3119     instance_list = self.cfg.GetInstanceList()
3120     if instance_name in instance_list:
3121       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3122                                  instance_name)
3123
3124     # ip validity checks
3125     ip = getattr(self.op, "ip", None)
3126     if ip is None or ip.lower() == "none":
3127       inst_ip = None
3128     elif ip.lower() == "auto":
3129       inst_ip = hostname1.ip
3130     else:
3131       if not utils.IsValidIP(ip):
3132         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3133                                    " like a valid IP" % ip)
3134       inst_ip = ip
3135     self.inst_ip = self.op.ip = inst_ip
3136
3137     if self.op.start and not self.op.ip_check:
3138       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3139                                  " adding an instance in start mode")
3140
3141     if self.op.ip_check:
3142       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3143         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3144                                    (hostname1.ip, instance_name))
3145
3146     # MAC address verification
3147     if self.op.mac != "auto":
3148       if not utils.IsValidMac(self.op.mac.lower()):
3149         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3150                                    self.op.mac)
3151
3152     # bridge verification
3153     bridge = getattr(self.op, "bridge", None)
3154     if bridge is None:
3155       self.op.bridge = self.cfg.GetDefBridge()
3156     else:
3157       self.op.bridge = bridge
3158
3159     # boot order verification
3160     if self.op.hvm_boot_order is not None:
3161       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3162         raise errors.OpPrereqError("invalid boot order specified,"
3163                                    " must be one or more of [acdn]")
3164     # file storage checks
3165     if (self.op.file_driver and
3166         not self.op.file_driver in constants.FILE_DRIVER):
3167       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3168                                  self.op.file_driver)
3169
3170     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3171       raise errors.OpPrereqError("File storage directory not a relative"
3172                                  " path")
3173     #### allocator run
3174
3175     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3176       raise errors.OpPrereqError("One and only one of iallocator and primary"
3177                                  " node must be given")
3178
3179     if self.op.iallocator is not None:
3180       self._RunAllocator()
3181
3182     #### node related checks
3183
3184     # check primary node
3185     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3186     if pnode is None:
3187       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3188                                  self.op.pnode)
3189     self.op.pnode = pnode.name
3190     self.pnode = pnode
3191     self.secondaries = []
3192
3193     # mirror node verification
3194     if self.op.disk_template in constants.DTS_NET_MIRROR:
3195       if getattr(self.op, "snode", None) is None:
3196         raise errors.OpPrereqError("The networked disk templates need"
3197                                    " a mirror node")
3198
3199       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3200       if snode_name is None:
3201         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3202                                    self.op.snode)
3203       elif snode_name == pnode.name:
3204         raise errors.OpPrereqError("The secondary node cannot be"
3205                                    " the primary node.")
3206       self.secondaries.append(snode_name)
3207
3208     req_size = _ComputeDiskSize(self.op.disk_template,
3209                                 self.op.disk_size, self.op.swap_size)
3210
3211     # Check lv size requirements
3212     if req_size is not None:
3213       nodenames = [pnode.name] + self.secondaries
3214       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3215       for node in nodenames:
3216         info = nodeinfo.get(node, None)
3217         if not info:
3218           raise errors.OpPrereqError("Cannot get current information"
3219                                      " from node '%s'" % node)
3220         vg_free = info.get('vg_free', None)
3221         if not isinstance(vg_free, int):
3222           raise errors.OpPrereqError("Can't compute free disk space on"
3223                                      " node %s" % node)
3224         if req_size > info['vg_free']:
3225           raise errors.OpPrereqError("Not enough disk space on target node %s."
3226                                      " %d MB available, %d MB required" %
3227                                      (node, info['vg_free'], req_size))
3228
3229     # os verification
3230     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3231     if not os_obj:
3232       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3233                                  " primary node"  % self.op.os_type)
3234
3235     if self.op.kernel_path == constants.VALUE_NONE:
3236       raise errors.OpPrereqError("Can't set instance kernel to none")
3237
3238
3239     # bridge check on primary node
3240     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3241       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3242                                  " destination node '%s'" %
3243                                  (self.op.bridge, pnode.name))
3244
3245     # memory check on primary node
3246     if self.op.start:
3247       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3248                            "creating instance %s" % self.op.instance_name,
3249                            self.op.mem_size)
3250
3251     # hvm_cdrom_image_path verification
3252     if self.op.hvm_cdrom_image_path is not None:
3253       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3254         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3255                                    " be an absolute path or None, not %s" %
3256                                    self.op.hvm_cdrom_image_path)
3257       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3258         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3259                                    " regular file or a symlink pointing to"
3260                                    " an existing regular file, not %s" %
3261                                    self.op.hvm_cdrom_image_path)
3262
3263     # vnc_bind_address verification
3264     if self.op.vnc_bind_address is not None:
3265       if not utils.IsValidIP(self.op.vnc_bind_address):
3266         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3267                                    " like a valid IP address" %
3268                                    self.op.vnc_bind_address)
3269
3270     if self.op.start:
3271       self.instance_status = 'up'
3272     else:
3273       self.instance_status = 'down'
3274
3275   def Exec(self, feedback_fn):
3276     """Create and add the instance to the cluster.
3277
3278     """
3279     instance = self.op.instance_name
3280     pnode_name = self.pnode.name
3281
3282     if self.op.mac == "auto":
3283       mac_address = self.cfg.GenerateMAC()
3284     else:
3285       mac_address = self.op.mac
3286
3287     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3288     if self.inst_ip is not None:
3289       nic.ip = self.inst_ip
3290
3291     ht_kind = self.sstore.GetHypervisorType()
3292     if ht_kind in constants.HTS_REQ_PORT:
3293       network_port = self.cfg.AllocatePort()
3294     else:
3295       network_port = None
3296
3297     if self.op.vnc_bind_address is None:
3298       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3299
3300     # this is needed because os.path.join does not accept None arguments
3301     if self.op.file_storage_dir is None:
3302       string_file_storage_dir = ""
3303     else:
3304       string_file_storage_dir = self.op.file_storage_dir
3305
3306     # build the full file storage dir path
3307     file_storage_dir = os.path.normpath(os.path.join(
3308                                         self.sstore.GetFileStorageDir(),
3309                                         string_file_storage_dir, instance))
3310
3311
3312     disks = _GenerateDiskTemplate(self.cfg,
3313                                   self.op.disk_template,
3314                                   instance, pnode_name,
3315                                   self.secondaries, self.op.disk_size,
3316                                   self.op.swap_size,
3317                                   file_storage_dir,
3318                                   self.op.file_driver)
3319
3320     iobj = objects.Instance(name=instance, os=self.op.os_type,
3321                             primary_node=pnode_name,
3322                             memory=self.op.mem_size,
3323                             vcpus=self.op.vcpus,
3324                             nics=[nic], disks=disks,
3325                             disk_template=self.op.disk_template,
3326                             status=self.instance_status,
3327                             network_port=network_port,
3328                             kernel_path=self.op.kernel_path,
3329                             initrd_path=self.op.initrd_path,
3330                             hvm_boot_order=self.op.hvm_boot_order,
3331                             hvm_acpi=self.op.hvm_acpi,
3332                             hvm_pae=self.op.hvm_pae,
3333                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3334                             vnc_bind_address=self.op.vnc_bind_address,
3335                             )
3336
3337     feedback_fn("* creating instance disks...")
3338     if not _CreateDisks(self.cfg, iobj):
3339       _RemoveDisks(iobj, self.cfg)
3340       raise errors.OpExecError("Device creation failed, reverting...")
3341
3342     feedback_fn("adding instance %s to cluster config" % instance)
3343
3344     self.cfg.AddInstance(iobj)
3345
3346     if self.op.wait_for_sync:
3347       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3348     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3349       # make sure the disks are not degraded (still sync-ing is ok)
3350       time.sleep(15)
3351       feedback_fn("* checking mirrors status")
3352       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3353     else:
3354       disk_abort = False
3355
3356     if disk_abort:
3357       _RemoveDisks(iobj, self.cfg)
3358       self.cfg.RemoveInstance(iobj.name)
3359       raise errors.OpExecError("There are some degraded disks for"
3360                                " this instance")
3361
3362     feedback_fn("creating os for instance %s on node %s" %
3363                 (instance, pnode_name))
3364
3365     if iobj.disk_template != constants.DT_DISKLESS:
3366       if self.op.mode == constants.INSTANCE_CREATE:
3367         feedback_fn("* running the instance OS create scripts...")
3368         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3369           raise errors.OpExecError("could not add os for instance %s"
3370                                    " on node %s" %
3371                                    (instance, pnode_name))
3372
3373       elif self.op.mode == constants.INSTANCE_IMPORT:
3374         feedback_fn("* running the instance OS import scripts...")
3375         src_node = self.op.src_node
3376         src_image = self.src_image
3377         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3378                                                 src_node, src_image):
3379           raise errors.OpExecError("Could not import os for instance"
3380                                    " %s on node %s" %
3381                                    (instance, pnode_name))
3382       else:
3383         # also checked in the prereq part
3384         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3385                                      % self.op.mode)
3386
3387     if self.op.start:
3388       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3389       feedback_fn("* starting instance...")
3390       if not rpc.call_instance_start(pnode_name, iobj, None):
3391         raise errors.OpExecError("Could not start instance")
3392
3393
3394 class LUConnectConsole(NoHooksLU):
3395   """Connect to an instance's console.
3396
3397   This is somewhat special in that it returns the command line that
3398   you need to run on the master node in order to connect to the
3399   console.
3400
3401   """
3402   _OP_REQP = ["instance_name"]
3403
3404   def CheckPrereq(self):
3405     """Check prerequisites.
3406
3407     This checks that the instance is in the cluster.
3408
3409     """
3410     instance = self.cfg.GetInstanceInfo(
3411       self.cfg.ExpandInstanceName(self.op.instance_name))
3412     if instance is None:
3413       raise errors.OpPrereqError("Instance '%s' not known" %
3414                                  self.op.instance_name)
3415     self.instance = instance
3416
3417   def Exec(self, feedback_fn):
3418     """Connect to the console of an instance
3419
3420     """
3421     instance = self.instance
3422     node = instance.primary_node
3423
3424     node_insts = rpc.call_instance_list([node])[node]
3425     if node_insts is False:
3426       raise errors.OpExecError("Can't connect to node %s." % node)
3427
3428     if instance.name not in node_insts:
3429       raise errors.OpExecError("Instance %s is not running." % instance.name)
3430
3431     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3432
3433     hyper = hypervisor.GetHypervisor()
3434     console_cmd = hyper.GetShellCommandForConsole(instance)
3435
3436     # build ssh cmdline
3437     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3438
3439
3440 class LUReplaceDisks(LogicalUnit):
3441   """Replace the disks of an instance.
3442
3443   """
3444   HPATH = "mirrors-replace"
3445   HTYPE = constants.HTYPE_INSTANCE
3446   _OP_REQP = ["instance_name", "mode", "disks"]
3447
3448   def _RunAllocator(self):
3449     """Compute a new secondary node using an IAllocator.
3450
3451     """
3452     ial = IAllocator(self.cfg, self.sstore,
3453                      mode=constants.IALLOCATOR_MODE_RELOC,
3454                      name=self.op.instance_name,
3455                      relocate_from=[self.sec_node])
3456
3457     ial.Run(self.op.iallocator)
3458
3459     if not ial.success:
3460       raise errors.OpPrereqError("Can't compute nodes using"
3461                                  " iallocator '%s': %s" % (self.op.iallocator,
3462                                                            ial.info))
3463     if len(ial.nodes) != ial.required_nodes:
3464       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3465                                  " of nodes (%s), required %s" %
3466                                  (len(ial.nodes), ial.required_nodes))
3467     self.op.remote_node = ial.nodes[0]
3468     logger.ToStdout("Selected new secondary for the instance: %s" %
3469                     self.op.remote_node)
3470
3471   def BuildHooksEnv(self):
3472     """Build hooks env.
3473
3474     This runs on the master, the primary and all the secondaries.
3475
3476     """
3477     env = {
3478       "MODE": self.op.mode,
3479       "NEW_SECONDARY": self.op.remote_node,
3480       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3481       }
3482     env.update(_BuildInstanceHookEnvByObject(self.instance))
3483     nl = [
3484       self.sstore.GetMasterNode(),
3485       self.instance.primary_node,
3486       ]
3487     if self.op.remote_node is not None:
3488       nl.append(self.op.remote_node)
3489     return env, nl, nl
3490
3491   def CheckPrereq(self):
3492     """Check prerequisites.
3493
3494     This checks that the instance is in the cluster.
3495
3496     """
3497     if not hasattr(self.op, "remote_node"):
3498       self.op.remote_node = None
3499
3500     instance = self.cfg.GetInstanceInfo(
3501       self.cfg.ExpandInstanceName(self.op.instance_name))
3502     if instance is None:
3503       raise errors.OpPrereqError("Instance '%s' not known" %
3504                                  self.op.instance_name)
3505     self.instance = instance
3506     self.op.instance_name = instance.name
3507
3508     if instance.disk_template not in constants.DTS_NET_MIRROR:
3509       raise errors.OpPrereqError("Instance's disk layout is not"
3510                                  " network mirrored.")
3511
3512     if len(instance.secondary_nodes) != 1:
3513       raise errors.OpPrereqError("The instance has a strange layout,"
3514                                  " expected one secondary but found %d" %
3515                                  len(instance.secondary_nodes))
3516
3517     self.sec_node = instance.secondary_nodes[0]
3518
3519     ia_name = getattr(self.op, "iallocator", None)
3520     if ia_name is not None:
3521       if self.op.remote_node is not None:
3522         raise errors.OpPrereqError("Give either the iallocator or the new"
3523                                    " secondary, not both")
3524       self.op.remote_node = self._RunAllocator()
3525
3526     remote_node = self.op.remote_node
3527     if remote_node is not None:
3528       remote_node = self.cfg.ExpandNodeName(remote_node)
3529       if remote_node is None:
3530         raise errors.OpPrereqError("Node '%s' not known" %
3531                                    self.op.remote_node)
3532       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3533     else:
3534       self.remote_node_info = None
3535     if remote_node == instance.primary_node:
3536       raise errors.OpPrereqError("The specified node is the primary node of"
3537                                  " the instance.")
3538     elif remote_node == self.sec_node:
3539       if self.op.mode == constants.REPLACE_DISK_SEC:
3540         # this is for DRBD8, where we can't execute the same mode of
3541         # replacement as for drbd7 (no different port allocated)
3542         raise errors.OpPrereqError("Same secondary given, cannot execute"
3543                                    " replacement")
3544     if instance.disk_template == constants.DT_DRBD8:
3545       if (self.op.mode == constants.REPLACE_DISK_ALL and
3546           remote_node is not None):
3547         # switch to replace secondary mode
3548         self.op.mode = constants.REPLACE_DISK_SEC
3549
3550       if self.op.mode == constants.REPLACE_DISK_ALL:
3551         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3552                                    " secondary disk replacement, not"
3553                                    " both at once")
3554       elif self.op.mode == constants.REPLACE_DISK_PRI:
3555         if remote_node is not None:
3556           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3557                                      " the secondary while doing a primary"
3558                                      " node disk replacement")
3559         self.tgt_node = instance.primary_node
3560         self.oth_node = instance.secondary_nodes[0]
3561       elif self.op.mode == constants.REPLACE_DISK_SEC:
3562         self.new_node = remote_node # this can be None, in which case
3563                                     # we don't change the secondary
3564         self.tgt_node = instance.secondary_nodes[0]
3565         self.oth_node = instance.primary_node
3566       else:
3567         raise errors.ProgrammerError("Unhandled disk replace mode")
3568
3569     for name in self.op.disks:
3570       if instance.FindDisk(name) is None:
3571         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3572                                    (name, instance.name))
3573     self.op.remote_node = remote_node
3574
3575   def _ExecD8DiskOnly(self, feedback_fn):
3576     """Replace a disk on the primary or secondary for dbrd8.
3577
3578     The algorithm for replace is quite complicated:
3579       - for each disk to be replaced:
3580         - create new LVs on the target node with unique names
3581         - detach old LVs from the drbd device
3582         - rename old LVs to name_replaced.<time_t>
3583         - rename new LVs to old LVs
3584         - attach the new LVs (with the old names now) to the drbd device
3585       - wait for sync across all devices
3586       - for each modified disk:
3587         - remove old LVs (which have the name name_replaces.<time_t>)
3588
3589     Failures are not very well handled.
3590
3591     """
3592     steps_total = 6
3593     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3594     instance = self.instance
3595     iv_names = {}
3596     vgname = self.cfg.GetVGName()
3597     # start of work
3598     cfg = self.cfg
3599     tgt_node = self.tgt_node
3600     oth_node = self.oth_node
3601
3602     # Step: check device activation
3603     self.proc.LogStep(1, steps_total, "check device existence")
3604     info("checking volume groups")
3605     my_vg = cfg.GetVGName()
3606     results = rpc.call_vg_list([oth_node, tgt_node])
3607     if not results:
3608       raise errors.OpExecError("Can't list volume groups on the nodes")
3609     for node in oth_node, tgt_node:
3610       res = results.get(node, False)
3611       if not res or my_vg not in res:
3612         raise errors.OpExecError("Volume group '%s' not found on %s" %
3613                                  (my_vg, node))
3614     for dev in instance.disks:
3615       if not dev.iv_name in self.op.disks:
3616         continue
3617       for node in tgt_node, oth_node:
3618         info("checking %s on %s" % (dev.iv_name, node))
3619         cfg.SetDiskID(dev, node)
3620         if not rpc.call_blockdev_find(node, dev):
3621           raise errors.OpExecError("Can't find device %s on node %s" %
3622                                    (dev.iv_name, node))
3623
3624     # Step: check other node consistency
3625     self.proc.LogStep(2, steps_total, "check peer consistency")
3626     for dev in instance.disks:
3627       if not dev.iv_name in self.op.disks:
3628         continue
3629       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3630       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3631                                    oth_node==instance.primary_node):
3632         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3633                                  " to replace disks on this node (%s)" %
3634                                  (oth_node, tgt_node))
3635
3636     # Step: create new storage
3637     self.proc.LogStep(3, steps_total, "allocate new storage")
3638     for dev in instance.disks:
3639       if not dev.iv_name in self.op.disks:
3640         continue
3641       size = dev.size
3642       cfg.SetDiskID(dev, tgt_node)
3643       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3644       names = _GenerateUniqueNames(cfg, lv_names)
3645       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3646                              logical_id=(vgname, names[0]))
3647       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3648                              logical_id=(vgname, names[1]))
3649       new_lvs = [lv_data, lv_meta]
3650       old_lvs = dev.children
3651       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3652       info("creating new local storage on %s for %s" %
3653            (tgt_node, dev.iv_name))
3654       # since we *always* want to create this LV, we use the
3655       # _Create...OnPrimary (which forces the creation), even if we
3656       # are talking about the secondary node
3657       for new_lv in new_lvs:
3658         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3659                                         _GetInstanceInfoText(instance)):
3660           raise errors.OpExecError("Failed to create new LV named '%s' on"
3661                                    " node '%s'" %
3662                                    (new_lv.logical_id[1], tgt_node))
3663
3664     # Step: for each lv, detach+rename*2+attach
3665     self.proc.LogStep(4, steps_total, "change drbd configuration")
3666     for dev, old_lvs, new_lvs in iv_names.itervalues():
3667       info("detaching %s drbd from local storage" % dev.iv_name)
3668       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3669         raise errors.OpExecError("Can't detach drbd from local storage on node"
3670                                  " %s for device %s" % (tgt_node, dev.iv_name))
3671       #dev.children = []
3672       #cfg.Update(instance)
3673
3674       # ok, we created the new LVs, so now we know we have the needed
3675       # storage; as such, we proceed on the target node to rename
3676       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3677       # using the assumption that logical_id == physical_id (which in
3678       # turn is the unique_id on that node)
3679
3680       # FIXME(iustin): use a better name for the replaced LVs
3681       temp_suffix = int(time.time())
3682       ren_fn = lambda d, suff: (d.physical_id[0],
3683                                 d.physical_id[1] + "_replaced-%s" % suff)
3684       # build the rename list based on what LVs exist on the node
3685       rlist = []
3686       for to_ren in old_lvs:
3687         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3688         if find_res is not None: # device exists
3689           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3690
3691       info("renaming the old LVs on the target node")
3692       if not rpc.call_blockdev_rename(tgt_node, rlist):
3693         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3694       # now we rename the new LVs to the old LVs
3695       info("renaming the new LVs on the target node")
3696       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3697       if not rpc.call_blockdev_rename(tgt_node, rlist):
3698         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3699
3700       for old, new in zip(old_lvs, new_lvs):
3701         new.logical_id = old.logical_id
3702         cfg.SetDiskID(new, tgt_node)
3703
3704       for disk in old_lvs:
3705         disk.logical_id = ren_fn(disk, temp_suffix)
3706         cfg.SetDiskID(disk, tgt_node)
3707
3708       # now that the new lvs have the old name, we can add them to the device
3709       info("adding new mirror component on %s" % tgt_node)
3710       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3711         for new_lv in new_lvs:
3712           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3713             warning("Can't rollback device %s", hint="manually cleanup unused"
3714                     " logical volumes")
3715         raise errors.OpExecError("Can't add local storage to drbd")
3716
3717       dev.children = new_lvs
3718       cfg.Update(instance)
3719
3720     # Step: wait for sync
3721
3722     # this can fail as the old devices are degraded and _WaitForSync
3723     # does a combined result over all disks, so we don't check its
3724     # return value
3725     self.proc.LogStep(5, steps_total, "sync devices")
3726     _WaitForSync(cfg, instance, self.proc, unlock=True)
3727
3728     # so check manually all the devices
3729     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3730       cfg.SetDiskID(dev, instance.primary_node)
3731       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3732       if is_degr:
3733         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3734
3735     # Step: remove old storage
3736     self.proc.LogStep(6, steps_total, "removing old storage")
3737     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3738       info("remove logical volumes for %s" % name)
3739       for lv in old_lvs:
3740         cfg.SetDiskID(lv, tgt_node)
3741         if not rpc.call_blockdev_remove(tgt_node, lv):
3742           warning("Can't remove old LV", hint="manually remove unused LVs")
3743           continue
3744
3745   def _ExecD8Secondary(self, feedback_fn):
3746     """Replace the secondary node for drbd8.
3747
3748     The algorithm for replace is quite complicated:
3749       - for all disks of the instance:
3750         - create new LVs on the new node with same names
3751         - shutdown the drbd device on the old secondary
3752         - disconnect the drbd network on the primary
3753         - create the drbd device on the new secondary
3754         - network attach the drbd on the primary, using an artifice:
3755           the drbd code for Attach() will connect to the network if it
3756           finds a device which is connected to the good local disks but
3757           not network enabled
3758       - wait for sync across all devices
3759       - remove all disks from the old secondary
3760
3761     Failures are not very well handled.
3762
3763     """
3764     steps_total = 6
3765     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3766     instance = self.instance
3767     iv_names = {}
3768     vgname = self.cfg.GetVGName()
3769     # start of work
3770     cfg = self.cfg
3771     old_node = self.tgt_node
3772     new_node = self.new_node
3773     pri_node = instance.primary_node
3774
3775     # Step: check device activation
3776     self.proc.LogStep(1, steps_total, "check device existence")
3777     info("checking volume groups")
3778     my_vg = cfg.GetVGName()
3779     results = rpc.call_vg_list([pri_node, new_node])
3780     if not results:
3781       raise errors.OpExecError("Can't list volume groups on the nodes")
3782     for node in pri_node, new_node:
3783       res = results.get(node, False)
3784       if not res or my_vg not in res:
3785         raise errors.OpExecError("Volume group '%s' not found on %s" %
3786                                  (my_vg, node))
3787     for dev in instance.disks:
3788       if not dev.iv_name in self.op.disks:
3789         continue
3790       info("checking %s on %s" % (dev.iv_name, pri_node))
3791       cfg.SetDiskID(dev, pri_node)
3792       if not rpc.call_blockdev_find(pri_node, dev):
3793         raise errors.OpExecError("Can't find device %s on node %s" %
3794                                  (dev.iv_name, pri_node))
3795
3796     # Step: check other node consistency
3797     self.proc.LogStep(2, steps_total, "check peer consistency")
3798     for dev in instance.disks:
3799       if not dev.iv_name in self.op.disks:
3800         continue
3801       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3802       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3803         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3804                                  " unsafe to replace the secondary" %
3805                                  pri_node)
3806
3807     # Step: create new storage
3808     self.proc.LogStep(3, steps_total, "allocate new storage")
3809     for dev in instance.disks:
3810       size = dev.size
3811       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3812       # since we *always* want to create this LV, we use the
3813       # _Create...OnPrimary (which forces the creation), even if we
3814       # are talking about the secondary node
3815       for new_lv in dev.children:
3816         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3817                                         _GetInstanceInfoText(instance)):
3818           raise errors.OpExecError("Failed to create new LV named '%s' on"
3819                                    " node '%s'" %
3820                                    (new_lv.logical_id[1], new_node))
3821
3822       iv_names[dev.iv_name] = (dev, dev.children)
3823
3824     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3825     for dev in instance.disks:
3826       size = dev.size
3827       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3828       # create new devices on new_node
3829       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3830                               logical_id=(pri_node, new_node,
3831                                           dev.logical_id[2]),
3832                               children=dev.children)
3833       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3834                                         new_drbd, False,
3835                                       _GetInstanceInfoText(instance)):
3836         raise errors.OpExecError("Failed to create new DRBD on"
3837                                  " node '%s'" % new_node)
3838
3839     for dev in instance.disks:
3840       # we have new devices, shutdown the drbd on the old secondary
3841       info("shutting down drbd for %s on old node" % dev.iv_name)
3842       cfg.SetDiskID(dev, old_node)
3843       if not rpc.call_blockdev_shutdown(old_node, dev):
3844         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3845                 hint="Please cleanup this device manually as soon as possible")
3846
3847     info("detaching primary drbds from the network (=> standalone)")
3848     done = 0
3849     for dev in instance.disks:
3850       cfg.SetDiskID(dev, pri_node)
3851       # set the physical (unique in bdev terms) id to None, meaning
3852       # detach from network
3853       dev.physical_id = (None,) * len(dev.physical_id)
3854       # and 'find' the device, which will 'fix' it to match the
3855       # standalone state
3856       if rpc.call_blockdev_find(pri_node, dev):
3857         done += 1
3858       else:
3859         warning("Failed to detach drbd %s from network, unusual case" %
3860                 dev.iv_name)
3861
3862     if not done:
3863       # no detaches succeeded (very unlikely)
3864       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3865
3866     # if we managed to detach at least one, we update all the disks of
3867     # the instance to point to the new secondary
3868     info("updating instance configuration")
3869     for dev in instance.disks:
3870       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3871       cfg.SetDiskID(dev, pri_node)
3872     cfg.Update(instance)
3873
3874     # and now perform the drbd attach
3875     info("attaching primary drbds to new secondary (standalone => connected)")
3876     failures = []
3877     for dev in instance.disks:
3878       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3879       # since the attach is smart, it's enough to 'find' the device,
3880       # it will automatically activate the network, if the physical_id
3881       # is correct
3882       cfg.SetDiskID(dev, pri_node)
3883       if not rpc.call_blockdev_find(pri_node, dev):
3884         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3885                 "please do a gnt-instance info to see the status of disks")
3886
3887     # this can fail as the old devices are degraded and _WaitForSync
3888     # does a combined result over all disks, so we don't check its
3889     # return value
3890     self.proc.LogStep(5, steps_total, "sync devices")
3891     _WaitForSync(cfg, instance, self.proc, unlock=True)
3892
3893     # so check manually all the devices
3894     for name, (dev, old_lvs) in iv_names.iteritems():
3895       cfg.SetDiskID(dev, pri_node)
3896       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3897       if is_degr:
3898         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3899
3900     self.proc.LogStep(6, steps_total, "removing old storage")
3901     for name, (dev, old_lvs) in iv_names.iteritems():
3902       info("remove logical volumes for %s" % name)
3903       for lv in old_lvs:
3904         cfg.SetDiskID(lv, old_node)
3905         if not rpc.call_blockdev_remove(old_node, lv):
3906           warning("Can't remove LV on old secondary",
3907                   hint="Cleanup stale volumes by hand")
3908
3909   def Exec(self, feedback_fn):
3910     """Execute disk replacement.
3911
3912     This dispatches the disk replacement to the appropriate handler.
3913
3914     """
3915     instance = self.instance
3916
3917     # Activate the instance disks if we're replacing them on a down instance
3918     if instance.status == "down":
3919       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3920       self.proc.ChainOpCode(op)
3921
3922     if instance.disk_template == constants.DT_DRBD8:
3923       if self.op.remote_node is None:
3924         fn = self._ExecD8DiskOnly
3925       else:
3926         fn = self._ExecD8Secondary
3927     else:
3928       raise errors.ProgrammerError("Unhandled disk replacement case")
3929
3930     ret = fn(feedback_fn)
3931
3932     # Deactivate the instance disks if we're replacing them on a down instance
3933     if instance.status == "down":
3934       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3935       self.proc.ChainOpCode(op)
3936
3937     return ret
3938
3939
3940 class LUQueryInstanceData(NoHooksLU):
3941   """Query runtime instance data.
3942
3943   """
3944   _OP_REQP = ["instances"]
3945
3946   def CheckPrereq(self):
3947     """Check prerequisites.
3948
3949     This only checks the optional instance list against the existing names.
3950
3951     """
3952     if not isinstance(self.op.instances, list):
3953       raise errors.OpPrereqError("Invalid argument type 'instances'")
3954     if self.op.instances:
3955       self.wanted_instances = []
3956       names = self.op.instances
3957       for name in names:
3958         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3959         if instance is None:
3960           raise errors.OpPrereqError("No such instance name '%s'" % name)
3961         self.wanted_instances.append(instance)
3962     else:
3963       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3964                                in self.cfg.GetInstanceList()]
3965     return
3966
3967
3968   def _ComputeDiskStatus(self, instance, snode, dev):
3969     """Compute block device status.
3970
3971     """
3972     self.cfg.SetDiskID(dev, instance.primary_node)
3973     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3974     if dev.dev_type in constants.LDS_DRBD:
3975       # we change the snode then (otherwise we use the one passed in)
3976       if dev.logical_id[0] == instance.primary_node:
3977         snode = dev.logical_id[1]
3978       else:
3979         snode = dev.logical_id[0]
3980
3981     if snode:
3982       self.cfg.SetDiskID(dev, snode)
3983       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3984     else:
3985       dev_sstatus = None
3986
3987     if dev.children:
3988       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3989                       for child in dev.children]
3990     else:
3991       dev_children = []
3992
3993     data = {
3994       "iv_name": dev.iv_name,
3995       "dev_type": dev.dev_type,
3996       "logical_id": dev.logical_id,
3997       "physical_id": dev.physical_id,
3998       "pstatus": dev_pstatus,
3999       "sstatus": dev_sstatus,
4000       "children": dev_children,
4001       }
4002
4003     return data
4004
4005   def Exec(self, feedback_fn):
4006     """Gather and return data"""
4007     result = {}
4008     for instance in self.wanted_instances:
4009       remote_info = rpc.call_instance_info(instance.primary_node,
4010                                                 instance.name)
4011       if remote_info and "state" in remote_info:
4012         remote_state = "up"
4013       else:
4014         remote_state = "down"
4015       if instance.status == "down":
4016         config_state = "down"
4017       else:
4018         config_state = "up"
4019
4020       disks = [self._ComputeDiskStatus(instance, None, device)
4021                for device in instance.disks]
4022
4023       idict = {
4024         "name": instance.name,
4025         "config_state": config_state,
4026         "run_state": remote_state,
4027         "pnode": instance.primary_node,
4028         "snodes": instance.secondary_nodes,
4029         "os": instance.os,
4030         "memory": instance.memory,
4031         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4032         "disks": disks,
4033         "vcpus": instance.vcpus,
4034         }
4035
4036       htkind = self.sstore.GetHypervisorType()
4037       if htkind == constants.HT_XEN_PVM30:
4038         idict["kernel_path"] = instance.kernel_path
4039         idict["initrd_path"] = instance.initrd_path
4040
4041       if htkind == constants.HT_XEN_HVM31:
4042         idict["hvm_boot_order"] = instance.hvm_boot_order
4043         idict["hvm_acpi"] = instance.hvm_acpi
4044         idict["hvm_pae"] = instance.hvm_pae
4045         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4046
4047       if htkind in constants.HTS_REQ_PORT:
4048         idict["vnc_bind_address"] = instance.vnc_bind_address
4049         idict["network_port"] = instance.network_port
4050
4051       result[instance.name] = idict
4052
4053     return result
4054
4055
4056 class LUSetInstanceParams(LogicalUnit):
4057   """Modifies an instances's parameters.
4058
4059   """
4060   HPATH = "instance-modify"
4061   HTYPE = constants.HTYPE_INSTANCE
4062   _OP_REQP = ["instance_name"]
4063
4064   def BuildHooksEnv(self):
4065     """Build hooks env.
4066
4067     This runs on the master, primary and secondaries.
4068
4069     """
4070     args = dict()
4071     if self.mem:
4072       args['memory'] = self.mem
4073     if self.vcpus:
4074       args['vcpus'] = self.vcpus
4075     if self.do_ip or self.do_bridge or self.mac:
4076       if self.do_ip:
4077         ip = self.ip
4078       else:
4079         ip = self.instance.nics[0].ip
4080       if self.bridge:
4081         bridge = self.bridge
4082       else:
4083         bridge = self.instance.nics[0].bridge
4084       if self.mac:
4085         mac = self.mac
4086       else:
4087         mac = self.instance.nics[0].mac
4088       args['nics'] = [(ip, bridge, mac)]
4089     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4090     nl = [self.sstore.GetMasterNode(),
4091           self.instance.primary_node] + list(self.instance.secondary_nodes)
4092     return env, nl, nl
4093
4094   def CheckPrereq(self):
4095     """Check prerequisites.
4096
4097     This only checks the instance list against the existing names.
4098
4099     """
4100     self.mem = getattr(self.op, "mem", None)
4101     self.vcpus = getattr(self.op, "vcpus", None)
4102     self.ip = getattr(self.op, "ip", None)
4103     self.mac = getattr(self.op, "mac", None)
4104     self.bridge = getattr(self.op, "bridge", None)
4105     self.kernel_path = getattr(self.op, "kernel_path", None)
4106     self.initrd_path = getattr(self.op, "initrd_path", None)
4107     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4108     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4109     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4110     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4111     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4112     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4113                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4114                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4115                  self.vnc_bind_address]
4116     if all_parms.count(None) == len(all_parms):
4117       raise errors.OpPrereqError("No changes submitted")
4118     if self.mem is not None:
4119       try:
4120         self.mem = int(self.mem)
4121       except ValueError, err:
4122         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4123     if self.vcpus is not None:
4124       try:
4125         self.vcpus = int(self.vcpus)
4126       except ValueError, err:
4127         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4128     if self.ip is not None:
4129       self.do_ip = True
4130       if self.ip.lower() == "none":
4131         self.ip = None
4132       else:
4133         if not utils.IsValidIP(self.ip):
4134           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4135     else:
4136       self.do_ip = False
4137     self.do_bridge = (self.bridge is not None)
4138     if self.mac is not None:
4139       if self.cfg.IsMacInUse(self.mac):
4140         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4141                                    self.mac)
4142       if not utils.IsValidMac(self.mac):
4143         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4144
4145     if self.kernel_path is not None:
4146       self.do_kernel_path = True
4147       if self.kernel_path == constants.VALUE_NONE:
4148         raise errors.OpPrereqError("Can't set instance to no kernel")
4149
4150       if self.kernel_path != constants.VALUE_DEFAULT:
4151         if not os.path.isabs(self.kernel_path):
4152           raise errors.OpPrereqError("The kernel path must be an absolute"
4153                                     " filename")
4154     else:
4155       self.do_kernel_path = False
4156
4157     if self.initrd_path is not None:
4158       self.do_initrd_path = True
4159       if self.initrd_path not in (constants.VALUE_NONE,
4160                                   constants.VALUE_DEFAULT):
4161         if not os.path.isabs(self.initrd_path):
4162           raise errors.OpPrereqError("The initrd path must be an absolute"
4163                                     " filename")
4164     else:
4165       self.do_initrd_path = False
4166
4167     # boot order verification
4168     if self.hvm_boot_order is not None:
4169       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4170         if len(self.hvm_boot_order.strip("acdn")) != 0:
4171           raise errors.OpPrereqError("invalid boot order specified,"
4172                                      " must be one or more of [acdn]"
4173                                      " or 'default'")
4174
4175     # hvm_cdrom_image_path verification
4176     if self.op.hvm_cdrom_image_path is not None:
4177       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4178         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4179                                    " be an absolute path or None, not %s" %
4180                                    self.op.hvm_cdrom_image_path)
4181       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4182         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4183                                    " regular file or a symlink pointing to"
4184                                    " an existing regular file, not %s" %
4185                                    self.op.hvm_cdrom_image_path)
4186
4187     # vnc_bind_address verification
4188     if self.op.vnc_bind_address is not None:
4189       if not utils.IsValidIP(self.op.vnc_bind_address):
4190         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4191                                    " like a valid IP address" %
4192                                    self.op.vnc_bind_address)
4193
4194     instance = self.cfg.GetInstanceInfo(
4195       self.cfg.ExpandInstanceName(self.op.instance_name))
4196     if instance is None:
4197       raise errors.OpPrereqError("No such instance name '%s'" %
4198                                  self.op.instance_name)
4199     self.op.instance_name = instance.name
4200     self.instance = instance
4201     return
4202
4203   def Exec(self, feedback_fn):
4204     """Modifies an instance.
4205
4206     All parameters take effect only at the next restart of the instance.
4207     """
4208     result = []
4209     instance = self.instance
4210     if self.mem:
4211       instance.memory = self.mem
4212       result.append(("mem", self.mem))
4213     if self.vcpus:
4214       instance.vcpus = self.vcpus
4215       result.append(("vcpus",  self.vcpus))
4216     if self.do_ip:
4217       instance.nics[0].ip = self.ip
4218       result.append(("ip", self.ip))
4219     if self.bridge:
4220       instance.nics[0].bridge = self.bridge
4221       result.append(("bridge", self.bridge))
4222     if self.mac:
4223       instance.nics[0].mac = self.mac
4224       result.append(("mac", self.mac))
4225     if self.do_kernel_path:
4226       instance.kernel_path = self.kernel_path
4227       result.append(("kernel_path", self.kernel_path))
4228     if self.do_initrd_path:
4229       instance.initrd_path = self.initrd_path
4230       result.append(("initrd_path", self.initrd_path))
4231     if self.hvm_boot_order:
4232       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4233         instance.hvm_boot_order = None
4234       else:
4235         instance.hvm_boot_order = self.hvm_boot_order
4236       result.append(("hvm_boot_order", self.hvm_boot_order))
4237     if self.hvm_acpi:
4238       instance.hvm_acpi = self.hvm_acpi
4239       result.append(("hvm_acpi", self.hvm_acpi))
4240     if self.hvm_pae:
4241       instance.hvm_pae = self.hvm_pae
4242       result.append(("hvm_pae", self.hvm_pae))
4243     if self.hvm_cdrom_image_path:
4244       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4245       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4246     if self.vnc_bind_address:
4247       instance.vnc_bind_address = self.vnc_bind_address
4248       result.append(("vnc_bind_address", self.vnc_bind_address))
4249
4250     self.cfg.AddInstance(instance)
4251
4252     return result
4253
4254
4255 class LUQueryExports(NoHooksLU):
4256   """Query the exports list
4257
4258   """
4259   _OP_REQP = []
4260
4261   def CheckPrereq(self):
4262     """Check that the nodelist contains only existing nodes.
4263
4264     """
4265     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4266
4267   def Exec(self, feedback_fn):
4268     """Compute the list of all the exported system images.
4269
4270     Returns:
4271       a dictionary with the structure node->(export-list)
4272       where export-list is a list of the instances exported on
4273       that node.
4274
4275     """
4276     return rpc.call_export_list(self.nodes)
4277
4278
4279 class LUExportInstance(LogicalUnit):
4280   """Export an instance to an image in the cluster.
4281
4282   """
4283   HPATH = "instance-export"
4284   HTYPE = constants.HTYPE_INSTANCE
4285   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4286
4287   def BuildHooksEnv(self):
4288     """Build hooks env.
4289
4290     This will run on the master, primary node and target node.
4291
4292     """
4293     env = {
4294       "EXPORT_NODE": self.op.target_node,
4295       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4296       }
4297     env.update(_BuildInstanceHookEnvByObject(self.instance))
4298     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4299           self.op.target_node]
4300     return env, nl, nl
4301
4302   def CheckPrereq(self):
4303     """Check prerequisites.
4304
4305     This checks that the instance and node names are valid.
4306
4307     """
4308     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4309     self.instance = self.cfg.GetInstanceInfo(instance_name)
4310     if self.instance is None:
4311       raise errors.OpPrereqError("Instance '%s' not found" %
4312                                  self.op.instance_name)
4313
4314     # node verification
4315     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4316     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4317
4318     if self.dst_node is None:
4319       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4320                                  self.op.target_node)
4321     self.op.target_node = self.dst_node.name
4322
4323     # instance disk type verification
4324     for disk in self.instance.disks:
4325       if disk.dev_type == constants.LD_FILE:
4326         raise errors.OpPrereqError("Export not supported for instances with"
4327                                    " file-based disks")
4328
4329   def Exec(self, feedback_fn):
4330     """Export an instance to an image in the cluster.
4331
4332     """
4333     instance = self.instance
4334     dst_node = self.dst_node
4335     src_node = instance.primary_node
4336     if self.op.shutdown:
4337       # shutdown the instance, but not the disks
4338       if not rpc.call_instance_shutdown(src_node, instance):
4339          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4340                                   (instance.name, src_node))
4341
4342     vgname = self.cfg.GetVGName()
4343
4344     snap_disks = []
4345
4346     try:
4347       for disk in instance.disks:
4348         if disk.iv_name == "sda":
4349           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4350           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4351
4352           if not new_dev_name:
4353             logger.Error("could not snapshot block device %s on node %s" %
4354                          (disk.logical_id[1], src_node))
4355           else:
4356             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4357                                       logical_id=(vgname, new_dev_name),
4358                                       physical_id=(vgname, new_dev_name),
4359                                       iv_name=disk.iv_name)
4360             snap_disks.append(new_dev)
4361
4362     finally:
4363       if self.op.shutdown and instance.status == "up":
4364         if not rpc.call_instance_start(src_node, instance, None):
4365           _ShutdownInstanceDisks(instance, self.cfg)
4366           raise errors.OpExecError("Could not start instance")
4367
4368     # TODO: check for size
4369
4370     for dev in snap_disks:
4371       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4372         logger.Error("could not export block device %s from node %s to node %s"
4373                      % (dev.logical_id[1], src_node, dst_node.name))
4374       if not rpc.call_blockdev_remove(src_node, dev):
4375         logger.Error("could not remove snapshot block device %s from node %s" %
4376                      (dev.logical_id[1], src_node))
4377
4378     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4379       logger.Error("could not finalize export for instance %s on node %s" %
4380                    (instance.name, dst_node.name))
4381
4382     nodelist = self.cfg.GetNodeList()
4383     nodelist.remove(dst_node.name)
4384
4385     # on one-node clusters nodelist will be empty after the removal
4386     # if we proceed the backup would be removed because OpQueryExports
4387     # substitutes an empty list with the full cluster node list.
4388     if nodelist:
4389       op = opcodes.OpQueryExports(nodes=nodelist)
4390       exportlist = self.proc.ChainOpCode(op)
4391       for node in exportlist:
4392         if instance.name in exportlist[node]:
4393           if not rpc.call_export_remove(node, instance.name):
4394             logger.Error("could not remove older export for instance %s"
4395                          " on node %s" % (instance.name, node))
4396
4397
4398 class LURemoveExport(NoHooksLU):
4399   """Remove exports related to the named instance.
4400
4401   """
4402   _OP_REQP = ["instance_name"]
4403
4404   def CheckPrereq(self):
4405     """Check prerequisites.
4406     """
4407     pass
4408
4409   def Exec(self, feedback_fn):
4410     """Remove any export.
4411
4412     """
4413     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4414     # If the instance was not found we'll try with the name that was passed in.
4415     # This will only work if it was an FQDN, though.
4416     fqdn_warn = False
4417     if not instance_name:
4418       fqdn_warn = True
4419       instance_name = self.op.instance_name
4420
4421     op = opcodes.OpQueryExports(nodes=[])
4422     exportlist = self.proc.ChainOpCode(op)
4423     found = False
4424     for node in exportlist:
4425       if instance_name in exportlist[node]:
4426         found = True
4427         if not rpc.call_export_remove(node, instance_name):
4428           logger.Error("could not remove export for instance %s"
4429                        " on node %s" % (instance_name, node))
4430
4431     if fqdn_warn and not found:
4432       feedback_fn("Export not found. If trying to remove an export belonging"
4433                   " to a deleted instance please use its Fully Qualified"
4434                   " Domain Name.")
4435
4436
4437 class TagsLU(NoHooksLU):
4438   """Generic tags LU.
4439
4440   This is an abstract class which is the parent of all the other tags LUs.
4441
4442   """
4443   def CheckPrereq(self):
4444     """Check prerequisites.
4445
4446     """
4447     if self.op.kind == constants.TAG_CLUSTER:
4448       self.target = self.cfg.GetClusterInfo()
4449     elif self.op.kind == constants.TAG_NODE:
4450       name = self.cfg.ExpandNodeName(self.op.name)
4451       if name is None:
4452         raise errors.OpPrereqError("Invalid node name (%s)" %
4453                                    (self.op.name,))
4454       self.op.name = name
4455       self.target = self.cfg.GetNodeInfo(name)
4456     elif self.op.kind == constants.TAG_INSTANCE:
4457       name = self.cfg.ExpandInstanceName(self.op.name)
4458       if name is None:
4459         raise errors.OpPrereqError("Invalid instance name (%s)" %
4460                                    (self.op.name,))
4461       self.op.name = name
4462       self.target = self.cfg.GetInstanceInfo(name)
4463     else:
4464       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4465                                  str(self.op.kind))
4466
4467
4468 class LUGetTags(TagsLU):
4469   """Returns the tags of a given object.
4470
4471   """
4472   _OP_REQP = ["kind", "name"]
4473
4474   def Exec(self, feedback_fn):
4475     """Returns the tag list.
4476
4477     """
4478     return self.target.GetTags()
4479
4480
4481 class LUSearchTags(NoHooksLU):
4482   """Searches the tags for a given pattern.
4483
4484   """
4485   _OP_REQP = ["pattern"]
4486
4487   def CheckPrereq(self):
4488     """Check prerequisites.
4489
4490     This checks the pattern passed for validity by compiling it.
4491
4492     """
4493     try:
4494       self.re = re.compile(self.op.pattern)
4495     except re.error, err:
4496       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4497                                  (self.op.pattern, err))
4498
4499   def Exec(self, feedback_fn):
4500     """Returns the tag list.
4501
4502     """
4503     cfg = self.cfg
4504     tgts = [("/cluster", cfg.GetClusterInfo())]
4505     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4506     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4507     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4508     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4509     results = []
4510     for path, target in tgts:
4511       for tag in target.GetTags():
4512         if self.re.search(tag):
4513           results.append((path, tag))
4514     return results
4515
4516
4517 class LUAddTags(TagsLU):
4518   """Sets a tag on a given object.
4519
4520   """
4521   _OP_REQP = ["kind", "name", "tags"]
4522
4523   def CheckPrereq(self):
4524     """Check prerequisites.
4525
4526     This checks the type and length of the tag name and value.
4527
4528     """
4529     TagsLU.CheckPrereq(self)
4530     for tag in self.op.tags:
4531       objects.TaggableObject.ValidateTag(tag)
4532
4533   def Exec(self, feedback_fn):
4534     """Sets the tag.
4535
4536     """
4537     try:
4538       for tag in self.op.tags:
4539         self.target.AddTag(tag)
4540     except errors.TagError, err:
4541       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4542     try:
4543       self.cfg.Update(self.target)
4544     except errors.ConfigurationError:
4545       raise errors.OpRetryError("There has been a modification to the"
4546                                 " config file and the operation has been"
4547                                 " aborted. Please retry.")
4548
4549
4550 class LUDelTags(TagsLU):
4551   """Delete a list of tags from a given object.
4552
4553   """
4554   _OP_REQP = ["kind", "name", "tags"]
4555
4556   def CheckPrereq(self):
4557     """Check prerequisites.
4558
4559     This checks that we have the given tag.
4560
4561     """
4562     TagsLU.CheckPrereq(self)
4563     for tag in self.op.tags:
4564       objects.TaggableObject.ValidateTag(tag)
4565     del_tags = frozenset(self.op.tags)
4566     cur_tags = self.target.GetTags()
4567     if not del_tags <= cur_tags:
4568       diff_tags = del_tags - cur_tags
4569       diff_names = ["'%s'" % tag for tag in diff_tags]
4570       diff_names.sort()
4571       raise errors.OpPrereqError("Tag(s) %s not found" %
4572                                  (",".join(diff_names)))
4573
4574   def Exec(self, feedback_fn):
4575     """Remove the tag from the object.
4576
4577     """
4578     for tag in self.op.tags:
4579       self.target.RemoveTag(tag)
4580     try:
4581       self.cfg.Update(self.target)
4582     except errors.ConfigurationError:
4583       raise errors.OpRetryError("There has been a modification to the"
4584                                 " config file and the operation has been"
4585                                 " aborted. Please retry.")
4586
4587 class LUTestDelay(NoHooksLU):
4588   """Sleep for a specified amount of time.
4589
4590   This LU sleeps on the master and/or nodes for a specified amoutn of
4591   time.
4592
4593   """
4594   _OP_REQP = ["duration", "on_master", "on_nodes"]
4595
4596   def CheckPrereq(self):
4597     """Check prerequisites.
4598
4599     This checks that we have a good list of nodes and/or the duration
4600     is valid.
4601
4602     """
4603
4604     if self.op.on_nodes:
4605       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4606
4607   def Exec(self, feedback_fn):
4608     """Do the actual sleep.
4609
4610     """
4611     if self.op.on_master:
4612       if not utils.TestDelay(self.op.duration):
4613         raise errors.OpExecError("Error during master delay test")
4614     if self.op.on_nodes:
4615       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4616       if not result:
4617         raise errors.OpExecError("Complete failure from rpc call")
4618       for node, node_result in result.items():
4619         if not node_result:
4620           raise errors.OpExecError("Failure during rpc call to node %s,"
4621                                    " result: %s" % (node, node_result))
4622
4623
4624 class IAllocator(object):
4625   """IAllocator framework.
4626
4627   An IAllocator instance has three sets of attributes:
4628     - cfg/sstore that are needed to query the cluster
4629     - input data (all members of the _KEYS class attribute are required)
4630     - four buffer attributes (in|out_data|text), that represent the
4631       input (to the external script) in text and data structure format,
4632       and the output from it, again in two formats
4633     - the result variables from the script (success, info, nodes) for
4634       easy usage
4635
4636   """
4637   _ALLO_KEYS = [
4638     "mem_size", "disks", "disk_template",
4639     "os", "tags", "nics", "vcpus",
4640     ]
4641   _RELO_KEYS = [
4642     "relocate_from",
4643     ]
4644
4645   def __init__(self, cfg, sstore, mode, name, **kwargs):
4646     self.cfg = cfg
4647     self.sstore = sstore
4648     # init buffer variables
4649     self.in_text = self.out_text = self.in_data = self.out_data = None
4650     # init all input fields so that pylint is happy
4651     self.mode = mode
4652     self.name = name
4653     self.mem_size = self.disks = self.disk_template = None
4654     self.os = self.tags = self.nics = self.vcpus = None
4655     self.relocate_from = None
4656     # computed fields
4657     self.required_nodes = None
4658     # init result fields
4659     self.success = self.info = self.nodes = None
4660     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4661       keyset = self._ALLO_KEYS
4662     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4663       keyset = self._RELO_KEYS
4664     else:
4665       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4666                                    " IAllocator" % self.mode)
4667     for key in kwargs:
4668       if key not in keyset:
4669         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4670                                      " IAllocator" % key)
4671       setattr(self, key, kwargs[key])
4672     for key in keyset:
4673       if key not in kwargs:
4674         raise errors.ProgrammerError("Missing input parameter '%s' to"
4675                                      " IAllocator" % key)
4676     self._BuildInputData()
4677
4678   def _ComputeClusterData(self):
4679     """Compute the generic allocator input data.
4680
4681     This is the data that is independent of the actual operation.
4682
4683     """
4684     cfg = self.cfg
4685     # cluster data
4686     data = {
4687       "version": 1,
4688       "cluster_name": self.sstore.GetClusterName(),
4689       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4690       "hypervisor_type": self.sstore.GetHypervisorType(),
4691       # we don't have job IDs
4692       }
4693
4694     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4695
4696     # node data
4697     node_results = {}
4698     node_list = cfg.GetNodeList()
4699     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4700     for nname in node_list:
4701       ninfo = cfg.GetNodeInfo(nname)
4702       if nname not in node_data or not isinstance(node_data[nname], dict):
4703         raise errors.OpExecError("Can't get data for node %s" % nname)
4704       remote_info = node_data[nname]
4705       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4706                    'vg_size', 'vg_free', 'cpu_total']:
4707         if attr not in remote_info:
4708           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4709                                    (nname, attr))
4710         try:
4711           remote_info[attr] = int(remote_info[attr])
4712         except ValueError, err:
4713           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4714                                    " %s" % (nname, attr, str(err)))
4715       # compute memory used by primary instances
4716       i_p_mem = i_p_up_mem = 0
4717       for iinfo in i_list:
4718         if iinfo.primary_node == nname:
4719           i_p_mem += iinfo.memory
4720           if iinfo.status == "up":
4721             i_p_up_mem += iinfo.memory
4722
4723       # compute memory used by instances
4724       pnr = {
4725         "tags": list(ninfo.GetTags()),
4726         "total_memory": remote_info['memory_total'],
4727         "reserved_memory": remote_info['memory_dom0'],
4728         "free_memory": remote_info['memory_free'],
4729         "i_pri_memory": i_p_mem,
4730         "i_pri_up_memory": i_p_up_mem,
4731         "total_disk": remote_info['vg_size'],
4732         "free_disk": remote_info['vg_free'],
4733         "primary_ip": ninfo.primary_ip,
4734         "secondary_ip": ninfo.secondary_ip,
4735         "total_cpus": remote_info['cpu_total'],
4736         }
4737       node_results[nname] = pnr
4738     data["nodes"] = node_results
4739
4740     # instance data
4741     instance_data = {}
4742     for iinfo in i_list:
4743       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4744                   for n in iinfo.nics]
4745       pir = {
4746         "tags": list(iinfo.GetTags()),
4747         "should_run": iinfo.status == "up",
4748         "vcpus": iinfo.vcpus,
4749         "memory": iinfo.memory,
4750         "os": iinfo.os,
4751         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4752         "nics": nic_data,
4753         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4754         "disk_template": iinfo.disk_template,
4755         }
4756       instance_data[iinfo.name] = pir
4757
4758     data["instances"] = instance_data
4759
4760     self.in_data = data
4761
4762   def _AddNewInstance(self):
4763     """Add new instance data to allocator structure.
4764
4765     This in combination with _AllocatorGetClusterData will create the
4766     correct structure needed as input for the allocator.
4767
4768     The checks for the completeness of the opcode must have already been
4769     done.
4770
4771     """
4772     data = self.in_data
4773     if len(self.disks) != 2:
4774       raise errors.OpExecError("Only two-disk configurations supported")
4775
4776     disk_space = _ComputeDiskSize(self.disk_template,
4777                                   self.disks[0]["size"], self.disks[1]["size"])
4778
4779     if self.disk_template in constants.DTS_NET_MIRROR:
4780       self.required_nodes = 2
4781     else:
4782       self.required_nodes = 1
4783     request = {
4784       "type": "allocate",
4785       "name": self.name,
4786       "disk_template": self.disk_template,
4787       "tags": self.tags,
4788       "os": self.os,
4789       "vcpus": self.vcpus,
4790       "memory": self.mem_size,
4791       "disks": self.disks,
4792       "disk_space_total": disk_space,
4793       "nics": self.nics,
4794       "required_nodes": self.required_nodes,
4795       }
4796     data["request"] = request
4797
4798   def _AddRelocateInstance(self):
4799     """Add relocate instance data to allocator structure.
4800
4801     This in combination with _IAllocatorGetClusterData will create the
4802     correct structure needed as input for the allocator.
4803
4804     The checks for the completeness of the opcode must have already been
4805     done.
4806
4807     """
4808     instance = self.cfg.GetInstanceInfo(self.name)
4809     if instance is None:
4810       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4811                                    " IAllocator" % self.name)
4812
4813     if instance.disk_template not in constants.DTS_NET_MIRROR:
4814       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4815
4816     if len(instance.secondary_nodes) != 1:
4817       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4818
4819     self.required_nodes = 1
4820
4821     disk_space = _ComputeDiskSize(instance.disk_template,
4822                                   instance.disks[0].size,
4823                                   instance.disks[1].size)
4824
4825     request = {
4826       "type": "relocate",
4827       "name": self.name,
4828       "disk_space_total": disk_space,
4829       "required_nodes": self.required_nodes,
4830       "relocate_from": self.relocate_from,
4831       }
4832     self.in_data["request"] = request
4833
4834   def _BuildInputData(self):
4835     """Build input data structures.
4836
4837     """
4838     self._ComputeClusterData()
4839
4840     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4841       self._AddNewInstance()
4842     else:
4843       self._AddRelocateInstance()
4844
4845     self.in_text = serializer.Dump(self.in_data)
4846
4847   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4848     """Run an instance allocator and return the results.
4849
4850     """
4851     data = self.in_text
4852
4853     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4854
4855     if not isinstance(result, tuple) or len(result) != 4:
4856       raise errors.OpExecError("Invalid result from master iallocator runner")
4857
4858     rcode, stdout, stderr, fail = result
4859
4860     if rcode == constants.IARUN_NOTFOUND:
4861       raise errors.OpExecError("Can't find allocator '%s'" % name)
4862     elif rcode == constants.IARUN_FAILURE:
4863         raise errors.OpExecError("Instance allocator call failed: %s,"
4864                                  " output: %s" %
4865                                  (fail, stdout+stderr))
4866     self.out_text = stdout
4867     if validate:
4868       self._ValidateResult()
4869
4870   def _ValidateResult(self):
4871     """Process the allocator results.
4872
4873     This will process and if successful save the result in
4874     self.out_data and the other parameters.
4875
4876     """
4877     try:
4878       rdict = serializer.Load(self.out_text)
4879     except Exception, err:
4880       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4881
4882     if not isinstance(rdict, dict):
4883       raise errors.OpExecError("Can't parse iallocator results: not a dict")
4884
4885     for key in "success", "info", "nodes":
4886       if key not in rdict:
4887         raise errors.OpExecError("Can't parse iallocator results:"
4888                                  " missing key '%s'" % key)
4889       setattr(self, key, rdict[key])
4890
4891     if not isinstance(rdict["nodes"], list):
4892       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4893                                " is not a list")
4894     self.out_data = rdict
4895
4896
4897 class LUTestAllocator(NoHooksLU):
4898   """Run allocator tests.
4899
4900   This LU runs the allocator tests
4901
4902   """
4903   _OP_REQP = ["direction", "mode", "name"]
4904
4905   def CheckPrereq(self):
4906     """Check prerequisites.
4907
4908     This checks the opcode parameters depending on the director and mode test.
4909
4910     """
4911     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4912       for attr in ["name", "mem_size", "disks", "disk_template",
4913                    "os", "tags", "nics", "vcpus"]:
4914         if not hasattr(self.op, attr):
4915           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4916                                      attr)
4917       iname = self.cfg.ExpandInstanceName(self.op.name)
4918       if iname is not None:
4919         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4920                                    iname)
4921       if not isinstance(self.op.nics, list):
4922         raise errors.OpPrereqError("Invalid parameter 'nics'")
4923       for row in self.op.nics:
4924         if (not isinstance(row, dict) or
4925             "mac" not in row or
4926             "ip" not in row or
4927             "bridge" not in row):
4928           raise errors.OpPrereqError("Invalid contents of the"
4929                                      " 'nics' parameter")
4930       if not isinstance(self.op.disks, list):
4931         raise errors.OpPrereqError("Invalid parameter 'disks'")
4932       if len(self.op.disks) != 2:
4933         raise errors.OpPrereqError("Only two-disk configurations supported")
4934       for row in self.op.disks:
4935         if (not isinstance(row, dict) or
4936             "size" not in row or
4937             not isinstance(row["size"], int) or
4938             "mode" not in row or
4939             row["mode"] not in ['r', 'w']):
4940           raise errors.OpPrereqError("Invalid contents of the"
4941                                      " 'disks' parameter")
4942     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4943       if not hasattr(self.op, "name"):
4944         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4945       fname = self.cfg.ExpandInstanceName(self.op.name)
4946       if fname is None:
4947         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4948                                    self.op.name)
4949       self.op.name = fname
4950       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4951     else:
4952       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4953                                  self.op.mode)
4954
4955     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4956       if not hasattr(self.op, "allocator") or self.op.allocator is None:
4957         raise errors.OpPrereqError("Missing allocator name")
4958     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4959       raise errors.OpPrereqError("Wrong allocator test '%s'" %
4960                                  self.op.direction)
4961
4962   def Exec(self, feedback_fn):
4963     """Run the allocator test.
4964
4965     """
4966     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4967       ial = IAllocator(self.cfg, self.sstore,
4968                        mode=self.op.mode,
4969                        name=self.op.name,
4970                        mem_size=self.op.mem_size,
4971                        disks=self.op.disks,
4972                        disk_template=self.op.disk_template,
4973                        os=self.op.os,
4974                        tags=self.op.tags,
4975                        nics=self.op.nics,
4976                        vcpus=self.op.vcpus,
4977                        )
4978     else:
4979       ial = IAllocator(self.cfg, self.sstore,
4980                        mode=self.op.mode,
4981                        name=self.op.name,
4982                        relocate_from=list(self.relocate_from),
4983                        )
4984
4985     if self.op.direction == constants.IALLOCATOR_DIR_IN:
4986       result = ial.in_text
4987     else:
4988       ial.Run(self.op.allocator, validate=False)
4989       result = ial.out_text
4990     return result