3fca09fbefbbc22def94cfc3cf2bb79c693ac1b1
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement ExpandNames
52     - implement CheckPrereq
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements:
57         REQ_MASTER: the LU needs to run on the master node
58         REQ_WSSTORE: the LU needs a writable SimpleStore
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_WSSTORE = False
69   REQ_BGL = True
70
71   def __init__(self, processor, op, context, sstore):
72     """Constructor for LogicalUnit.
73
74     This needs to be overriden in derived classes in order to check op
75     validity.
76
77     """
78     self.proc = processor
79     self.op = op
80     self.cfg = context.cfg
81     self.sstore = sstore
82     self.context = context
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95
96     if not self.cfg.IsCluster():
97       raise errors.OpPrereqError("Cluster not initialized yet,"
98                                  " use 'gnt-cluster init' first.")
99     if self.REQ_MASTER:
100       master = sstore.GetMasterNode()
101       if master != utils.HostInfo().name:
102         raise errors.OpPrereqError("Commands must be run on the master"
103                                    " node %s" % master)
104
105   def __GetSSH(self):
106     """Returns the SshRunner object
107
108     """
109     if not self.__ssh:
110       self.__ssh = ssh.SshRunner(self.sstore)
111     return self.__ssh
112
113   ssh = property(fget=__GetSSH)
114
115   def ExpandNames(self):
116     """Expand names for this LU.
117
118     This method is called before starting to execute the opcode, and it should
119     update all the parameters of the opcode to their canonical form (e.g. a
120     short node name must be fully expanded after this method has successfully
121     completed). This way locking, hooks, logging, ecc. can work correctly.
122
123     LUs which implement this method must also populate the self.needed_locks
124     member, as a dict with lock levels as keys, and a list of needed lock names
125     as values. Rules:
126       - Use an empty dict if you don't need any lock
127       - If you don't need any lock at a particular level omit that level
128       - Don't put anything for the BGL level
129       - If you want all locks at a level use None as a value
130         (this reflects what LockSet does, and will be replaced before
131         CheckPrereq with the full list of nodes that have been locked)
132
133     If you need to share locks (rather than acquire them exclusively) at one
134     level you can modify self.share_locks, setting a true value (usually 1) for
135     that level. By default locks are not shared.
136
137     Examples:
138     # Acquire all nodes and one instance
139     self.needed_locks = {
140       locking.LEVEL_NODE: None,
141       locking.LEVEL_INSTANCES: ['instance1.example.tld'],
142     }
143     # Acquire just two nodes
144     self.needed_locks = {
145       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146     }
147     # Acquire no locks
148     self.needed_locks = {} # No, you can't leave it to the default value None
149
150     """
151     # The implementation of this method is mandatory only if the new LU is
152     # concurrent, so that old LUs don't need to be changed all at the same
153     # time.
154     if self.REQ_BGL:
155       self.needed_locks = {} # Exclusive LUs don't need locks.
156     else:
157       raise NotImplementedError
158
159   def DeclareLocks(self, level):
160     """Declare LU locking needs for a level
161
162     While most LUs can just declare their locking needs at ExpandNames time,
163     sometimes there's the need to calculate some locks after having acquired
164     the ones before. This function is called just before acquiring locks at a
165     particular level, but after acquiring the ones at lower levels, and permits
166     such calculations. It can be used to modify self.needed_locks, and by
167     default it does nothing.
168
169     This function is only called if you have something already set in
170     self.needed_locks for the level.
171
172     @param level: Locking level which is going to be locked
173     @type level: member of ganeti.locking.LEVELS
174
175     """
176
177   def CheckPrereq(self):
178     """Check prerequisites for this LU.
179
180     This method should check that the prerequisites for the execution
181     of this LU are fulfilled. It can do internode communication, but
182     it should be idempotent - no cluster or system changes are
183     allowed.
184
185     The method should raise errors.OpPrereqError in case something is
186     not fulfilled. Its return value is ignored.
187
188     This method should also update all the parameters of the opcode to
189     their canonical form if it hasn't been done by ExpandNames before.
190
191     """
192     raise NotImplementedError
193
194   def Exec(self, feedback_fn):
195     """Execute the LU.
196
197     This method should implement the actual work. It should raise
198     errors.OpExecError for failures that are somewhat dealt with in
199     code, or expected.
200
201     """
202     raise NotImplementedError
203
204   def BuildHooksEnv(self):
205     """Build hooks environment for this LU.
206
207     This method should return a three-node tuple consisting of: a dict
208     containing the environment that will be used for running the
209     specific hook for this LU, a list of node names on which the hook
210     should run before the execution, and a list of node names on which
211     the hook should run after the execution.
212
213     The keys of the dict must not have 'GANETI_' prefixed as this will
214     be handled in the hooks runner. Also note additional keys will be
215     added by the hooks runner. If the LU doesn't define any
216     environment, an empty dict (and not None) should be returned.
217
218     No nodes should be returned as an empty list (and not None).
219
220     Note that if the HPATH for a LU class is None, this function will
221     not be called.
222
223     """
224     raise NotImplementedError
225
226   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227     """Notify the LU about the results of its hooks.
228
229     This method is called every time a hooks phase is executed, and notifies
230     the Logical Unit about the hooks' result. The LU can then use it to alter
231     its result based on the hooks.  By default the method does nothing and the
232     previous result is passed back unchanged but any LU can define it if it
233     wants to use the local cluster hook-scripts somehow.
234
235     Args:
236       phase: the hooks phase that has just been run
237       hooks_results: the results of the multi-node hooks rpc call
238       feedback_fn: function to send feedback back to the caller
239       lu_result: the previous result this LU had, or None in the PRE phase.
240
241     """
242     return lu_result
243
244   def _ExpandAndLockInstance(self):
245     """Helper function to expand and lock an instance.
246
247     Many LUs that work on an instance take its name in self.op.instance_name
248     and need to expand it and then declare the expanded name for locking. This
249     function does it, and then updates self.op.instance_name to the expanded
250     name. It also initializes needed_locks as a dict, if this hasn't been done
251     before.
252
253     """
254     if self.needed_locks is None:
255       self.needed_locks = {}
256     else:
257       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258         "_ExpandAndLockInstance called with instance-level locks set"
259     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260     if expanded_name is None:
261       raise errors.OpPrereqError("Instance '%s' not known" %
262                                   self.op.instance_name)
263     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264     self.op.instance_name = expanded_name
265
266   def _LockInstancesNodes(self):
267     """Helper function to declare instances' nodes for locking.
268
269     This function should be called after locking one or more instances to lock
270     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271     with all primary or secondary nodes for instances already locked and
272     present in self.needed_locks[locking.LEVEL_INSTANCE].
273
274     It should be called from DeclareLocks, and for safety only works if
275     self.recalculate_locks[locking.LEVEL_NODE] is set.
276
277     In the future it may grow parameters to just lock some instance's nodes, or
278     to just lock primaries or secondary nodes, if needed.
279
280     If should be called in DeclareLocks in a way similar to:
281
282     if level == locking.LEVEL_NODE:
283       self._LockInstancesNodes()
284
285     """
286     assert locking.LEVEL_NODE in self.recalculate_locks, \
287       "_LockInstancesNodes helper function called with no nodes to recalculate"
288
289     # TODO: check if we're really been called with the instance locks held
290
291     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
292     # future we might want to have different behaviors depending on the value
293     # of self.recalculate_locks[locking.LEVEL_NODE]
294     wanted_nodes = []
295     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
296       instance = self.context.cfg.GetInstanceInfo(instance_name)
297       wanted_nodes.append(instance.primary_node)
298       wanted_nodes.extend(instance.secondary_nodes)
299     self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
300
301     del self.recalculate_locks[locking.LEVEL_NODE]
302
303
304 class NoHooksLU(LogicalUnit):
305   """Simple LU which runs no hooks.
306
307   This LU is intended as a parent for other LogicalUnits which will
308   run no hooks, in order to reduce duplicate code.
309
310   """
311   HPATH = None
312   HTYPE = None
313
314
315 def _GetWantedNodes(lu, nodes):
316   """Returns list of checked and expanded node names.
317
318   Args:
319     nodes: List of nodes (strings) or None for all
320
321   """
322   if not isinstance(nodes, list):
323     raise errors.OpPrereqError("Invalid argument type 'nodes'")
324
325   if nodes:
326     wanted = []
327
328     for name in nodes:
329       node = lu.cfg.ExpandNodeName(name)
330       if node is None:
331         raise errors.OpPrereqError("No such node name '%s'" % name)
332       wanted.append(node)
333
334   else:
335     wanted = lu.cfg.GetNodeList()
336   return utils.NiceSort(wanted)
337
338
339 def _GetWantedInstances(lu, instances):
340   """Returns list of checked and expanded instance names.
341
342   Args:
343     instances: List of instances (strings) or None for all
344
345   """
346   if not isinstance(instances, list):
347     raise errors.OpPrereqError("Invalid argument type 'instances'")
348
349   if instances:
350     wanted = []
351
352     for name in instances:
353       instance = lu.cfg.ExpandInstanceName(name)
354       if instance is None:
355         raise errors.OpPrereqError("No such instance name '%s'" % name)
356       wanted.append(instance)
357
358   else:
359     wanted = lu.cfg.GetInstanceList()
360   return utils.NiceSort(wanted)
361
362
363 def _CheckOutputFields(static, dynamic, selected):
364   """Checks whether all selected fields are valid.
365
366   Args:
367     static: Static fields
368     dynamic: Dynamic fields
369
370   """
371   static_fields = frozenset(static)
372   dynamic_fields = frozenset(dynamic)
373
374   all_fields = static_fields | dynamic_fields
375
376   if not all_fields.issuperset(selected):
377     raise errors.OpPrereqError("Unknown output fields selected: %s"
378                                % ",".join(frozenset(selected).
379                                           difference(all_fields)))
380
381
382 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
383                           memory, vcpus, nics):
384   """Builds instance related env variables for hooks from single variables.
385
386   Args:
387     secondary_nodes: List of secondary nodes as strings
388   """
389   env = {
390     "OP_TARGET": name,
391     "INSTANCE_NAME": name,
392     "INSTANCE_PRIMARY": primary_node,
393     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
394     "INSTANCE_OS_TYPE": os_type,
395     "INSTANCE_STATUS": status,
396     "INSTANCE_MEMORY": memory,
397     "INSTANCE_VCPUS": vcpus,
398   }
399
400   if nics:
401     nic_count = len(nics)
402     for idx, (ip, bridge, mac) in enumerate(nics):
403       if ip is None:
404         ip = ""
405       env["INSTANCE_NIC%d_IP" % idx] = ip
406       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
407       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
408   else:
409     nic_count = 0
410
411   env["INSTANCE_NIC_COUNT"] = nic_count
412
413   return env
414
415
416 def _BuildInstanceHookEnvByObject(instance, override=None):
417   """Builds instance related env variables for hooks from an object.
418
419   Args:
420     instance: objects.Instance object of instance
421     override: dict of values to override
422   """
423   args = {
424     'name': instance.name,
425     'primary_node': instance.primary_node,
426     'secondary_nodes': instance.secondary_nodes,
427     'os_type': instance.os,
428     'status': instance.os,
429     'memory': instance.memory,
430     'vcpus': instance.vcpus,
431     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
432   }
433   if override:
434     args.update(override)
435   return _BuildInstanceHookEnv(**args)
436
437
438 def _CheckInstanceBridgesExist(instance):
439   """Check that the brigdes needed by an instance exist.
440
441   """
442   # check bridges existance
443   brlist = [nic.bridge for nic in instance.nics]
444   if not rpc.call_bridges_exist(instance.primary_node, brlist):
445     raise errors.OpPrereqError("one or more target bridges %s does not"
446                                " exist on destination node '%s'" %
447                                (brlist, instance.primary_node))
448
449
450 class LUDestroyCluster(NoHooksLU):
451   """Logical unit for destroying the cluster.
452
453   """
454   _OP_REQP = []
455
456   def CheckPrereq(self):
457     """Check prerequisites.
458
459     This checks whether the cluster is empty.
460
461     Any errors are signalled by raising errors.OpPrereqError.
462
463     """
464     master = self.sstore.GetMasterNode()
465
466     nodelist = self.cfg.GetNodeList()
467     if len(nodelist) != 1 or nodelist[0] != master:
468       raise errors.OpPrereqError("There are still %d node(s) in"
469                                  " this cluster." % (len(nodelist) - 1))
470     instancelist = self.cfg.GetInstanceList()
471     if instancelist:
472       raise errors.OpPrereqError("There are still %d instance(s) in"
473                                  " this cluster." % len(instancelist))
474
475   def Exec(self, feedback_fn):
476     """Destroys the cluster.
477
478     """
479     master = self.sstore.GetMasterNode()
480     if not rpc.call_node_stop_master(master, False):
481       raise errors.OpExecError("Could not disable the master role")
482     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
483     utils.CreateBackup(priv_key)
484     utils.CreateBackup(pub_key)
485     return master
486
487
488 class LUVerifyCluster(LogicalUnit):
489   """Verifies the cluster status.
490
491   """
492   HPATH = "cluster-verify"
493   HTYPE = constants.HTYPE_CLUSTER
494   _OP_REQP = ["skip_checks"]
495
496   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
497                   remote_version, feedback_fn):
498     """Run multiple tests against a node.
499
500     Test list:
501       - compares ganeti version
502       - checks vg existance and size > 20G
503       - checks config file checksum
504       - checks ssh to other nodes
505
506     Args:
507       node: name of the node to check
508       file_list: required list of files
509       local_cksum: dictionary of local files and their checksums
510
511     """
512     # compares ganeti version
513     local_version = constants.PROTOCOL_VERSION
514     if not remote_version:
515       feedback_fn("  - ERROR: connection to %s failed" % (node))
516       return True
517
518     if local_version != remote_version:
519       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
520                       (local_version, node, remote_version))
521       return True
522
523     # checks vg existance and size > 20G
524
525     bad = False
526     if not vglist:
527       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
528                       (node,))
529       bad = True
530     else:
531       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
532                                             constants.MIN_VG_SIZE)
533       if vgstatus:
534         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
535         bad = True
536
537     # checks config file checksum
538     # checks ssh to any
539
540     if 'filelist' not in node_result:
541       bad = True
542       feedback_fn("  - ERROR: node hasn't returned file checksum data")
543     else:
544       remote_cksum = node_result['filelist']
545       for file_name in file_list:
546         if file_name not in remote_cksum:
547           bad = True
548           feedback_fn("  - ERROR: file '%s' missing" % file_name)
549         elif remote_cksum[file_name] != local_cksum[file_name]:
550           bad = True
551           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
552
553     if 'nodelist' not in node_result:
554       bad = True
555       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
556     else:
557       if node_result['nodelist']:
558         bad = True
559         for node in node_result['nodelist']:
560           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
561                           (node, node_result['nodelist'][node]))
562     if 'node-net-test' not in node_result:
563       bad = True
564       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
565     else:
566       if node_result['node-net-test']:
567         bad = True
568         nlist = utils.NiceSort(node_result['node-net-test'].keys())
569         for node in nlist:
570           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
571                           (node, node_result['node-net-test'][node]))
572
573     hyp_result = node_result.get('hypervisor', None)
574     if hyp_result is not None:
575       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
576     return bad
577
578   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
579                       node_instance, feedback_fn):
580     """Verify an instance.
581
582     This function checks to see if the required block devices are
583     available on the instance's node.
584
585     """
586     bad = False
587
588     node_current = instanceconfig.primary_node
589
590     node_vol_should = {}
591     instanceconfig.MapLVsByNode(node_vol_should)
592
593     for node in node_vol_should:
594       for volume in node_vol_should[node]:
595         if node not in node_vol_is or volume not in node_vol_is[node]:
596           feedback_fn("  - ERROR: volume %s missing on node %s" %
597                           (volume, node))
598           bad = True
599
600     if not instanceconfig.status == 'down':
601       if (node_current not in node_instance or
602           not instance in node_instance[node_current]):
603         feedback_fn("  - ERROR: instance %s not running on node %s" %
604                         (instance, node_current))
605         bad = True
606
607     for node in node_instance:
608       if (not node == node_current):
609         if instance in node_instance[node]:
610           feedback_fn("  - ERROR: instance %s should not run on node %s" %
611                           (instance, node))
612           bad = True
613
614     return bad
615
616   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
617     """Verify if there are any unknown volumes in the cluster.
618
619     The .os, .swap and backup volumes are ignored. All other volumes are
620     reported as unknown.
621
622     """
623     bad = False
624
625     for node in node_vol_is:
626       for volume in node_vol_is[node]:
627         if node not in node_vol_should or volume not in node_vol_should[node]:
628           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
629                       (volume, node))
630           bad = True
631     return bad
632
633   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
634     """Verify the list of running instances.
635
636     This checks what instances are running but unknown to the cluster.
637
638     """
639     bad = False
640     for node in node_instance:
641       for runninginstance in node_instance[node]:
642         if runninginstance not in instancelist:
643           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
644                           (runninginstance, node))
645           bad = True
646     return bad
647
648   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
649     """Verify N+1 Memory Resilience.
650
651     Check that if one single node dies we can still start all the instances it
652     was primary for.
653
654     """
655     bad = False
656
657     for node, nodeinfo in node_info.iteritems():
658       # This code checks that every node which is now listed as secondary has
659       # enough memory to host all instances it is supposed to should a single
660       # other node in the cluster fail.
661       # FIXME: not ready for failover to an arbitrary node
662       # FIXME: does not support file-backed instances
663       # WARNING: we currently take into account down instances as well as up
664       # ones, considering that even if they're down someone might want to start
665       # them even in the event of a node failure.
666       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
667         needed_mem = 0
668         for instance in instances:
669           needed_mem += instance_cfg[instance].memory
670         if nodeinfo['mfree'] < needed_mem:
671           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
672                       " failovers should node %s fail" % (node, prinode))
673           bad = True
674     return bad
675
676   def CheckPrereq(self):
677     """Check prerequisites.
678
679     Transform the list of checks we're going to skip into a set and check that
680     all its members are valid.
681
682     """
683     self.skip_set = frozenset(self.op.skip_checks)
684     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
685       raise errors.OpPrereqError("Invalid checks to be skipped specified")
686
687   def BuildHooksEnv(self):
688     """Build hooks env.
689
690     Cluster-Verify hooks just rone in the post phase and their failure makes
691     the output be logged in the verify output and the verification to fail.
692
693     """
694     all_nodes = self.cfg.GetNodeList()
695     # TODO: populate the environment with useful information for verify hooks
696     env = {}
697     return env, [], all_nodes
698
699   def Exec(self, feedback_fn):
700     """Verify integrity of cluster, performing various test on nodes.
701
702     """
703     bad = False
704     feedback_fn("* Verifying global settings")
705     for msg in self.cfg.VerifyConfig():
706       feedback_fn("  - ERROR: %s" % msg)
707
708     vg_name = self.cfg.GetVGName()
709     nodelist = utils.NiceSort(self.cfg.GetNodeList())
710     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
711     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
712     i_non_redundant = [] # Non redundant instances
713     node_volume = {}
714     node_instance = {}
715     node_info = {}
716     instance_cfg = {}
717
718     # FIXME: verify OS list
719     # do local checksums
720     file_names = list(self.sstore.GetFileList())
721     file_names.append(constants.SSL_CERT_FILE)
722     file_names.append(constants.CLUSTER_CONF_FILE)
723     local_checksums = utils.FingerprintFiles(file_names)
724
725     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
726     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
727     all_instanceinfo = rpc.call_instance_list(nodelist)
728     all_vglist = rpc.call_vg_list(nodelist)
729     node_verify_param = {
730       'filelist': file_names,
731       'nodelist': nodelist,
732       'hypervisor': None,
733       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
734                         for node in nodeinfo]
735       }
736     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
737     all_rversion = rpc.call_version(nodelist)
738     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
739
740     for node in nodelist:
741       feedback_fn("* Verifying node %s" % node)
742       result = self._VerifyNode(node, file_names, local_checksums,
743                                 all_vglist[node], all_nvinfo[node],
744                                 all_rversion[node], feedback_fn)
745       bad = bad or result
746
747       # node_volume
748       volumeinfo = all_volumeinfo[node]
749
750       if isinstance(volumeinfo, basestring):
751         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
752                     (node, volumeinfo[-400:].encode('string_escape')))
753         bad = True
754         node_volume[node] = {}
755       elif not isinstance(volumeinfo, dict):
756         feedback_fn("  - ERROR: connection to %s failed" % (node,))
757         bad = True
758         continue
759       else:
760         node_volume[node] = volumeinfo
761
762       # node_instance
763       nodeinstance = all_instanceinfo[node]
764       if type(nodeinstance) != list:
765         feedback_fn("  - ERROR: connection to %s failed" % (node,))
766         bad = True
767         continue
768
769       node_instance[node] = nodeinstance
770
771       # node_info
772       nodeinfo = all_ninfo[node]
773       if not isinstance(nodeinfo, dict):
774         feedback_fn("  - ERROR: connection to %s failed" % (node,))
775         bad = True
776         continue
777
778       try:
779         node_info[node] = {
780           "mfree": int(nodeinfo['memory_free']),
781           "dfree": int(nodeinfo['vg_free']),
782           "pinst": [],
783           "sinst": [],
784           # dictionary holding all instances this node is secondary for,
785           # grouped by their primary node. Each key is a cluster node, and each
786           # value is a list of instances which have the key as primary and the
787           # current node as secondary.  this is handy to calculate N+1 memory
788           # availability if you can only failover from a primary to its
789           # secondary.
790           "sinst-by-pnode": {},
791         }
792       except ValueError:
793         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
794         bad = True
795         continue
796
797     node_vol_should = {}
798
799     for instance in instancelist:
800       feedback_fn("* Verifying instance %s" % instance)
801       inst_config = self.cfg.GetInstanceInfo(instance)
802       result =  self._VerifyInstance(instance, inst_config, node_volume,
803                                      node_instance, feedback_fn)
804       bad = bad or result
805
806       inst_config.MapLVsByNode(node_vol_should)
807
808       instance_cfg[instance] = inst_config
809
810       pnode = inst_config.primary_node
811       if pnode in node_info:
812         node_info[pnode]['pinst'].append(instance)
813       else:
814         feedback_fn("  - ERROR: instance %s, connection to primary node"
815                     " %s failed" % (instance, pnode))
816         bad = True
817
818       # If the instance is non-redundant we cannot survive losing its primary
819       # node, so we are not N+1 compliant. On the other hand we have no disk
820       # templates with more than one secondary so that situation is not well
821       # supported either.
822       # FIXME: does not support file-backed instances
823       if len(inst_config.secondary_nodes) == 0:
824         i_non_redundant.append(instance)
825       elif len(inst_config.secondary_nodes) > 1:
826         feedback_fn("  - WARNING: multiple secondaries for instance %s"
827                     % instance)
828
829       for snode in inst_config.secondary_nodes:
830         if snode in node_info:
831           node_info[snode]['sinst'].append(instance)
832           if pnode not in node_info[snode]['sinst-by-pnode']:
833             node_info[snode]['sinst-by-pnode'][pnode] = []
834           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
835         else:
836           feedback_fn("  - ERROR: instance %s, connection to secondary node"
837                       " %s failed" % (instance, snode))
838
839     feedback_fn("* Verifying orphan volumes")
840     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
841                                        feedback_fn)
842     bad = bad or result
843
844     feedback_fn("* Verifying remaining instances")
845     result = self._VerifyOrphanInstances(instancelist, node_instance,
846                                          feedback_fn)
847     bad = bad or result
848
849     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
850       feedback_fn("* Verifying N+1 Memory redundancy")
851       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
852       bad = bad or result
853
854     feedback_fn("* Other Notes")
855     if i_non_redundant:
856       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
857                   % len(i_non_redundant))
858
859     return not bad
860
861   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
862     """Analize the post-hooks' result, handle it, and send some
863     nicely-formatted feedback back to the user.
864
865     Args:
866       phase: the hooks phase that has just been run
867       hooks_results: the results of the multi-node hooks rpc call
868       feedback_fn: function to send feedback back to the caller
869       lu_result: previous Exec result
870
871     """
872     # We only really run POST phase hooks, and are only interested in
873     # their results
874     if phase == constants.HOOKS_PHASE_POST:
875       # Used to change hooks' output to proper indentation
876       indent_re = re.compile('^', re.M)
877       feedback_fn("* Hooks Results")
878       if not hooks_results:
879         feedback_fn("  - ERROR: general communication failure")
880         lu_result = 1
881       else:
882         for node_name in hooks_results:
883           show_node_header = True
884           res = hooks_results[node_name]
885           if res is False or not isinstance(res, list):
886             feedback_fn("    Communication failure")
887             lu_result = 1
888             continue
889           for script, hkr, output in res:
890             if hkr == constants.HKR_FAIL:
891               # The node header is only shown once, if there are
892               # failing hooks on that node
893               if show_node_header:
894                 feedback_fn("  Node %s:" % node_name)
895                 show_node_header = False
896               feedback_fn("    ERROR: Script %s failed, output:" % script)
897               output = indent_re.sub('      ', output)
898               feedback_fn("%s" % output)
899               lu_result = 1
900
901       return lu_result
902
903
904 class LUVerifyDisks(NoHooksLU):
905   """Verifies the cluster disks status.
906
907   """
908   _OP_REQP = []
909
910   def CheckPrereq(self):
911     """Check prerequisites.
912
913     This has no prerequisites.
914
915     """
916     pass
917
918   def Exec(self, feedback_fn):
919     """Verify integrity of cluster disks.
920
921     """
922     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
923
924     vg_name = self.cfg.GetVGName()
925     nodes = utils.NiceSort(self.cfg.GetNodeList())
926     instances = [self.cfg.GetInstanceInfo(name)
927                  for name in self.cfg.GetInstanceList()]
928
929     nv_dict = {}
930     for inst in instances:
931       inst_lvs = {}
932       if (inst.status != "up" or
933           inst.disk_template not in constants.DTS_NET_MIRROR):
934         continue
935       inst.MapLVsByNode(inst_lvs)
936       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
937       for node, vol_list in inst_lvs.iteritems():
938         for vol in vol_list:
939           nv_dict[(node, vol)] = inst
940
941     if not nv_dict:
942       return result
943
944     node_lvs = rpc.call_volume_list(nodes, vg_name)
945
946     to_act = set()
947     for node in nodes:
948       # node_volume
949       lvs = node_lvs[node]
950
951       if isinstance(lvs, basestring):
952         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
953         res_nlvm[node] = lvs
954       elif not isinstance(lvs, dict):
955         logger.Info("connection to node %s failed or invalid data returned" %
956                     (node,))
957         res_nodes.append(node)
958         continue
959
960       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
961         inst = nv_dict.pop((node, lv_name), None)
962         if (not lv_online and inst is not None
963             and inst.name not in res_instances):
964           res_instances.append(inst.name)
965
966     # any leftover items in nv_dict are missing LVs, let's arrange the
967     # data better
968     for key, inst in nv_dict.iteritems():
969       if inst.name not in res_missing:
970         res_missing[inst.name] = []
971       res_missing[inst.name].append(key)
972
973     return result
974
975
976 class LURenameCluster(LogicalUnit):
977   """Rename the cluster.
978
979   """
980   HPATH = "cluster-rename"
981   HTYPE = constants.HTYPE_CLUSTER
982   _OP_REQP = ["name"]
983   REQ_WSSTORE = True
984
985   def BuildHooksEnv(self):
986     """Build hooks env.
987
988     """
989     env = {
990       "OP_TARGET": self.sstore.GetClusterName(),
991       "NEW_NAME": self.op.name,
992       }
993     mn = self.sstore.GetMasterNode()
994     return env, [mn], [mn]
995
996   def CheckPrereq(self):
997     """Verify that the passed name is a valid one.
998
999     """
1000     hostname = utils.HostInfo(self.op.name)
1001
1002     new_name = hostname.name
1003     self.ip = new_ip = hostname.ip
1004     old_name = self.sstore.GetClusterName()
1005     old_ip = self.sstore.GetMasterIP()
1006     if new_name == old_name and new_ip == old_ip:
1007       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1008                                  " cluster has changed")
1009     if new_ip != old_ip:
1010       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1011         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1012                                    " reachable on the network. Aborting." %
1013                                    new_ip)
1014
1015     self.op.name = new_name
1016
1017   def Exec(self, feedback_fn):
1018     """Rename the cluster.
1019
1020     """
1021     clustername = self.op.name
1022     ip = self.ip
1023     ss = self.sstore
1024
1025     # shutdown the master IP
1026     master = ss.GetMasterNode()
1027     if not rpc.call_node_stop_master(master, False):
1028       raise errors.OpExecError("Could not disable the master role")
1029
1030     try:
1031       # modify the sstore
1032       ss.SetKey(ss.SS_MASTER_IP, ip)
1033       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1034
1035       # Distribute updated ss config to all nodes
1036       myself = self.cfg.GetNodeInfo(master)
1037       dist_nodes = self.cfg.GetNodeList()
1038       if myself.name in dist_nodes:
1039         dist_nodes.remove(myself.name)
1040
1041       logger.Debug("Copying updated ssconf data to all nodes")
1042       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1043         fname = ss.KeyToFilename(keyname)
1044         result = rpc.call_upload_file(dist_nodes, fname)
1045         for to_node in dist_nodes:
1046           if not result[to_node]:
1047             logger.Error("copy of file %s to node %s failed" %
1048                          (fname, to_node))
1049     finally:
1050       if not rpc.call_node_start_master(master, False):
1051         logger.Error("Could not re-enable the master role on the master,"
1052                      " please restart manually.")
1053
1054
1055 def _RecursiveCheckIfLVMBased(disk):
1056   """Check if the given disk or its children are lvm-based.
1057
1058   Args:
1059     disk: ganeti.objects.Disk object
1060
1061   Returns:
1062     boolean indicating whether a LD_LV dev_type was found or not
1063
1064   """
1065   if disk.children:
1066     for chdisk in disk.children:
1067       if _RecursiveCheckIfLVMBased(chdisk):
1068         return True
1069   return disk.dev_type == constants.LD_LV
1070
1071
1072 class LUSetClusterParams(LogicalUnit):
1073   """Change the parameters of the cluster.
1074
1075   """
1076   HPATH = "cluster-modify"
1077   HTYPE = constants.HTYPE_CLUSTER
1078   _OP_REQP = []
1079
1080   def BuildHooksEnv(self):
1081     """Build hooks env.
1082
1083     """
1084     env = {
1085       "OP_TARGET": self.sstore.GetClusterName(),
1086       "NEW_VG_NAME": self.op.vg_name,
1087       }
1088     mn = self.sstore.GetMasterNode()
1089     return env, [mn], [mn]
1090
1091   def CheckPrereq(self):
1092     """Check prerequisites.
1093
1094     This checks whether the given params don't conflict and
1095     if the given volume group is valid.
1096
1097     """
1098     if not self.op.vg_name:
1099       instances = [self.cfg.GetInstanceInfo(name)
1100                    for name in self.cfg.GetInstanceList()]
1101       for inst in instances:
1102         for disk in inst.disks:
1103           if _RecursiveCheckIfLVMBased(disk):
1104             raise errors.OpPrereqError("Cannot disable lvm storage while"
1105                                        " lvm-based instances exist")
1106
1107     # if vg_name not None, checks given volume group on all nodes
1108     if self.op.vg_name:
1109       node_list = self.cfg.GetNodeList()
1110       vglist = rpc.call_vg_list(node_list)
1111       for node in node_list:
1112         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1113                                               constants.MIN_VG_SIZE)
1114         if vgstatus:
1115           raise errors.OpPrereqError("Error on node '%s': %s" %
1116                                      (node, vgstatus))
1117
1118   def Exec(self, feedback_fn):
1119     """Change the parameters of the cluster.
1120
1121     """
1122     if self.op.vg_name != self.cfg.GetVGName():
1123       self.cfg.SetVGName(self.op.vg_name)
1124     else:
1125       feedback_fn("Cluster LVM configuration already in desired"
1126                   " state, not changing")
1127
1128
1129 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1130   """Sleep and poll for an instance's disk to sync.
1131
1132   """
1133   if not instance.disks:
1134     return True
1135
1136   if not oneshot:
1137     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1138
1139   node = instance.primary_node
1140
1141   for dev in instance.disks:
1142     cfgw.SetDiskID(dev, node)
1143
1144   retries = 0
1145   while True:
1146     max_time = 0
1147     done = True
1148     cumul_degraded = False
1149     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1150     if not rstats:
1151       proc.LogWarning("Can't get any data from node %s" % node)
1152       retries += 1
1153       if retries >= 10:
1154         raise errors.RemoteError("Can't contact node %s for mirror data,"
1155                                  " aborting." % node)
1156       time.sleep(6)
1157       continue
1158     retries = 0
1159     for i in range(len(rstats)):
1160       mstat = rstats[i]
1161       if mstat is None:
1162         proc.LogWarning("Can't compute data for node %s/%s" %
1163                         (node, instance.disks[i].iv_name))
1164         continue
1165       # we ignore the ldisk parameter
1166       perc_done, est_time, is_degraded, _ = mstat
1167       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1168       if perc_done is not None:
1169         done = False
1170         if est_time is not None:
1171           rem_time = "%d estimated seconds remaining" % est_time
1172           max_time = est_time
1173         else:
1174           rem_time = "no time estimate"
1175         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1176                      (instance.disks[i].iv_name, perc_done, rem_time))
1177     if done or oneshot:
1178       break
1179
1180     time.sleep(min(60, max_time))
1181
1182   if done:
1183     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1184   return not cumul_degraded
1185
1186
1187 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1188   """Check that mirrors are not degraded.
1189
1190   The ldisk parameter, if True, will change the test from the
1191   is_degraded attribute (which represents overall non-ok status for
1192   the device(s)) to the ldisk (representing the local storage status).
1193
1194   """
1195   cfgw.SetDiskID(dev, node)
1196   if ldisk:
1197     idx = 6
1198   else:
1199     idx = 5
1200
1201   result = True
1202   if on_primary or dev.AssembleOnSecondary():
1203     rstats = rpc.call_blockdev_find(node, dev)
1204     if not rstats:
1205       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1206       result = False
1207     else:
1208       result = result and (not rstats[idx])
1209   if dev.children:
1210     for child in dev.children:
1211       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1212
1213   return result
1214
1215
1216 class LUDiagnoseOS(NoHooksLU):
1217   """Logical unit for OS diagnose/query.
1218
1219   """
1220   _OP_REQP = ["output_fields", "names"]
1221
1222   def CheckPrereq(self):
1223     """Check prerequisites.
1224
1225     This always succeeds, since this is a pure query LU.
1226
1227     """
1228     if self.op.names:
1229       raise errors.OpPrereqError("Selective OS query not supported")
1230
1231     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1232     _CheckOutputFields(static=[],
1233                        dynamic=self.dynamic_fields,
1234                        selected=self.op.output_fields)
1235
1236   @staticmethod
1237   def _DiagnoseByOS(node_list, rlist):
1238     """Remaps a per-node return list into an a per-os per-node dictionary
1239
1240       Args:
1241         node_list: a list with the names of all nodes
1242         rlist: a map with node names as keys and OS objects as values
1243
1244       Returns:
1245         map: a map with osnames as keys and as value another map, with
1246              nodes as
1247              keys and list of OS objects as values
1248              e.g. {"debian-etch": {"node1": [<object>,...],
1249                                    "node2": [<object>,]}
1250                   }
1251
1252     """
1253     all_os = {}
1254     for node_name, nr in rlist.iteritems():
1255       if not nr:
1256         continue
1257       for os_obj in nr:
1258         if os_obj.name not in all_os:
1259           # build a list of nodes for this os containing empty lists
1260           # for each node in node_list
1261           all_os[os_obj.name] = {}
1262           for nname in node_list:
1263             all_os[os_obj.name][nname] = []
1264         all_os[os_obj.name][node_name].append(os_obj)
1265     return all_os
1266
1267   def Exec(self, feedback_fn):
1268     """Compute the list of OSes.
1269
1270     """
1271     node_list = self.cfg.GetNodeList()
1272     node_data = rpc.call_os_diagnose(node_list)
1273     if node_data == False:
1274       raise errors.OpExecError("Can't gather the list of OSes")
1275     pol = self._DiagnoseByOS(node_list, node_data)
1276     output = []
1277     for os_name, os_data in pol.iteritems():
1278       row = []
1279       for field in self.op.output_fields:
1280         if field == "name":
1281           val = os_name
1282         elif field == "valid":
1283           val = utils.all([osl and osl[0] for osl in os_data.values()])
1284         elif field == "node_status":
1285           val = {}
1286           for node_name, nos_list in os_data.iteritems():
1287             val[node_name] = [(v.status, v.path) for v in nos_list]
1288         else:
1289           raise errors.ParameterError(field)
1290         row.append(val)
1291       output.append(row)
1292
1293     return output
1294
1295
1296 class LURemoveNode(LogicalUnit):
1297   """Logical unit for removing a node.
1298
1299   """
1300   HPATH = "node-remove"
1301   HTYPE = constants.HTYPE_NODE
1302   _OP_REQP = ["node_name"]
1303
1304   def BuildHooksEnv(self):
1305     """Build hooks env.
1306
1307     This doesn't run on the target node in the pre phase as a failed
1308     node would then be impossible to remove.
1309
1310     """
1311     env = {
1312       "OP_TARGET": self.op.node_name,
1313       "NODE_NAME": self.op.node_name,
1314       }
1315     all_nodes = self.cfg.GetNodeList()
1316     all_nodes.remove(self.op.node_name)
1317     return env, all_nodes, all_nodes
1318
1319   def CheckPrereq(self):
1320     """Check prerequisites.
1321
1322     This checks:
1323      - the node exists in the configuration
1324      - it does not have primary or secondary instances
1325      - it's not the master
1326
1327     Any errors are signalled by raising errors.OpPrereqError.
1328
1329     """
1330     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1331     if node is None:
1332       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1333
1334     instance_list = self.cfg.GetInstanceList()
1335
1336     masternode = self.sstore.GetMasterNode()
1337     if node.name == masternode:
1338       raise errors.OpPrereqError("Node is the master node,"
1339                                  " you need to failover first.")
1340
1341     for instance_name in instance_list:
1342       instance = self.cfg.GetInstanceInfo(instance_name)
1343       if node.name == instance.primary_node:
1344         raise errors.OpPrereqError("Instance %s still running on the node,"
1345                                    " please remove first." % instance_name)
1346       if node.name in instance.secondary_nodes:
1347         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1348                                    " please remove first." % instance_name)
1349     self.op.node_name = node.name
1350     self.node = node
1351
1352   def Exec(self, feedback_fn):
1353     """Removes the node from the cluster.
1354
1355     """
1356     node = self.node
1357     logger.Info("stopping the node daemon and removing configs from node %s" %
1358                 node.name)
1359
1360     self.context.RemoveNode(node.name)
1361
1362     rpc.call_node_leave_cluster(node.name)
1363
1364
1365 class LUQueryNodes(NoHooksLU):
1366   """Logical unit for querying nodes.
1367
1368   """
1369   _OP_REQP = ["output_fields", "names"]
1370   REQ_BGL = False
1371
1372   def ExpandNames(self):
1373     self.dynamic_fields = frozenset([
1374       "dtotal", "dfree",
1375       "mtotal", "mnode", "mfree",
1376       "bootid",
1377       "ctotal",
1378       ])
1379
1380     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1381                                "pinst_list", "sinst_list",
1382                                "pip", "sip", "tags"],
1383                        dynamic=self.dynamic_fields,
1384                        selected=self.op.output_fields)
1385
1386     self.needed_locks = {}
1387     self.share_locks[locking.LEVEL_NODE] = 1
1388     # TODO: we could lock nodes only if the user asked for dynamic fields. For
1389     # that we need atomic ways to get info for a group of nodes from the
1390     # config, though.
1391     if not self.op.names:
1392       self.needed_locks[locking.LEVEL_NODE] = None
1393     else:
1394        self.needed_locks[locking.LEVEL_NODE] = \
1395          _GetWantedNodes(self, self.op.names)
1396
1397   def CheckPrereq(self):
1398     """Check prerequisites.
1399
1400     """
1401     # This of course is valid only if we locked the nodes
1402     self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1403
1404   def Exec(self, feedback_fn):
1405     """Computes the list of nodes and their attributes.
1406
1407     """
1408     nodenames = self.wanted
1409     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1410
1411     # begin data gathering
1412
1413     if self.dynamic_fields.intersection(self.op.output_fields):
1414       live_data = {}
1415       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1416       for name in nodenames:
1417         nodeinfo = node_data.get(name, None)
1418         if nodeinfo:
1419           live_data[name] = {
1420             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1421             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1422             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1423             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1424             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1425             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1426             "bootid": nodeinfo['bootid'],
1427             }
1428         else:
1429           live_data[name] = {}
1430     else:
1431       live_data = dict.fromkeys(nodenames, {})
1432
1433     node_to_primary = dict([(name, set()) for name in nodenames])
1434     node_to_secondary = dict([(name, set()) for name in nodenames])
1435
1436     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1437                              "sinst_cnt", "sinst_list"))
1438     if inst_fields & frozenset(self.op.output_fields):
1439       instancelist = self.cfg.GetInstanceList()
1440
1441       for instance_name in instancelist:
1442         inst = self.cfg.GetInstanceInfo(instance_name)
1443         if inst.primary_node in node_to_primary:
1444           node_to_primary[inst.primary_node].add(inst.name)
1445         for secnode in inst.secondary_nodes:
1446           if secnode in node_to_secondary:
1447             node_to_secondary[secnode].add(inst.name)
1448
1449     # end data gathering
1450
1451     output = []
1452     for node in nodelist:
1453       node_output = []
1454       for field in self.op.output_fields:
1455         if field == "name":
1456           val = node.name
1457         elif field == "pinst_list":
1458           val = list(node_to_primary[node.name])
1459         elif field == "sinst_list":
1460           val = list(node_to_secondary[node.name])
1461         elif field == "pinst_cnt":
1462           val = len(node_to_primary[node.name])
1463         elif field == "sinst_cnt":
1464           val = len(node_to_secondary[node.name])
1465         elif field == "pip":
1466           val = node.primary_ip
1467         elif field == "sip":
1468           val = node.secondary_ip
1469         elif field == "tags":
1470           val = list(node.GetTags())
1471         elif field in self.dynamic_fields:
1472           val = live_data[node.name].get(field, None)
1473         else:
1474           raise errors.ParameterError(field)
1475         node_output.append(val)
1476       output.append(node_output)
1477
1478     return output
1479
1480
1481 class LUQueryNodeVolumes(NoHooksLU):
1482   """Logical unit for getting volumes on node(s).
1483
1484   """
1485   _OP_REQP = ["nodes", "output_fields"]
1486
1487   def CheckPrereq(self):
1488     """Check prerequisites.
1489
1490     This checks that the fields required are valid output fields.
1491
1492     """
1493     self.nodes = _GetWantedNodes(self, self.op.nodes)
1494
1495     _CheckOutputFields(static=["node"],
1496                        dynamic=["phys", "vg", "name", "size", "instance"],
1497                        selected=self.op.output_fields)
1498
1499
1500   def Exec(self, feedback_fn):
1501     """Computes the list of nodes and their attributes.
1502
1503     """
1504     nodenames = self.nodes
1505     volumes = rpc.call_node_volumes(nodenames)
1506
1507     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1508              in self.cfg.GetInstanceList()]
1509
1510     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1511
1512     output = []
1513     for node in nodenames:
1514       if node not in volumes or not volumes[node]:
1515         continue
1516
1517       node_vols = volumes[node][:]
1518       node_vols.sort(key=lambda vol: vol['dev'])
1519
1520       for vol in node_vols:
1521         node_output = []
1522         for field in self.op.output_fields:
1523           if field == "node":
1524             val = node
1525           elif field == "phys":
1526             val = vol['dev']
1527           elif field == "vg":
1528             val = vol['vg']
1529           elif field == "name":
1530             val = vol['name']
1531           elif field == "size":
1532             val = int(float(vol['size']))
1533           elif field == "instance":
1534             for inst in ilist:
1535               if node not in lv_by_node[inst]:
1536                 continue
1537               if vol['name'] in lv_by_node[inst][node]:
1538                 val = inst.name
1539                 break
1540             else:
1541               val = '-'
1542           else:
1543             raise errors.ParameterError(field)
1544           node_output.append(str(val))
1545
1546         output.append(node_output)
1547
1548     return output
1549
1550
1551 class LUAddNode(LogicalUnit):
1552   """Logical unit for adding node to the cluster.
1553
1554   """
1555   HPATH = "node-add"
1556   HTYPE = constants.HTYPE_NODE
1557   _OP_REQP = ["node_name"]
1558
1559   def BuildHooksEnv(self):
1560     """Build hooks env.
1561
1562     This will run on all nodes before, and on all nodes + the new node after.
1563
1564     """
1565     env = {
1566       "OP_TARGET": self.op.node_name,
1567       "NODE_NAME": self.op.node_name,
1568       "NODE_PIP": self.op.primary_ip,
1569       "NODE_SIP": self.op.secondary_ip,
1570       }
1571     nodes_0 = self.cfg.GetNodeList()
1572     nodes_1 = nodes_0 + [self.op.node_name, ]
1573     return env, nodes_0, nodes_1
1574
1575   def CheckPrereq(self):
1576     """Check prerequisites.
1577
1578     This checks:
1579      - the new node is not already in the config
1580      - it is resolvable
1581      - its parameters (single/dual homed) matches the cluster
1582
1583     Any errors are signalled by raising errors.OpPrereqError.
1584
1585     """
1586     node_name = self.op.node_name
1587     cfg = self.cfg
1588
1589     dns_data = utils.HostInfo(node_name)
1590
1591     node = dns_data.name
1592     primary_ip = self.op.primary_ip = dns_data.ip
1593     secondary_ip = getattr(self.op, "secondary_ip", None)
1594     if secondary_ip is None:
1595       secondary_ip = primary_ip
1596     if not utils.IsValidIP(secondary_ip):
1597       raise errors.OpPrereqError("Invalid secondary IP given")
1598     self.op.secondary_ip = secondary_ip
1599
1600     node_list = cfg.GetNodeList()
1601     if not self.op.readd and node in node_list:
1602       raise errors.OpPrereqError("Node %s is already in the configuration" %
1603                                  node)
1604     elif self.op.readd and node not in node_list:
1605       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1606
1607     for existing_node_name in node_list:
1608       existing_node = cfg.GetNodeInfo(existing_node_name)
1609
1610       if self.op.readd and node == existing_node_name:
1611         if (existing_node.primary_ip != primary_ip or
1612             existing_node.secondary_ip != secondary_ip):
1613           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1614                                      " address configuration as before")
1615         continue
1616
1617       if (existing_node.primary_ip == primary_ip or
1618           existing_node.secondary_ip == primary_ip or
1619           existing_node.primary_ip == secondary_ip or
1620           existing_node.secondary_ip == secondary_ip):
1621         raise errors.OpPrereqError("New node ip address(es) conflict with"
1622                                    " existing node %s" % existing_node.name)
1623
1624     # check that the type of the node (single versus dual homed) is the
1625     # same as for the master
1626     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1627     master_singlehomed = myself.secondary_ip == myself.primary_ip
1628     newbie_singlehomed = secondary_ip == primary_ip
1629     if master_singlehomed != newbie_singlehomed:
1630       if master_singlehomed:
1631         raise errors.OpPrereqError("The master has no private ip but the"
1632                                    " new node has one")
1633       else:
1634         raise errors.OpPrereqError("The master has a private ip but the"
1635                                    " new node doesn't have one")
1636
1637     # checks reachablity
1638     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1639       raise errors.OpPrereqError("Node not reachable by ping")
1640
1641     if not newbie_singlehomed:
1642       # check reachability from my secondary ip to newbie's secondary ip
1643       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1644                            source=myself.secondary_ip):
1645         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1646                                    " based ping to noded port")
1647
1648     self.new_node = objects.Node(name=node,
1649                                  primary_ip=primary_ip,
1650                                  secondary_ip=secondary_ip)
1651
1652   def Exec(self, feedback_fn):
1653     """Adds the new node to the cluster.
1654
1655     """
1656     new_node = self.new_node
1657     node = new_node.name
1658
1659     # check connectivity
1660     result = rpc.call_version([node])[node]
1661     if result:
1662       if constants.PROTOCOL_VERSION == result:
1663         logger.Info("communication to node %s fine, sw version %s match" %
1664                     (node, result))
1665       else:
1666         raise errors.OpExecError("Version mismatch master version %s,"
1667                                  " node version %s" %
1668                                  (constants.PROTOCOL_VERSION, result))
1669     else:
1670       raise errors.OpExecError("Cannot get version from the new node")
1671
1672     # setup ssh on node
1673     logger.Info("copy ssh key to node %s" % node)
1674     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1675     keyarray = []
1676     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1677                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1678                 priv_key, pub_key]
1679
1680     for i in keyfiles:
1681       f = open(i, 'r')
1682       try:
1683         keyarray.append(f.read())
1684       finally:
1685         f.close()
1686
1687     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1688                                keyarray[3], keyarray[4], keyarray[5])
1689
1690     if not result:
1691       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1692
1693     # Add node to our /etc/hosts, and add key to known_hosts
1694     utils.AddHostToEtcHosts(new_node.name)
1695
1696     if new_node.secondary_ip != new_node.primary_ip:
1697       if not rpc.call_node_tcp_ping(new_node.name,
1698                                     constants.LOCALHOST_IP_ADDRESS,
1699                                     new_node.secondary_ip,
1700                                     constants.DEFAULT_NODED_PORT,
1701                                     10, False):
1702         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1703                                  " you gave (%s). Please fix and re-run this"
1704                                  " command." % new_node.secondary_ip)
1705
1706     node_verify_list = [self.sstore.GetMasterNode()]
1707     node_verify_param = {
1708       'nodelist': [node],
1709       # TODO: do a node-net-test as well?
1710     }
1711
1712     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1713     for verifier in node_verify_list:
1714       if not result[verifier]:
1715         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1716                                  " for remote verification" % verifier)
1717       if result[verifier]['nodelist']:
1718         for failed in result[verifier]['nodelist']:
1719           feedback_fn("ssh/hostname verification failed %s -> %s" %
1720                       (verifier, result[verifier]['nodelist'][failed]))
1721         raise errors.OpExecError("ssh/hostname verification failed.")
1722
1723     # Distribute updated /etc/hosts and known_hosts to all nodes,
1724     # including the node just added
1725     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1726     dist_nodes = self.cfg.GetNodeList()
1727     if not self.op.readd:
1728       dist_nodes.append(node)
1729     if myself.name in dist_nodes:
1730       dist_nodes.remove(myself.name)
1731
1732     logger.Debug("Copying hosts and known_hosts to all nodes")
1733     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1734       result = rpc.call_upload_file(dist_nodes, fname)
1735       for to_node in dist_nodes:
1736         if not result[to_node]:
1737           logger.Error("copy of file %s to node %s failed" %
1738                        (fname, to_node))
1739
1740     to_copy = self.sstore.GetFileList()
1741     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1742       to_copy.append(constants.VNC_PASSWORD_FILE)
1743     for fname in to_copy:
1744       result = rpc.call_upload_file([node], fname)
1745       if not result[node]:
1746         logger.Error("could not copy file %s to node %s" % (fname, node))
1747
1748     if self.op.readd:
1749       self.context.ReaddNode(new_node)
1750     else:
1751       self.context.AddNode(new_node)
1752
1753
1754 class LUQueryClusterInfo(NoHooksLU):
1755   """Query cluster configuration.
1756
1757   """
1758   _OP_REQP = []
1759   REQ_MASTER = False
1760   REQ_BGL = False
1761
1762   def ExpandNames(self):
1763     self.needed_locks = {}
1764
1765   def CheckPrereq(self):
1766     """No prerequsites needed for this LU.
1767
1768     """
1769     pass
1770
1771   def Exec(self, feedback_fn):
1772     """Return cluster config.
1773
1774     """
1775     result = {
1776       "name": self.sstore.GetClusterName(),
1777       "software_version": constants.RELEASE_VERSION,
1778       "protocol_version": constants.PROTOCOL_VERSION,
1779       "config_version": constants.CONFIG_VERSION,
1780       "os_api_version": constants.OS_API_VERSION,
1781       "export_version": constants.EXPORT_VERSION,
1782       "master": self.sstore.GetMasterNode(),
1783       "architecture": (platform.architecture()[0], platform.machine()),
1784       "hypervisor_type": self.sstore.GetHypervisorType(),
1785       }
1786
1787     return result
1788
1789
1790 class LUDumpClusterConfig(NoHooksLU):
1791   """Return a text-representation of the cluster-config.
1792
1793   """
1794   _OP_REQP = []
1795   REQ_BGL = False
1796
1797   def ExpandNames(self):
1798     self.needed_locks = {}
1799
1800   def CheckPrereq(self):
1801     """No prerequisites.
1802
1803     """
1804     pass
1805
1806   def Exec(self, feedback_fn):
1807     """Dump a representation of the cluster config to the standard output.
1808
1809     """
1810     return self.cfg.DumpConfig()
1811
1812
1813 class LUActivateInstanceDisks(NoHooksLU):
1814   """Bring up an instance's disks.
1815
1816   """
1817   _OP_REQP = ["instance_name"]
1818
1819   def CheckPrereq(self):
1820     """Check prerequisites.
1821
1822     This checks that the instance is in the cluster.
1823
1824     """
1825     instance = self.cfg.GetInstanceInfo(
1826       self.cfg.ExpandInstanceName(self.op.instance_name))
1827     if instance is None:
1828       raise errors.OpPrereqError("Instance '%s' not known" %
1829                                  self.op.instance_name)
1830     self.instance = instance
1831
1832
1833   def Exec(self, feedback_fn):
1834     """Activate the disks.
1835
1836     """
1837     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1838     if not disks_ok:
1839       raise errors.OpExecError("Cannot activate block devices")
1840
1841     return disks_info
1842
1843
1844 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1845   """Prepare the block devices for an instance.
1846
1847   This sets up the block devices on all nodes.
1848
1849   Args:
1850     instance: a ganeti.objects.Instance object
1851     ignore_secondaries: if true, errors on secondary nodes won't result
1852                         in an error return from the function
1853
1854   Returns:
1855     false if the operation failed
1856     list of (host, instance_visible_name, node_visible_name) if the operation
1857          suceeded with the mapping from node devices to instance devices
1858   """
1859   device_info = []
1860   disks_ok = True
1861   iname = instance.name
1862   # With the two passes mechanism we try to reduce the window of
1863   # opportunity for the race condition of switching DRBD to primary
1864   # before handshaking occured, but we do not eliminate it
1865
1866   # The proper fix would be to wait (with some limits) until the
1867   # connection has been made and drbd transitions from WFConnection
1868   # into any other network-connected state (Connected, SyncTarget,
1869   # SyncSource, etc.)
1870
1871   # 1st pass, assemble on all nodes in secondary mode
1872   for inst_disk in instance.disks:
1873     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1874       cfg.SetDiskID(node_disk, node)
1875       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1876       if not result:
1877         logger.Error("could not prepare block device %s on node %s"
1878                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1879         if not ignore_secondaries:
1880           disks_ok = False
1881
1882   # FIXME: race condition on drbd migration to primary
1883
1884   # 2nd pass, do only the primary node
1885   for inst_disk in instance.disks:
1886     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1887       if node != instance.primary_node:
1888         continue
1889       cfg.SetDiskID(node_disk, node)
1890       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1891       if not result:
1892         logger.Error("could not prepare block device %s on node %s"
1893                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1894         disks_ok = False
1895     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1896
1897   # leave the disks configured for the primary node
1898   # this is a workaround that would be fixed better by
1899   # improving the logical/physical id handling
1900   for disk in instance.disks:
1901     cfg.SetDiskID(disk, instance.primary_node)
1902
1903   return disks_ok, device_info
1904
1905
1906 def _StartInstanceDisks(cfg, instance, force):
1907   """Start the disks of an instance.
1908
1909   """
1910   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1911                                            ignore_secondaries=force)
1912   if not disks_ok:
1913     _ShutdownInstanceDisks(instance, cfg)
1914     if force is not None and not force:
1915       logger.Error("If the message above refers to a secondary node,"
1916                    " you can retry the operation using '--force'.")
1917     raise errors.OpExecError("Disk consistency error")
1918
1919
1920 class LUDeactivateInstanceDisks(NoHooksLU):
1921   """Shutdown an instance's disks.
1922
1923   """
1924   _OP_REQP = ["instance_name"]
1925
1926   def CheckPrereq(self):
1927     """Check prerequisites.
1928
1929     This checks that the instance is in the cluster.
1930
1931     """
1932     instance = self.cfg.GetInstanceInfo(
1933       self.cfg.ExpandInstanceName(self.op.instance_name))
1934     if instance is None:
1935       raise errors.OpPrereqError("Instance '%s' not known" %
1936                                  self.op.instance_name)
1937     self.instance = instance
1938
1939   def Exec(self, feedback_fn):
1940     """Deactivate the disks
1941
1942     """
1943     instance = self.instance
1944     ins_l = rpc.call_instance_list([instance.primary_node])
1945     ins_l = ins_l[instance.primary_node]
1946     if not type(ins_l) is list:
1947       raise errors.OpExecError("Can't contact node '%s'" %
1948                                instance.primary_node)
1949
1950     if self.instance.name in ins_l:
1951       raise errors.OpExecError("Instance is running, can't shutdown"
1952                                " block devices.")
1953
1954     _ShutdownInstanceDisks(instance, self.cfg)
1955
1956
1957 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1958   """Shutdown block devices of an instance.
1959
1960   This does the shutdown on all nodes of the instance.
1961
1962   If the ignore_primary is false, errors on the primary node are
1963   ignored.
1964
1965   """
1966   result = True
1967   for disk in instance.disks:
1968     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1969       cfg.SetDiskID(top_disk, node)
1970       if not rpc.call_blockdev_shutdown(node, top_disk):
1971         logger.Error("could not shutdown block device %s on node %s" %
1972                      (disk.iv_name, node))
1973         if not ignore_primary or node != instance.primary_node:
1974           result = False
1975   return result
1976
1977
1978 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1979   """Checks if a node has enough free memory.
1980
1981   This function check if a given node has the needed amount of free
1982   memory. In case the node has less memory or we cannot get the
1983   information from the node, this function raise an OpPrereqError
1984   exception.
1985
1986   Args:
1987     - cfg: a ConfigWriter instance
1988     - node: the node name
1989     - reason: string to use in the error message
1990     - requested: the amount of memory in MiB
1991
1992   """
1993   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1994   if not nodeinfo or not isinstance(nodeinfo, dict):
1995     raise errors.OpPrereqError("Could not contact node %s for resource"
1996                              " information" % (node,))
1997
1998   free_mem = nodeinfo[node].get('memory_free')
1999   if not isinstance(free_mem, int):
2000     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2001                              " was '%s'" % (node, free_mem))
2002   if requested > free_mem:
2003     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2004                              " needed %s MiB, available %s MiB" %
2005                              (node, reason, requested, free_mem))
2006
2007
2008 class LUStartupInstance(LogicalUnit):
2009   """Starts an instance.
2010
2011   """
2012   HPATH = "instance-start"
2013   HTYPE = constants.HTYPE_INSTANCE
2014   _OP_REQP = ["instance_name", "force"]
2015   REQ_BGL = False
2016
2017   def ExpandNames(self):
2018     self._ExpandAndLockInstance()
2019     self.needed_locks[locking.LEVEL_NODE] = []
2020     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2021
2022   def DeclareLocks(self, level):
2023     if level == locking.LEVEL_NODE:
2024       self._LockInstancesNodes()
2025
2026   def BuildHooksEnv(self):
2027     """Build hooks env.
2028
2029     This runs on master, primary and secondary nodes of the instance.
2030
2031     """
2032     env = {
2033       "FORCE": self.op.force,
2034       }
2035     env.update(_BuildInstanceHookEnvByObject(self.instance))
2036     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2037           list(self.instance.secondary_nodes))
2038     return env, nl, nl
2039
2040   def CheckPrereq(self):
2041     """Check prerequisites.
2042
2043     This checks that the instance is in the cluster.
2044
2045     """
2046     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2047     assert self.instance is not None, \
2048       "Cannot retrieve locked instance %s" % self.op.instance_name
2049
2050     # check bridges existance
2051     _CheckInstanceBridgesExist(instance)
2052
2053     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2054                          "starting instance %s" % instance.name,
2055                          instance.memory)
2056
2057   def Exec(self, feedback_fn):
2058     """Start the instance.
2059
2060     """
2061     instance = self.instance
2062     force = self.op.force
2063     extra_args = getattr(self.op, "extra_args", "")
2064
2065     self.cfg.MarkInstanceUp(instance.name)
2066
2067     node_current = instance.primary_node
2068
2069     _StartInstanceDisks(self.cfg, instance, force)
2070
2071     if not rpc.call_instance_start(node_current, instance, extra_args):
2072       _ShutdownInstanceDisks(instance, self.cfg)
2073       raise errors.OpExecError("Could not start instance")
2074
2075
2076 class LURebootInstance(LogicalUnit):
2077   """Reboot an instance.
2078
2079   """
2080   HPATH = "instance-reboot"
2081   HTYPE = constants.HTYPE_INSTANCE
2082   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2083   REQ_BGL = False
2084
2085   def ExpandNames(self):
2086     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2087                                    constants.INSTANCE_REBOOT_HARD,
2088                                    constants.INSTANCE_REBOOT_FULL]:
2089       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2090                                   (constants.INSTANCE_REBOOT_SOFT,
2091                                    constants.INSTANCE_REBOOT_HARD,
2092                                    constants.INSTANCE_REBOOT_FULL))
2093     self._ExpandAndLockInstance()
2094     self.needed_locks[locking.LEVEL_NODE] = []
2095     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2096
2097   def DeclareLocks(self, level):
2098     if level == locking.LEVEL_NODE:
2099       # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
2100       self._LockInstancesNodes()
2101
2102   def BuildHooksEnv(self):
2103     """Build hooks env.
2104
2105     This runs on master, primary and secondary nodes of the instance.
2106
2107     """
2108     env = {
2109       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2110       }
2111     env.update(_BuildInstanceHookEnvByObject(self.instance))
2112     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2113           list(self.instance.secondary_nodes))
2114     return env, nl, nl
2115
2116   def CheckPrereq(self):
2117     """Check prerequisites.
2118
2119     This checks that the instance is in the cluster.
2120
2121     """
2122     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2123     assert self.instance is not None, \
2124       "Cannot retrieve locked instance %s" % self.op.instance_name
2125
2126     # check bridges existance
2127     _CheckInstanceBridgesExist(instance)
2128
2129   def Exec(self, feedback_fn):
2130     """Reboot the instance.
2131
2132     """
2133     instance = self.instance
2134     ignore_secondaries = self.op.ignore_secondaries
2135     reboot_type = self.op.reboot_type
2136     extra_args = getattr(self.op, "extra_args", "")
2137
2138     node_current = instance.primary_node
2139
2140     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2141                        constants.INSTANCE_REBOOT_HARD]:
2142       if not rpc.call_instance_reboot(node_current, instance,
2143                                       reboot_type, extra_args):
2144         raise errors.OpExecError("Could not reboot instance")
2145     else:
2146       if not rpc.call_instance_shutdown(node_current, instance):
2147         raise errors.OpExecError("could not shutdown instance for full reboot")
2148       _ShutdownInstanceDisks(instance, self.cfg)
2149       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2150       if not rpc.call_instance_start(node_current, instance, extra_args):
2151         _ShutdownInstanceDisks(instance, self.cfg)
2152         raise errors.OpExecError("Could not start instance for full reboot")
2153
2154     self.cfg.MarkInstanceUp(instance.name)
2155
2156
2157 class LUShutdownInstance(LogicalUnit):
2158   """Shutdown an instance.
2159
2160   """
2161   HPATH = "instance-stop"
2162   HTYPE = constants.HTYPE_INSTANCE
2163   _OP_REQP = ["instance_name"]
2164   REQ_BGL = False
2165
2166   def ExpandNames(self):
2167     self._ExpandAndLockInstance()
2168     self.needed_locks[locking.LEVEL_NODE] = []
2169     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2170
2171   def DeclareLocks(self, level):
2172     if level == locking.LEVEL_NODE:
2173       self._LockInstancesNodes()
2174
2175   def BuildHooksEnv(self):
2176     """Build hooks env.
2177
2178     This runs on master, primary and secondary nodes of the instance.
2179
2180     """
2181     env = _BuildInstanceHookEnvByObject(self.instance)
2182     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2183           list(self.instance.secondary_nodes))
2184     return env, nl, nl
2185
2186   def CheckPrereq(self):
2187     """Check prerequisites.
2188
2189     This checks that the instance is in the cluster.
2190
2191     """
2192     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2193     assert self.instance is not None, \
2194       "Cannot retrieve locked instance %s" % self.op.instance_name
2195
2196   def Exec(self, feedback_fn):
2197     """Shutdown the instance.
2198
2199     """
2200     instance = self.instance
2201     node_current = instance.primary_node
2202     self.cfg.MarkInstanceDown(instance.name)
2203     if not rpc.call_instance_shutdown(node_current, instance):
2204       logger.Error("could not shutdown instance")
2205
2206     _ShutdownInstanceDisks(instance, self.cfg)
2207
2208
2209 class LUReinstallInstance(LogicalUnit):
2210   """Reinstall an instance.
2211
2212   """
2213   HPATH = "instance-reinstall"
2214   HTYPE = constants.HTYPE_INSTANCE
2215   _OP_REQP = ["instance_name"]
2216   REQ_BGL = False
2217
2218   def ExpandNames(self):
2219     self._ExpandAndLockInstance()
2220     self.needed_locks[locking.LEVEL_NODE] = []
2221     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2222
2223   def DeclareLocks(self, level):
2224     if level == locking.LEVEL_NODE:
2225       self._LockInstancesNodes()
2226
2227   def BuildHooksEnv(self):
2228     """Build hooks env.
2229
2230     This runs on master, primary and secondary nodes of the instance.
2231
2232     """
2233     env = _BuildInstanceHookEnvByObject(self.instance)
2234     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2235           list(self.instance.secondary_nodes))
2236     return env, nl, nl
2237
2238   def CheckPrereq(self):
2239     """Check prerequisites.
2240
2241     This checks that the instance is in the cluster and is not running.
2242
2243     """
2244     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2245     assert instance is not None, \
2246       "Cannot retrieve locked instance %s" % self.op.instance_name
2247
2248     if instance.disk_template == constants.DT_DISKLESS:
2249       raise errors.OpPrereqError("Instance '%s' has no disks" %
2250                                  self.op.instance_name)
2251     if instance.status != "down":
2252       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2253                                  self.op.instance_name)
2254     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2255     if remote_info:
2256       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2257                                  (self.op.instance_name,
2258                                   instance.primary_node))
2259
2260     self.op.os_type = getattr(self.op, "os_type", None)
2261     if self.op.os_type is not None:
2262       # OS verification
2263       pnode = self.cfg.GetNodeInfo(
2264         self.cfg.ExpandNodeName(instance.primary_node))
2265       if pnode is None:
2266         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2267                                    self.op.pnode)
2268       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2269       if not os_obj:
2270         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2271                                    " primary node"  % self.op.os_type)
2272
2273     self.instance = instance
2274
2275   def Exec(self, feedback_fn):
2276     """Reinstall the instance.
2277
2278     """
2279     inst = self.instance
2280
2281     if self.op.os_type is not None:
2282       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2283       inst.os = self.op.os_type
2284       self.cfg.AddInstance(inst)
2285
2286     _StartInstanceDisks(self.cfg, inst, None)
2287     try:
2288       feedback_fn("Running the instance OS create scripts...")
2289       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2290         raise errors.OpExecError("Could not install OS for instance %s"
2291                                  " on node %s" %
2292                                  (inst.name, inst.primary_node))
2293     finally:
2294       _ShutdownInstanceDisks(inst, self.cfg)
2295
2296
2297 class LURenameInstance(LogicalUnit):
2298   """Rename an instance.
2299
2300   """
2301   HPATH = "instance-rename"
2302   HTYPE = constants.HTYPE_INSTANCE
2303   _OP_REQP = ["instance_name", "new_name"]
2304
2305   def BuildHooksEnv(self):
2306     """Build hooks env.
2307
2308     This runs on master, primary and secondary nodes of the instance.
2309
2310     """
2311     env = _BuildInstanceHookEnvByObject(self.instance)
2312     env["INSTANCE_NEW_NAME"] = self.op.new_name
2313     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2314           list(self.instance.secondary_nodes))
2315     return env, nl, nl
2316
2317   def CheckPrereq(self):
2318     """Check prerequisites.
2319
2320     This checks that the instance is in the cluster and is not running.
2321
2322     """
2323     instance = self.cfg.GetInstanceInfo(
2324       self.cfg.ExpandInstanceName(self.op.instance_name))
2325     if instance is None:
2326       raise errors.OpPrereqError("Instance '%s' not known" %
2327                                  self.op.instance_name)
2328     if instance.status != "down":
2329       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2330                                  self.op.instance_name)
2331     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2332     if remote_info:
2333       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2334                                  (self.op.instance_name,
2335                                   instance.primary_node))
2336     self.instance = instance
2337
2338     # new name verification
2339     name_info = utils.HostInfo(self.op.new_name)
2340
2341     self.op.new_name = new_name = name_info.name
2342     instance_list = self.cfg.GetInstanceList()
2343     if new_name in instance_list:
2344       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2345                                  new_name)
2346
2347     if not getattr(self.op, "ignore_ip", False):
2348       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2349         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2350                                    (name_info.ip, new_name))
2351
2352
2353   def Exec(self, feedback_fn):
2354     """Reinstall the instance.
2355
2356     """
2357     inst = self.instance
2358     old_name = inst.name
2359
2360     if inst.disk_template == constants.DT_FILE:
2361       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2362
2363     self.cfg.RenameInstance(inst.name, self.op.new_name)
2364     # Change the instance lock. This is definitely safe while we hold the BGL
2365     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2366     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2367
2368     # re-read the instance from the configuration after rename
2369     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2370
2371     if inst.disk_template == constants.DT_FILE:
2372       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2373       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2374                                                 old_file_storage_dir,
2375                                                 new_file_storage_dir)
2376
2377       if not result:
2378         raise errors.OpExecError("Could not connect to node '%s' to rename"
2379                                  " directory '%s' to '%s' (but the instance"
2380                                  " has been renamed in Ganeti)" % (
2381                                  inst.primary_node, old_file_storage_dir,
2382                                  new_file_storage_dir))
2383
2384       if not result[0]:
2385         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2386                                  " (but the instance has been renamed in"
2387                                  " Ganeti)" % (old_file_storage_dir,
2388                                                new_file_storage_dir))
2389
2390     _StartInstanceDisks(self.cfg, inst, None)
2391     try:
2392       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2393                                           "sda", "sdb"):
2394         msg = ("Could run OS rename script for instance %s on node %s (but the"
2395                " instance has been renamed in Ganeti)" %
2396                (inst.name, inst.primary_node))
2397         logger.Error(msg)
2398     finally:
2399       _ShutdownInstanceDisks(inst, self.cfg)
2400
2401
2402 class LURemoveInstance(LogicalUnit):
2403   """Remove an instance.
2404
2405   """
2406   HPATH = "instance-remove"
2407   HTYPE = constants.HTYPE_INSTANCE
2408   _OP_REQP = ["instance_name", "ignore_failures"]
2409
2410   def BuildHooksEnv(self):
2411     """Build hooks env.
2412
2413     This runs on master, primary and secondary nodes of the instance.
2414
2415     """
2416     env = _BuildInstanceHookEnvByObject(self.instance)
2417     nl = [self.sstore.GetMasterNode()]
2418     return env, nl, nl
2419
2420   def CheckPrereq(self):
2421     """Check prerequisites.
2422
2423     This checks that the instance is in the cluster.
2424
2425     """
2426     instance = self.cfg.GetInstanceInfo(
2427       self.cfg.ExpandInstanceName(self.op.instance_name))
2428     if instance is None:
2429       raise errors.OpPrereqError("Instance '%s' not known" %
2430                                  self.op.instance_name)
2431     self.instance = instance
2432
2433   def Exec(self, feedback_fn):
2434     """Remove the instance.
2435
2436     """
2437     instance = self.instance
2438     logger.Info("shutting down instance %s on node %s" %
2439                 (instance.name, instance.primary_node))
2440
2441     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2442       if self.op.ignore_failures:
2443         feedback_fn("Warning: can't shutdown instance")
2444       else:
2445         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2446                                  (instance.name, instance.primary_node))
2447
2448     logger.Info("removing block devices for instance %s" % instance.name)
2449
2450     if not _RemoveDisks(instance, self.cfg):
2451       if self.op.ignore_failures:
2452         feedback_fn("Warning: can't remove instance's disks")
2453       else:
2454         raise errors.OpExecError("Can't remove instance's disks")
2455
2456     logger.Info("removing instance %s out of cluster config" % instance.name)
2457
2458     self.cfg.RemoveInstance(instance.name)
2459     # Remove the new instance from the Ganeti Lock Manager
2460     self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2461
2462
2463 class LUQueryInstances(NoHooksLU):
2464   """Logical unit for querying instances.
2465
2466   """
2467   _OP_REQP = ["output_fields", "names"]
2468   REQ_BGL = False
2469
2470   def ExpandNames(self):
2471     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2472     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2473                                "admin_state", "admin_ram",
2474                                "disk_template", "ip", "mac", "bridge",
2475                                "sda_size", "sdb_size", "vcpus", "tags"],
2476                        dynamic=self.dynamic_fields,
2477                        selected=self.op.output_fields)
2478
2479     self.needed_locks = {}
2480     self.share_locks[locking.LEVEL_INSTANCE] = 1
2481     self.share_locks[locking.LEVEL_NODE] = 1
2482
2483     # TODO: we could lock instances (and nodes) only if the user asked for
2484     # dynamic fields. For that we need atomic ways to get info for a group of
2485     # instances from the config, though.
2486     if not self.op.names:
2487       self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all
2488     else:
2489       self.needed_locks[locking.LEVEL_INSTANCE] = \
2490         _GetWantedInstances(self, self.op.names)
2491
2492     self.needed_locks[locking.LEVEL_NODE] = []
2493     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2494
2495   def DeclareLocks(self, level):
2496     # TODO: locking of nodes could be avoided when not querying them
2497     if level == locking.LEVEL_NODE:
2498       self._LockInstancesNodes()
2499
2500   def CheckPrereq(self):
2501     """Check prerequisites.
2502
2503     """
2504     # This of course is valid only if we locked the instances
2505     self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2506
2507   def Exec(self, feedback_fn):
2508     """Computes the list of nodes and their attributes.
2509
2510     """
2511     instance_names = self.wanted
2512     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2513                      in instance_names]
2514
2515     # begin data gathering
2516
2517     nodes = frozenset([inst.primary_node for inst in instance_list])
2518
2519     bad_nodes = []
2520     if self.dynamic_fields.intersection(self.op.output_fields):
2521       live_data = {}
2522       node_data = rpc.call_all_instances_info(nodes)
2523       for name in nodes:
2524         result = node_data[name]
2525         if result:
2526           live_data.update(result)
2527         elif result == False:
2528           bad_nodes.append(name)
2529         # else no instance is alive
2530     else:
2531       live_data = dict([(name, {}) for name in instance_names])
2532
2533     # end data gathering
2534
2535     output = []
2536     for instance in instance_list:
2537       iout = []
2538       for field in self.op.output_fields:
2539         if field == "name":
2540           val = instance.name
2541         elif field == "os":
2542           val = instance.os
2543         elif field == "pnode":
2544           val = instance.primary_node
2545         elif field == "snodes":
2546           val = list(instance.secondary_nodes)
2547         elif field == "admin_state":
2548           val = (instance.status != "down")
2549         elif field == "oper_state":
2550           if instance.primary_node in bad_nodes:
2551             val = None
2552           else:
2553             val = bool(live_data.get(instance.name))
2554         elif field == "status":
2555           if instance.primary_node in bad_nodes:
2556             val = "ERROR_nodedown"
2557           else:
2558             running = bool(live_data.get(instance.name))
2559             if running:
2560               if instance.status != "down":
2561                 val = "running"
2562               else:
2563                 val = "ERROR_up"
2564             else:
2565               if instance.status != "down":
2566                 val = "ERROR_down"
2567               else:
2568                 val = "ADMIN_down"
2569         elif field == "admin_ram":
2570           val = instance.memory
2571         elif field == "oper_ram":
2572           if instance.primary_node in bad_nodes:
2573             val = None
2574           elif instance.name in live_data:
2575             val = live_data[instance.name].get("memory", "?")
2576           else:
2577             val = "-"
2578         elif field == "disk_template":
2579           val = instance.disk_template
2580         elif field == "ip":
2581           val = instance.nics[0].ip
2582         elif field == "bridge":
2583           val = instance.nics[0].bridge
2584         elif field == "mac":
2585           val = instance.nics[0].mac
2586         elif field == "sda_size" or field == "sdb_size":
2587           disk = instance.FindDisk(field[:3])
2588           if disk is None:
2589             val = None
2590           else:
2591             val = disk.size
2592         elif field == "vcpus":
2593           val = instance.vcpus
2594         elif field == "tags":
2595           val = list(instance.GetTags())
2596         else:
2597           raise errors.ParameterError(field)
2598         iout.append(val)
2599       output.append(iout)
2600
2601     return output
2602
2603
2604 class LUFailoverInstance(LogicalUnit):
2605   """Failover an instance.
2606
2607   """
2608   HPATH = "instance-failover"
2609   HTYPE = constants.HTYPE_INSTANCE
2610   _OP_REQP = ["instance_name", "ignore_consistency"]
2611   REQ_BGL = False
2612
2613   def ExpandNames(self):
2614     self._ExpandAndLockInstance()
2615     self.needed_locks[locking.LEVEL_NODE] = []
2616     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2617
2618   def DeclareLocks(self, level):
2619     if level == locking.LEVEL_NODE:
2620       self._LockInstancesNodes()
2621
2622   def BuildHooksEnv(self):
2623     """Build hooks env.
2624
2625     This runs on master, primary and secondary nodes of the instance.
2626
2627     """
2628     env = {
2629       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2630       }
2631     env.update(_BuildInstanceHookEnvByObject(self.instance))
2632     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2633     return env, nl, nl
2634
2635   def CheckPrereq(self):
2636     """Check prerequisites.
2637
2638     This checks that the instance is in the cluster.
2639
2640     """
2641     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2642     assert self.instance is not None, \
2643       "Cannot retrieve locked instance %s" % self.op.instance_name
2644
2645     if instance.disk_template not in constants.DTS_NET_MIRROR:
2646       raise errors.OpPrereqError("Instance's disk layout is not"
2647                                  " network mirrored, cannot failover.")
2648
2649     secondary_nodes = instance.secondary_nodes
2650     if not secondary_nodes:
2651       raise errors.ProgrammerError("no secondary node but using "
2652                                    "a mirrored disk template")
2653
2654     target_node = secondary_nodes[0]
2655     # check memory requirements on the secondary node
2656     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2657                          instance.name, instance.memory)
2658
2659     # check bridge existance
2660     brlist = [nic.bridge for nic in instance.nics]
2661     if not rpc.call_bridges_exist(target_node, brlist):
2662       raise errors.OpPrereqError("One or more target bridges %s does not"
2663                                  " exist on destination node '%s'" %
2664                                  (brlist, target_node))
2665
2666   def Exec(self, feedback_fn):
2667     """Failover an instance.
2668
2669     The failover is done by shutting it down on its present node and
2670     starting it on the secondary.
2671
2672     """
2673     instance = self.instance
2674
2675     source_node = instance.primary_node
2676     target_node = instance.secondary_nodes[0]
2677
2678     feedback_fn("* checking disk consistency between source and target")
2679     for dev in instance.disks:
2680       # for drbd, these are drbd over lvm
2681       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2682         if instance.status == "up" and not self.op.ignore_consistency:
2683           raise errors.OpExecError("Disk %s is degraded on target node,"
2684                                    " aborting failover." % dev.iv_name)
2685
2686     feedback_fn("* shutting down instance on source node")
2687     logger.Info("Shutting down instance %s on node %s" %
2688                 (instance.name, source_node))
2689
2690     if not rpc.call_instance_shutdown(source_node, instance):
2691       if self.op.ignore_consistency:
2692         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2693                      " anyway. Please make sure node %s is down"  %
2694                      (instance.name, source_node, source_node))
2695       else:
2696         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2697                                  (instance.name, source_node))
2698
2699     feedback_fn("* deactivating the instance's disks on source node")
2700     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2701       raise errors.OpExecError("Can't shut down the instance's disks.")
2702
2703     instance.primary_node = target_node
2704     # distribute new instance config to the other nodes
2705     self.cfg.Update(instance)
2706
2707     # Only start the instance if it's marked as up
2708     if instance.status == "up":
2709       feedback_fn("* activating the instance's disks on target node")
2710       logger.Info("Starting instance %s on node %s" %
2711                   (instance.name, target_node))
2712
2713       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2714                                                ignore_secondaries=True)
2715       if not disks_ok:
2716         _ShutdownInstanceDisks(instance, self.cfg)
2717         raise errors.OpExecError("Can't activate the instance's disks")
2718
2719       feedback_fn("* starting the instance on the target node")
2720       if not rpc.call_instance_start(target_node, instance, None):
2721         _ShutdownInstanceDisks(instance, self.cfg)
2722         raise errors.OpExecError("Could not start instance %s on node %s." %
2723                                  (instance.name, target_node))
2724
2725
2726 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2727   """Create a tree of block devices on the primary node.
2728
2729   This always creates all devices.
2730
2731   """
2732   if device.children:
2733     for child in device.children:
2734       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2735         return False
2736
2737   cfg.SetDiskID(device, node)
2738   new_id = rpc.call_blockdev_create(node, device, device.size,
2739                                     instance.name, True, info)
2740   if not new_id:
2741     return False
2742   if device.physical_id is None:
2743     device.physical_id = new_id
2744   return True
2745
2746
2747 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2748   """Create a tree of block devices on a secondary node.
2749
2750   If this device type has to be created on secondaries, create it and
2751   all its children.
2752
2753   If not, just recurse to children keeping the same 'force' value.
2754
2755   """
2756   if device.CreateOnSecondary():
2757     force = True
2758   if device.children:
2759     for child in device.children:
2760       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2761                                         child, force, info):
2762         return False
2763
2764   if not force:
2765     return True
2766   cfg.SetDiskID(device, node)
2767   new_id = rpc.call_blockdev_create(node, device, device.size,
2768                                     instance.name, False, info)
2769   if not new_id:
2770     return False
2771   if device.physical_id is None:
2772     device.physical_id = new_id
2773   return True
2774
2775
2776 def _GenerateUniqueNames(cfg, exts):
2777   """Generate a suitable LV name.
2778
2779   This will generate a logical volume name for the given instance.
2780
2781   """
2782   results = []
2783   for val in exts:
2784     new_id = cfg.GenerateUniqueID()
2785     results.append("%s%s" % (new_id, val))
2786   return results
2787
2788
2789 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2790   """Generate a drbd8 device complete with its children.
2791
2792   """
2793   port = cfg.AllocatePort()
2794   vgname = cfg.GetVGName()
2795   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2796                           logical_id=(vgname, names[0]))
2797   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2798                           logical_id=(vgname, names[1]))
2799   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2800                           logical_id = (primary, secondary, port),
2801                           children = [dev_data, dev_meta],
2802                           iv_name=iv_name)
2803   return drbd_dev
2804
2805
2806 def _GenerateDiskTemplate(cfg, template_name,
2807                           instance_name, primary_node,
2808                           secondary_nodes, disk_sz, swap_sz,
2809                           file_storage_dir, file_driver):
2810   """Generate the entire disk layout for a given template type.
2811
2812   """
2813   #TODO: compute space requirements
2814
2815   vgname = cfg.GetVGName()
2816   if template_name == constants.DT_DISKLESS:
2817     disks = []
2818   elif template_name == constants.DT_PLAIN:
2819     if len(secondary_nodes) != 0:
2820       raise errors.ProgrammerError("Wrong template configuration")
2821
2822     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2823     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2824                            logical_id=(vgname, names[0]),
2825                            iv_name = "sda")
2826     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2827                            logical_id=(vgname, names[1]),
2828                            iv_name = "sdb")
2829     disks = [sda_dev, sdb_dev]
2830   elif template_name == constants.DT_DRBD8:
2831     if len(secondary_nodes) != 1:
2832       raise errors.ProgrammerError("Wrong template configuration")
2833     remote_node = secondary_nodes[0]
2834     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2835                                        ".sdb_data", ".sdb_meta"])
2836     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2837                                          disk_sz, names[0:2], "sda")
2838     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2839                                          swap_sz, names[2:4], "sdb")
2840     disks = [drbd_sda_dev, drbd_sdb_dev]
2841   elif template_name == constants.DT_FILE:
2842     if len(secondary_nodes) != 0:
2843       raise errors.ProgrammerError("Wrong template configuration")
2844
2845     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2846                                 iv_name="sda", logical_id=(file_driver,
2847                                 "%s/sda" % file_storage_dir))
2848     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2849                                 iv_name="sdb", logical_id=(file_driver,
2850                                 "%s/sdb" % file_storage_dir))
2851     disks = [file_sda_dev, file_sdb_dev]
2852   else:
2853     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2854   return disks
2855
2856
2857 def _GetInstanceInfoText(instance):
2858   """Compute that text that should be added to the disk's metadata.
2859
2860   """
2861   return "originstname+%s" % instance.name
2862
2863
2864 def _CreateDisks(cfg, instance):
2865   """Create all disks for an instance.
2866
2867   This abstracts away some work from AddInstance.
2868
2869   Args:
2870     instance: the instance object
2871
2872   Returns:
2873     True or False showing the success of the creation process
2874
2875   """
2876   info = _GetInstanceInfoText(instance)
2877
2878   if instance.disk_template == constants.DT_FILE:
2879     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2880     result = rpc.call_file_storage_dir_create(instance.primary_node,
2881                                               file_storage_dir)
2882
2883     if not result:
2884       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2885       return False
2886
2887     if not result[0]:
2888       logger.Error("failed to create directory '%s'" % file_storage_dir)
2889       return False
2890
2891   for device in instance.disks:
2892     logger.Info("creating volume %s for instance %s" %
2893                 (device.iv_name, instance.name))
2894     #HARDCODE
2895     for secondary_node in instance.secondary_nodes:
2896       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2897                                         device, False, info):
2898         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2899                      (device.iv_name, device, secondary_node))
2900         return False
2901     #HARDCODE
2902     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2903                                     instance, device, info):
2904       logger.Error("failed to create volume %s on primary!" %
2905                    device.iv_name)
2906       return False
2907
2908   return True
2909
2910
2911 def _RemoveDisks(instance, cfg):
2912   """Remove all disks for an instance.
2913
2914   This abstracts away some work from `AddInstance()` and
2915   `RemoveInstance()`. Note that in case some of the devices couldn't
2916   be removed, the removal will continue with the other ones (compare
2917   with `_CreateDisks()`).
2918
2919   Args:
2920     instance: the instance object
2921
2922   Returns:
2923     True or False showing the success of the removal proces
2924
2925   """
2926   logger.Info("removing block devices for instance %s" % instance.name)
2927
2928   result = True
2929   for device in instance.disks:
2930     for node, disk in device.ComputeNodeTree(instance.primary_node):
2931       cfg.SetDiskID(disk, node)
2932       if not rpc.call_blockdev_remove(node, disk):
2933         logger.Error("could not remove block device %s on node %s,"
2934                      " continuing anyway" %
2935                      (device.iv_name, node))
2936         result = False
2937
2938   if instance.disk_template == constants.DT_FILE:
2939     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2940     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2941                                             file_storage_dir):
2942       logger.Error("could not remove directory '%s'" % file_storage_dir)
2943       result = False
2944
2945   return result
2946
2947
2948 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2949   """Compute disk size requirements in the volume group
2950
2951   This is currently hard-coded for the two-drive layout.
2952
2953   """
2954   # Required free disk space as a function of disk and swap space
2955   req_size_dict = {
2956     constants.DT_DISKLESS: None,
2957     constants.DT_PLAIN: disk_size + swap_size,
2958     # 256 MB are added for drbd metadata, 128MB for each drbd device
2959     constants.DT_DRBD8: disk_size + swap_size + 256,
2960     constants.DT_FILE: None,
2961   }
2962
2963   if disk_template not in req_size_dict:
2964     raise errors.ProgrammerError("Disk template '%s' size requirement"
2965                                  " is unknown" %  disk_template)
2966
2967   return req_size_dict[disk_template]
2968
2969
2970 class LUCreateInstance(LogicalUnit):
2971   """Create an instance.
2972
2973   """
2974   HPATH = "instance-add"
2975   HTYPE = constants.HTYPE_INSTANCE
2976   _OP_REQP = ["instance_name", "mem_size", "disk_size",
2977               "disk_template", "swap_size", "mode", "start", "vcpus",
2978               "wait_for_sync", "ip_check", "mac"]
2979
2980   def _RunAllocator(self):
2981     """Run the allocator based on input opcode.
2982
2983     """
2984     disks = [{"size": self.op.disk_size, "mode": "w"},
2985              {"size": self.op.swap_size, "mode": "w"}]
2986     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2987              "bridge": self.op.bridge}]
2988     ial = IAllocator(self.cfg, self.sstore,
2989                      mode=constants.IALLOCATOR_MODE_ALLOC,
2990                      name=self.op.instance_name,
2991                      disk_template=self.op.disk_template,
2992                      tags=[],
2993                      os=self.op.os_type,
2994                      vcpus=self.op.vcpus,
2995                      mem_size=self.op.mem_size,
2996                      disks=disks,
2997                      nics=nics,
2998                      )
2999
3000     ial.Run(self.op.iallocator)
3001
3002     if not ial.success:
3003       raise errors.OpPrereqError("Can't compute nodes using"
3004                                  " iallocator '%s': %s" % (self.op.iallocator,
3005                                                            ial.info))
3006     if len(ial.nodes) != ial.required_nodes:
3007       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3008                                  " of nodes (%s), required %s" %
3009                                  (len(ial.nodes), ial.required_nodes))
3010     self.op.pnode = ial.nodes[0]
3011     logger.ToStdout("Selected nodes for the instance: %s" %
3012                     (", ".join(ial.nodes),))
3013     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3014                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3015     if ial.required_nodes == 2:
3016       self.op.snode = ial.nodes[1]
3017
3018   def BuildHooksEnv(self):
3019     """Build hooks env.
3020
3021     This runs on master, primary and secondary nodes of the instance.
3022
3023     """
3024     env = {
3025       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3026       "INSTANCE_DISK_SIZE": self.op.disk_size,
3027       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3028       "INSTANCE_ADD_MODE": self.op.mode,
3029       }
3030     if self.op.mode == constants.INSTANCE_IMPORT:
3031       env["INSTANCE_SRC_NODE"] = self.op.src_node
3032       env["INSTANCE_SRC_PATH"] = self.op.src_path
3033       env["INSTANCE_SRC_IMAGE"] = self.src_image
3034
3035     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3036       primary_node=self.op.pnode,
3037       secondary_nodes=self.secondaries,
3038       status=self.instance_status,
3039       os_type=self.op.os_type,
3040       memory=self.op.mem_size,
3041       vcpus=self.op.vcpus,
3042       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3043     ))
3044
3045     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3046           self.secondaries)
3047     return env, nl, nl
3048
3049
3050   def CheckPrereq(self):
3051     """Check prerequisites.
3052
3053     """
3054     # set optional parameters to none if they don't exist
3055     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3056                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3057                  "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3058       if not hasattr(self.op, attr):
3059         setattr(self.op, attr, None)
3060
3061     if self.op.mode not in (constants.INSTANCE_CREATE,
3062                             constants.INSTANCE_IMPORT):
3063       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3064                                  self.op.mode)
3065
3066     if (not self.cfg.GetVGName() and
3067         self.op.disk_template not in constants.DTS_NOT_LVM):
3068       raise errors.OpPrereqError("Cluster does not support lvm-based"
3069                                  " instances")
3070
3071     if self.op.mode == constants.INSTANCE_IMPORT:
3072       src_node = getattr(self.op, "src_node", None)
3073       src_path = getattr(self.op, "src_path", None)
3074       if src_node is None or src_path is None:
3075         raise errors.OpPrereqError("Importing an instance requires source"
3076                                    " node and path options")
3077       src_node_full = self.cfg.ExpandNodeName(src_node)
3078       if src_node_full is None:
3079         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3080       self.op.src_node = src_node = src_node_full
3081
3082       if not os.path.isabs(src_path):
3083         raise errors.OpPrereqError("The source path must be absolute")
3084
3085       export_info = rpc.call_export_info(src_node, src_path)
3086
3087       if not export_info:
3088         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3089
3090       if not export_info.has_section(constants.INISECT_EXP):
3091         raise errors.ProgrammerError("Corrupted export config")
3092
3093       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3094       if (int(ei_version) != constants.EXPORT_VERSION):
3095         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3096                                    (ei_version, constants.EXPORT_VERSION))
3097
3098       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3099         raise errors.OpPrereqError("Can't import instance with more than"
3100                                    " one data disk")
3101
3102       # FIXME: are the old os-es, disk sizes, etc. useful?
3103       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3104       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3105                                                          'disk0_dump'))
3106       self.src_image = diskimage
3107     else: # INSTANCE_CREATE
3108       if getattr(self.op, "os_type", None) is None:
3109         raise errors.OpPrereqError("No guest OS specified")
3110
3111     #### instance parameters check
3112
3113     # disk template and mirror node verification
3114     if self.op.disk_template not in constants.DISK_TEMPLATES:
3115       raise errors.OpPrereqError("Invalid disk template name")
3116
3117     # instance name verification
3118     hostname1 = utils.HostInfo(self.op.instance_name)
3119
3120     self.op.instance_name = instance_name = hostname1.name
3121     instance_list = self.cfg.GetInstanceList()
3122     if instance_name in instance_list:
3123       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3124                                  instance_name)
3125
3126     # ip validity checks
3127     ip = getattr(self.op, "ip", None)
3128     if ip is None or ip.lower() == "none":
3129       inst_ip = None
3130     elif ip.lower() == "auto":
3131       inst_ip = hostname1.ip
3132     else:
3133       if not utils.IsValidIP(ip):
3134         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3135                                    " like a valid IP" % ip)
3136       inst_ip = ip
3137     self.inst_ip = self.op.ip = inst_ip
3138
3139     if self.op.start and not self.op.ip_check:
3140       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3141                                  " adding an instance in start mode")
3142
3143     if self.op.ip_check:
3144       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3145         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3146                                    (hostname1.ip, instance_name))
3147
3148     # MAC address verification
3149     if self.op.mac != "auto":
3150       if not utils.IsValidMac(self.op.mac.lower()):
3151         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3152                                    self.op.mac)
3153
3154     # bridge verification
3155     bridge = getattr(self.op, "bridge", None)
3156     if bridge is None:
3157       self.op.bridge = self.cfg.GetDefBridge()
3158     else:
3159       self.op.bridge = bridge
3160
3161     # boot order verification
3162     if self.op.hvm_boot_order is not None:
3163       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3164         raise errors.OpPrereqError("invalid boot order specified,"
3165                                    " must be one or more of [acdn]")
3166     # file storage checks
3167     if (self.op.file_driver and
3168         not self.op.file_driver in constants.FILE_DRIVER):
3169       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3170                                  self.op.file_driver)
3171
3172     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3173       raise errors.OpPrereqError("File storage directory not a relative"
3174                                  " path")
3175     #### allocator run
3176
3177     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3178       raise errors.OpPrereqError("One and only one of iallocator and primary"
3179                                  " node must be given")
3180
3181     if self.op.iallocator is not None:
3182       self._RunAllocator()
3183
3184     #### node related checks
3185
3186     # check primary node
3187     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3188     if pnode is None:
3189       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3190                                  self.op.pnode)
3191     self.op.pnode = pnode.name
3192     self.pnode = pnode
3193     self.secondaries = []
3194
3195     # mirror node verification
3196     if self.op.disk_template in constants.DTS_NET_MIRROR:
3197       if getattr(self.op, "snode", None) is None:
3198         raise errors.OpPrereqError("The networked disk templates need"
3199                                    " a mirror node")
3200
3201       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3202       if snode_name is None:
3203         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3204                                    self.op.snode)
3205       elif snode_name == pnode.name:
3206         raise errors.OpPrereqError("The secondary node cannot be"
3207                                    " the primary node.")
3208       self.secondaries.append(snode_name)
3209
3210     req_size = _ComputeDiskSize(self.op.disk_template,
3211                                 self.op.disk_size, self.op.swap_size)
3212
3213     # Check lv size requirements
3214     if req_size is not None:
3215       nodenames = [pnode.name] + self.secondaries
3216       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3217       for node in nodenames:
3218         info = nodeinfo.get(node, None)
3219         if not info:
3220           raise errors.OpPrereqError("Cannot get current information"
3221                                      " from node '%s'" % node)
3222         vg_free = info.get('vg_free', None)
3223         if not isinstance(vg_free, int):
3224           raise errors.OpPrereqError("Can't compute free disk space on"
3225                                      " node %s" % node)
3226         if req_size > info['vg_free']:
3227           raise errors.OpPrereqError("Not enough disk space on target node %s."
3228                                      " %d MB available, %d MB required" %
3229                                      (node, info['vg_free'], req_size))
3230
3231     # os verification
3232     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3233     if not os_obj:
3234       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3235                                  " primary node"  % self.op.os_type)
3236
3237     if self.op.kernel_path == constants.VALUE_NONE:
3238       raise errors.OpPrereqError("Can't set instance kernel to none")
3239
3240
3241     # bridge check on primary node
3242     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3243       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3244                                  " destination node '%s'" %
3245                                  (self.op.bridge, pnode.name))
3246
3247     # memory check on primary node
3248     if self.op.start:
3249       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3250                            "creating instance %s" % self.op.instance_name,
3251                            self.op.mem_size)
3252
3253     # hvm_cdrom_image_path verification
3254     if self.op.hvm_cdrom_image_path is not None:
3255       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3256         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3257                                    " be an absolute path or None, not %s" %
3258                                    self.op.hvm_cdrom_image_path)
3259       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3260         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3261                                    " regular file or a symlink pointing to"
3262                                    " an existing regular file, not %s" %
3263                                    self.op.hvm_cdrom_image_path)
3264
3265     # vnc_bind_address verification
3266     if self.op.vnc_bind_address is not None:
3267       if not utils.IsValidIP(self.op.vnc_bind_address):
3268         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3269                                    " like a valid IP address" %
3270                                    self.op.vnc_bind_address)
3271
3272     # Xen HVM device type checks
3273     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3274       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3275         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3276                                    " hypervisor" % self.op.hvm_nic_type)
3277       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3278         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3279                                    " hypervisor" % self.op.hvm_disk_type)
3280
3281     if self.op.start:
3282       self.instance_status = 'up'
3283     else:
3284       self.instance_status = 'down'
3285
3286   def Exec(self, feedback_fn):
3287     """Create and add the instance to the cluster.
3288
3289     """
3290     instance = self.op.instance_name
3291     pnode_name = self.pnode.name
3292
3293     if self.op.mac == "auto":
3294       mac_address = self.cfg.GenerateMAC()
3295     else:
3296       mac_address = self.op.mac
3297
3298     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3299     if self.inst_ip is not None:
3300       nic.ip = self.inst_ip
3301
3302     ht_kind = self.sstore.GetHypervisorType()
3303     if ht_kind in constants.HTS_REQ_PORT:
3304       network_port = self.cfg.AllocatePort()
3305     else:
3306       network_port = None
3307
3308     if self.op.vnc_bind_address is None:
3309       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3310
3311     # this is needed because os.path.join does not accept None arguments
3312     if self.op.file_storage_dir is None:
3313       string_file_storage_dir = ""
3314     else:
3315       string_file_storage_dir = self.op.file_storage_dir
3316
3317     # build the full file storage dir path
3318     file_storage_dir = os.path.normpath(os.path.join(
3319                                         self.sstore.GetFileStorageDir(),
3320                                         string_file_storage_dir, instance))
3321
3322
3323     disks = _GenerateDiskTemplate(self.cfg,
3324                                   self.op.disk_template,
3325                                   instance, pnode_name,
3326                                   self.secondaries, self.op.disk_size,
3327                                   self.op.swap_size,
3328                                   file_storage_dir,
3329                                   self.op.file_driver)
3330
3331     iobj = objects.Instance(name=instance, os=self.op.os_type,
3332                             primary_node=pnode_name,
3333                             memory=self.op.mem_size,
3334                             vcpus=self.op.vcpus,
3335                             nics=[nic], disks=disks,
3336                             disk_template=self.op.disk_template,
3337                             status=self.instance_status,
3338                             network_port=network_port,
3339                             kernel_path=self.op.kernel_path,
3340                             initrd_path=self.op.initrd_path,
3341                             hvm_boot_order=self.op.hvm_boot_order,
3342                             hvm_acpi=self.op.hvm_acpi,
3343                             hvm_pae=self.op.hvm_pae,
3344                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3345                             vnc_bind_address=self.op.vnc_bind_address,
3346                             hvm_nic_type=self.op.hvm_nic_type,
3347                             hvm_disk_type=self.op.hvm_disk_type,
3348                             )
3349
3350     feedback_fn("* creating instance disks...")
3351     if not _CreateDisks(self.cfg, iobj):
3352       _RemoveDisks(iobj, self.cfg)
3353       raise errors.OpExecError("Device creation failed, reverting...")
3354
3355     feedback_fn("adding instance %s to cluster config" % instance)
3356
3357     self.cfg.AddInstance(iobj)
3358     # Add the new instance to the Ganeti Lock Manager
3359     self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3360
3361     if self.op.wait_for_sync:
3362       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3363     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3364       # make sure the disks are not degraded (still sync-ing is ok)
3365       time.sleep(15)
3366       feedback_fn("* checking mirrors status")
3367       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3368     else:
3369       disk_abort = False
3370
3371     if disk_abort:
3372       _RemoveDisks(iobj, self.cfg)
3373       self.cfg.RemoveInstance(iobj.name)
3374       # Remove the new instance from the Ganeti Lock Manager
3375       self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3376       raise errors.OpExecError("There are some degraded disks for"
3377                                " this instance")
3378
3379     feedback_fn("creating os for instance %s on node %s" %
3380                 (instance, pnode_name))
3381
3382     if iobj.disk_template != constants.DT_DISKLESS:
3383       if self.op.mode == constants.INSTANCE_CREATE:
3384         feedback_fn("* running the instance OS create scripts...")
3385         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3386           raise errors.OpExecError("could not add os for instance %s"
3387                                    " on node %s" %
3388                                    (instance, pnode_name))
3389
3390       elif self.op.mode == constants.INSTANCE_IMPORT:
3391         feedback_fn("* running the instance OS import scripts...")
3392         src_node = self.op.src_node
3393         src_image = self.src_image
3394         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3395                                                 src_node, src_image):
3396           raise errors.OpExecError("Could not import os for instance"
3397                                    " %s on node %s" %
3398                                    (instance, pnode_name))
3399       else:
3400         # also checked in the prereq part
3401         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3402                                      % self.op.mode)
3403
3404     if self.op.start:
3405       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3406       feedback_fn("* starting instance...")
3407       if not rpc.call_instance_start(pnode_name, iobj, None):
3408         raise errors.OpExecError("Could not start instance")
3409
3410
3411 class LUConnectConsole(NoHooksLU):
3412   """Connect to an instance's console.
3413
3414   This is somewhat special in that it returns the command line that
3415   you need to run on the master node in order to connect to the
3416   console.
3417
3418   """
3419   _OP_REQP = ["instance_name"]
3420   REQ_BGL = False
3421
3422   def ExpandNames(self):
3423     self._ExpandAndLockInstance()
3424
3425   def CheckPrereq(self):
3426     """Check prerequisites.
3427
3428     This checks that the instance is in the cluster.
3429
3430     """
3431     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3432     assert self.instance is not None, \
3433       "Cannot retrieve locked instance %s" % self.op.instance_name
3434
3435   def Exec(self, feedback_fn):
3436     """Connect to the console of an instance
3437
3438     """
3439     instance = self.instance
3440     node = instance.primary_node
3441
3442     node_insts = rpc.call_instance_list([node])[node]
3443     if node_insts is False:
3444       raise errors.OpExecError("Can't connect to node %s." % node)
3445
3446     if instance.name not in node_insts:
3447       raise errors.OpExecError("Instance %s is not running." % instance.name)
3448
3449     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3450
3451     hyper = hypervisor.GetHypervisor()
3452     console_cmd = hyper.GetShellCommandForConsole(instance)
3453
3454     # build ssh cmdline
3455     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3456
3457
3458 class LUReplaceDisks(LogicalUnit):
3459   """Replace the disks of an instance.
3460
3461   """
3462   HPATH = "mirrors-replace"
3463   HTYPE = constants.HTYPE_INSTANCE
3464   _OP_REQP = ["instance_name", "mode", "disks"]
3465
3466   def _RunAllocator(self):
3467     """Compute a new secondary node using an IAllocator.
3468
3469     """
3470     ial = IAllocator(self.cfg, self.sstore,
3471                      mode=constants.IALLOCATOR_MODE_RELOC,
3472                      name=self.op.instance_name,
3473                      relocate_from=[self.sec_node])
3474
3475     ial.Run(self.op.iallocator)
3476
3477     if not ial.success:
3478       raise errors.OpPrereqError("Can't compute nodes using"
3479                                  " iallocator '%s': %s" % (self.op.iallocator,
3480                                                            ial.info))
3481     if len(ial.nodes) != ial.required_nodes:
3482       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3483                                  " of nodes (%s), required %s" %
3484                                  (len(ial.nodes), ial.required_nodes))
3485     self.op.remote_node = ial.nodes[0]
3486     logger.ToStdout("Selected new secondary for the instance: %s" %
3487                     self.op.remote_node)
3488
3489   def BuildHooksEnv(self):
3490     """Build hooks env.
3491
3492     This runs on the master, the primary and all the secondaries.
3493
3494     """
3495     env = {
3496       "MODE": self.op.mode,
3497       "NEW_SECONDARY": self.op.remote_node,
3498       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3499       }
3500     env.update(_BuildInstanceHookEnvByObject(self.instance))
3501     nl = [
3502       self.sstore.GetMasterNode(),
3503       self.instance.primary_node,
3504       ]
3505     if self.op.remote_node is not None:
3506       nl.append(self.op.remote_node)
3507     return env, nl, nl
3508
3509   def CheckPrereq(self):
3510     """Check prerequisites.
3511
3512     This checks that the instance is in the cluster.
3513
3514     """
3515     if not hasattr(self.op, "remote_node"):
3516       self.op.remote_node = None
3517
3518     instance = self.cfg.GetInstanceInfo(
3519       self.cfg.ExpandInstanceName(self.op.instance_name))
3520     if instance is None:
3521       raise errors.OpPrereqError("Instance '%s' not known" %
3522                                  self.op.instance_name)
3523     self.instance = instance
3524     self.op.instance_name = instance.name
3525
3526     if instance.disk_template not in constants.DTS_NET_MIRROR:
3527       raise errors.OpPrereqError("Instance's disk layout is not"
3528                                  " network mirrored.")
3529
3530     if len(instance.secondary_nodes) != 1:
3531       raise errors.OpPrereqError("The instance has a strange layout,"
3532                                  " expected one secondary but found %d" %
3533                                  len(instance.secondary_nodes))
3534
3535     self.sec_node = instance.secondary_nodes[0]
3536
3537     ia_name = getattr(self.op, "iallocator", None)
3538     if ia_name is not None:
3539       if self.op.remote_node is not None:
3540         raise errors.OpPrereqError("Give either the iallocator or the new"
3541                                    " secondary, not both")
3542       self.op.remote_node = self._RunAllocator()
3543
3544     remote_node = self.op.remote_node
3545     if remote_node is not None:
3546       remote_node = self.cfg.ExpandNodeName(remote_node)
3547       if remote_node is None:
3548         raise errors.OpPrereqError("Node '%s' not known" %
3549                                    self.op.remote_node)
3550       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3551     else:
3552       self.remote_node_info = None
3553     if remote_node == instance.primary_node:
3554       raise errors.OpPrereqError("The specified node is the primary node of"
3555                                  " the instance.")
3556     elif remote_node == self.sec_node:
3557       if self.op.mode == constants.REPLACE_DISK_SEC:
3558         # this is for DRBD8, where we can't execute the same mode of
3559         # replacement as for drbd7 (no different port allocated)
3560         raise errors.OpPrereqError("Same secondary given, cannot execute"
3561                                    " replacement")
3562     if instance.disk_template == constants.DT_DRBD8:
3563       if (self.op.mode == constants.REPLACE_DISK_ALL and
3564           remote_node is not None):
3565         # switch to replace secondary mode
3566         self.op.mode = constants.REPLACE_DISK_SEC
3567
3568       if self.op.mode == constants.REPLACE_DISK_ALL:
3569         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3570                                    " secondary disk replacement, not"
3571                                    " both at once")
3572       elif self.op.mode == constants.REPLACE_DISK_PRI:
3573         if remote_node is not None:
3574           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3575                                      " the secondary while doing a primary"
3576                                      " node disk replacement")
3577         self.tgt_node = instance.primary_node
3578         self.oth_node = instance.secondary_nodes[0]
3579       elif self.op.mode == constants.REPLACE_DISK_SEC:
3580         self.new_node = remote_node # this can be None, in which case
3581                                     # we don't change the secondary
3582         self.tgt_node = instance.secondary_nodes[0]
3583         self.oth_node = instance.primary_node
3584       else:
3585         raise errors.ProgrammerError("Unhandled disk replace mode")
3586
3587     for name in self.op.disks:
3588       if instance.FindDisk(name) is None:
3589         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3590                                    (name, instance.name))
3591     self.op.remote_node = remote_node
3592
3593   def _ExecD8DiskOnly(self, feedback_fn):
3594     """Replace a disk on the primary or secondary for dbrd8.
3595
3596     The algorithm for replace is quite complicated:
3597       - for each disk to be replaced:
3598         - create new LVs on the target node with unique names
3599         - detach old LVs from the drbd device
3600         - rename old LVs to name_replaced.<time_t>
3601         - rename new LVs to old LVs
3602         - attach the new LVs (with the old names now) to the drbd device
3603       - wait for sync across all devices
3604       - for each modified disk:
3605         - remove old LVs (which have the name name_replaces.<time_t>)
3606
3607     Failures are not very well handled.
3608
3609     """
3610     steps_total = 6
3611     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3612     instance = self.instance
3613     iv_names = {}
3614     vgname = self.cfg.GetVGName()
3615     # start of work
3616     cfg = self.cfg
3617     tgt_node = self.tgt_node
3618     oth_node = self.oth_node
3619
3620     # Step: check device activation
3621     self.proc.LogStep(1, steps_total, "check device existence")
3622     info("checking volume groups")
3623     my_vg = cfg.GetVGName()
3624     results = rpc.call_vg_list([oth_node, tgt_node])
3625     if not results:
3626       raise errors.OpExecError("Can't list volume groups on the nodes")
3627     for node in oth_node, tgt_node:
3628       res = results.get(node, False)
3629       if not res or my_vg not in res:
3630         raise errors.OpExecError("Volume group '%s' not found on %s" %
3631                                  (my_vg, node))
3632     for dev in instance.disks:
3633       if not dev.iv_name in self.op.disks:
3634         continue
3635       for node in tgt_node, oth_node:
3636         info("checking %s on %s" % (dev.iv_name, node))
3637         cfg.SetDiskID(dev, node)
3638         if not rpc.call_blockdev_find(node, dev):
3639           raise errors.OpExecError("Can't find device %s on node %s" %
3640                                    (dev.iv_name, node))
3641
3642     # Step: check other node consistency
3643     self.proc.LogStep(2, steps_total, "check peer consistency")
3644     for dev in instance.disks:
3645       if not dev.iv_name in self.op.disks:
3646         continue
3647       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3648       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3649                                    oth_node==instance.primary_node):
3650         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3651                                  " to replace disks on this node (%s)" %
3652                                  (oth_node, tgt_node))
3653
3654     # Step: create new storage
3655     self.proc.LogStep(3, steps_total, "allocate new storage")
3656     for dev in instance.disks:
3657       if not dev.iv_name in self.op.disks:
3658         continue
3659       size = dev.size
3660       cfg.SetDiskID(dev, tgt_node)
3661       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3662       names = _GenerateUniqueNames(cfg, lv_names)
3663       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3664                              logical_id=(vgname, names[0]))
3665       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3666                              logical_id=(vgname, names[1]))
3667       new_lvs = [lv_data, lv_meta]
3668       old_lvs = dev.children
3669       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3670       info("creating new local storage on %s for %s" %
3671            (tgt_node, dev.iv_name))
3672       # since we *always* want to create this LV, we use the
3673       # _Create...OnPrimary (which forces the creation), even if we
3674       # are talking about the secondary node
3675       for new_lv in new_lvs:
3676         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3677                                         _GetInstanceInfoText(instance)):
3678           raise errors.OpExecError("Failed to create new LV named '%s' on"
3679                                    " node '%s'" %
3680                                    (new_lv.logical_id[1], tgt_node))
3681
3682     # Step: for each lv, detach+rename*2+attach
3683     self.proc.LogStep(4, steps_total, "change drbd configuration")
3684     for dev, old_lvs, new_lvs in iv_names.itervalues():
3685       info("detaching %s drbd from local storage" % dev.iv_name)
3686       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3687         raise errors.OpExecError("Can't detach drbd from local storage on node"
3688                                  " %s for device %s" % (tgt_node, dev.iv_name))
3689       #dev.children = []
3690       #cfg.Update(instance)
3691
3692       # ok, we created the new LVs, so now we know we have the needed
3693       # storage; as such, we proceed on the target node to rename
3694       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3695       # using the assumption that logical_id == physical_id (which in
3696       # turn is the unique_id on that node)
3697
3698       # FIXME(iustin): use a better name for the replaced LVs
3699       temp_suffix = int(time.time())
3700       ren_fn = lambda d, suff: (d.physical_id[0],
3701                                 d.physical_id[1] + "_replaced-%s" % suff)
3702       # build the rename list based on what LVs exist on the node
3703       rlist = []
3704       for to_ren in old_lvs:
3705         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3706         if find_res is not None: # device exists
3707           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3708
3709       info("renaming the old LVs on the target node")
3710       if not rpc.call_blockdev_rename(tgt_node, rlist):
3711         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3712       # now we rename the new LVs to the old LVs
3713       info("renaming the new LVs on the target node")
3714       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3715       if not rpc.call_blockdev_rename(tgt_node, rlist):
3716         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3717
3718       for old, new in zip(old_lvs, new_lvs):
3719         new.logical_id = old.logical_id
3720         cfg.SetDiskID(new, tgt_node)
3721
3722       for disk in old_lvs:
3723         disk.logical_id = ren_fn(disk, temp_suffix)
3724         cfg.SetDiskID(disk, tgt_node)
3725
3726       # now that the new lvs have the old name, we can add them to the device
3727       info("adding new mirror component on %s" % tgt_node)
3728       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3729         for new_lv in new_lvs:
3730           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3731             warning("Can't rollback device %s", hint="manually cleanup unused"
3732                     " logical volumes")
3733         raise errors.OpExecError("Can't add local storage to drbd")
3734
3735       dev.children = new_lvs
3736       cfg.Update(instance)
3737
3738     # Step: wait for sync
3739
3740     # this can fail as the old devices are degraded and _WaitForSync
3741     # does a combined result over all disks, so we don't check its
3742     # return value
3743     self.proc.LogStep(5, steps_total, "sync devices")
3744     _WaitForSync(cfg, instance, self.proc, unlock=True)
3745
3746     # so check manually all the devices
3747     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3748       cfg.SetDiskID(dev, instance.primary_node)
3749       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3750       if is_degr:
3751         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3752
3753     # Step: remove old storage
3754     self.proc.LogStep(6, steps_total, "removing old storage")
3755     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3756       info("remove logical volumes for %s" % name)
3757       for lv in old_lvs:
3758         cfg.SetDiskID(lv, tgt_node)
3759         if not rpc.call_blockdev_remove(tgt_node, lv):
3760           warning("Can't remove old LV", hint="manually remove unused LVs")
3761           continue
3762
3763   def _ExecD8Secondary(self, feedback_fn):
3764     """Replace the secondary node for drbd8.
3765
3766     The algorithm for replace is quite complicated:
3767       - for all disks of the instance:
3768         - create new LVs on the new node with same names
3769         - shutdown the drbd device on the old secondary
3770         - disconnect the drbd network on the primary
3771         - create the drbd device on the new secondary
3772         - network attach the drbd on the primary, using an artifice:
3773           the drbd code for Attach() will connect to the network if it
3774           finds a device which is connected to the good local disks but
3775           not network enabled
3776       - wait for sync across all devices
3777       - remove all disks from the old secondary
3778
3779     Failures are not very well handled.
3780
3781     """
3782     steps_total = 6
3783     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3784     instance = self.instance
3785     iv_names = {}
3786     vgname = self.cfg.GetVGName()
3787     # start of work
3788     cfg = self.cfg
3789     old_node = self.tgt_node
3790     new_node = self.new_node
3791     pri_node = instance.primary_node
3792
3793     # Step: check device activation
3794     self.proc.LogStep(1, steps_total, "check device existence")
3795     info("checking volume groups")
3796     my_vg = cfg.GetVGName()
3797     results = rpc.call_vg_list([pri_node, new_node])
3798     if not results:
3799       raise errors.OpExecError("Can't list volume groups on the nodes")
3800     for node in pri_node, new_node:
3801       res = results.get(node, False)
3802       if not res or my_vg not in res:
3803         raise errors.OpExecError("Volume group '%s' not found on %s" %
3804                                  (my_vg, node))
3805     for dev in instance.disks:
3806       if not dev.iv_name in self.op.disks:
3807         continue
3808       info("checking %s on %s" % (dev.iv_name, pri_node))
3809       cfg.SetDiskID(dev, pri_node)
3810       if not rpc.call_blockdev_find(pri_node, dev):
3811         raise errors.OpExecError("Can't find device %s on node %s" %
3812                                  (dev.iv_name, pri_node))
3813
3814     # Step: check other node consistency
3815     self.proc.LogStep(2, steps_total, "check peer consistency")
3816     for dev in instance.disks:
3817       if not dev.iv_name in self.op.disks:
3818         continue
3819       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3820       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3821         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3822                                  " unsafe to replace the secondary" %
3823                                  pri_node)
3824
3825     # Step: create new storage
3826     self.proc.LogStep(3, steps_total, "allocate new storage")
3827     for dev in instance.disks:
3828       size = dev.size
3829       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3830       # since we *always* want to create this LV, we use the
3831       # _Create...OnPrimary (which forces the creation), even if we
3832       # are talking about the secondary node
3833       for new_lv in dev.children:
3834         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3835                                         _GetInstanceInfoText(instance)):
3836           raise errors.OpExecError("Failed to create new LV named '%s' on"
3837                                    " node '%s'" %
3838                                    (new_lv.logical_id[1], new_node))
3839
3840       iv_names[dev.iv_name] = (dev, dev.children)
3841
3842     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3843     for dev in instance.disks:
3844       size = dev.size
3845       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3846       # create new devices on new_node
3847       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3848                               logical_id=(pri_node, new_node,
3849                                           dev.logical_id[2]),
3850                               children=dev.children)
3851       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3852                                         new_drbd, False,
3853                                       _GetInstanceInfoText(instance)):
3854         raise errors.OpExecError("Failed to create new DRBD on"
3855                                  " node '%s'" % new_node)
3856
3857     for dev in instance.disks:
3858       # we have new devices, shutdown the drbd on the old secondary
3859       info("shutting down drbd for %s on old node" % dev.iv_name)
3860       cfg.SetDiskID(dev, old_node)
3861       if not rpc.call_blockdev_shutdown(old_node, dev):
3862         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3863                 hint="Please cleanup this device manually as soon as possible")
3864
3865     info("detaching primary drbds from the network (=> standalone)")
3866     done = 0
3867     for dev in instance.disks:
3868       cfg.SetDiskID(dev, pri_node)
3869       # set the physical (unique in bdev terms) id to None, meaning
3870       # detach from network
3871       dev.physical_id = (None,) * len(dev.physical_id)
3872       # and 'find' the device, which will 'fix' it to match the
3873       # standalone state
3874       if rpc.call_blockdev_find(pri_node, dev):
3875         done += 1
3876       else:
3877         warning("Failed to detach drbd %s from network, unusual case" %
3878                 dev.iv_name)
3879
3880     if not done:
3881       # no detaches succeeded (very unlikely)
3882       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3883
3884     # if we managed to detach at least one, we update all the disks of
3885     # the instance to point to the new secondary
3886     info("updating instance configuration")
3887     for dev in instance.disks:
3888       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3889       cfg.SetDiskID(dev, pri_node)
3890     cfg.Update(instance)
3891
3892     # and now perform the drbd attach
3893     info("attaching primary drbds to new secondary (standalone => connected)")
3894     failures = []
3895     for dev in instance.disks:
3896       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3897       # since the attach is smart, it's enough to 'find' the device,
3898       # it will automatically activate the network, if the physical_id
3899       # is correct
3900       cfg.SetDiskID(dev, pri_node)
3901       if not rpc.call_blockdev_find(pri_node, dev):
3902         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3903                 "please do a gnt-instance info to see the status of disks")
3904
3905     # this can fail as the old devices are degraded and _WaitForSync
3906     # does a combined result over all disks, so we don't check its
3907     # return value
3908     self.proc.LogStep(5, steps_total, "sync devices")
3909     _WaitForSync(cfg, instance, self.proc, unlock=True)
3910
3911     # so check manually all the devices
3912     for name, (dev, old_lvs) in iv_names.iteritems():
3913       cfg.SetDiskID(dev, pri_node)
3914       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3915       if is_degr:
3916         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3917
3918     self.proc.LogStep(6, steps_total, "removing old storage")
3919     for name, (dev, old_lvs) in iv_names.iteritems():
3920       info("remove logical volumes for %s" % name)
3921       for lv in old_lvs:
3922         cfg.SetDiskID(lv, old_node)
3923         if not rpc.call_blockdev_remove(old_node, lv):
3924           warning("Can't remove LV on old secondary",
3925                   hint="Cleanup stale volumes by hand")
3926
3927   def Exec(self, feedback_fn):
3928     """Execute disk replacement.
3929
3930     This dispatches the disk replacement to the appropriate handler.
3931
3932     """
3933     instance = self.instance
3934
3935     # Activate the instance disks if we're replacing them on a down instance
3936     if instance.status == "down":
3937       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3938       self.proc.ChainOpCode(op)
3939
3940     if instance.disk_template == constants.DT_DRBD8:
3941       if self.op.remote_node is None:
3942         fn = self._ExecD8DiskOnly
3943       else:
3944         fn = self._ExecD8Secondary
3945     else:
3946       raise errors.ProgrammerError("Unhandled disk replacement case")
3947
3948     ret = fn(feedback_fn)
3949
3950     # Deactivate the instance disks if we're replacing them on a down instance
3951     if instance.status == "down":
3952       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3953       self.proc.ChainOpCode(op)
3954
3955     return ret
3956
3957
3958 class LUGrowDisk(LogicalUnit):
3959   """Grow a disk of an instance.
3960
3961   """
3962   HPATH = "disk-grow"
3963   HTYPE = constants.HTYPE_INSTANCE
3964   _OP_REQP = ["instance_name", "disk", "amount"]
3965
3966   def BuildHooksEnv(self):
3967     """Build hooks env.
3968
3969     This runs on the master, the primary and all the secondaries.
3970
3971     """
3972     env = {
3973       "DISK": self.op.disk,
3974       "AMOUNT": self.op.amount,
3975       }
3976     env.update(_BuildInstanceHookEnvByObject(self.instance))
3977     nl = [
3978       self.sstore.GetMasterNode(),
3979       self.instance.primary_node,
3980       ]
3981     return env, nl, nl
3982
3983   def CheckPrereq(self):
3984     """Check prerequisites.
3985
3986     This checks that the instance is in the cluster.
3987
3988     """
3989     instance = self.cfg.GetInstanceInfo(
3990       self.cfg.ExpandInstanceName(self.op.instance_name))
3991     if instance is None:
3992       raise errors.OpPrereqError("Instance '%s' not known" %
3993                                  self.op.instance_name)
3994     self.instance = instance
3995     self.op.instance_name = instance.name
3996
3997     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3998       raise errors.OpPrereqError("Instance's disk layout does not support"
3999                                  " growing.")
4000
4001     if instance.FindDisk(self.op.disk) is None:
4002       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4003                                  (self.op.disk, instance.name))
4004
4005     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4006     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4007     for node in nodenames:
4008       info = nodeinfo.get(node, None)
4009       if not info:
4010         raise errors.OpPrereqError("Cannot get current information"
4011                                    " from node '%s'" % node)
4012       vg_free = info.get('vg_free', None)
4013       if not isinstance(vg_free, int):
4014         raise errors.OpPrereqError("Can't compute free disk space on"
4015                                    " node %s" % node)
4016       if self.op.amount > info['vg_free']:
4017         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4018                                    " %d MiB available, %d MiB required" %
4019                                    (node, info['vg_free'], self.op.amount))
4020
4021   def Exec(self, feedback_fn):
4022     """Execute disk grow.
4023
4024     """
4025     instance = self.instance
4026     disk = instance.FindDisk(self.op.disk)
4027     for node in (instance.secondary_nodes + (instance.primary_node,)):
4028       self.cfg.SetDiskID(disk, node)
4029       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4030       if not result or not isinstance(result, tuple) or len(result) != 2:
4031         raise errors.OpExecError("grow request failed to node %s" % node)
4032       elif not result[0]:
4033         raise errors.OpExecError("grow request failed to node %s: %s" %
4034                                  (node, result[1]))
4035     disk.RecordGrow(self.op.amount)
4036     self.cfg.Update(instance)
4037     return
4038
4039
4040 class LUQueryInstanceData(NoHooksLU):
4041   """Query runtime instance data.
4042
4043   """
4044   _OP_REQP = ["instances"]
4045
4046   def CheckPrereq(self):
4047     """Check prerequisites.
4048
4049     This only checks the optional instance list against the existing names.
4050
4051     """
4052     if not isinstance(self.op.instances, list):
4053       raise errors.OpPrereqError("Invalid argument type 'instances'")
4054     if self.op.instances:
4055       self.wanted_instances = []
4056       names = self.op.instances
4057       for name in names:
4058         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4059         if instance is None:
4060           raise errors.OpPrereqError("No such instance name '%s'" % name)
4061         self.wanted_instances.append(instance)
4062     else:
4063       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4064                                in self.cfg.GetInstanceList()]
4065     return
4066
4067
4068   def _ComputeDiskStatus(self, instance, snode, dev):
4069     """Compute block device status.
4070
4071     """
4072     self.cfg.SetDiskID(dev, instance.primary_node)
4073     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4074     if dev.dev_type in constants.LDS_DRBD:
4075       # we change the snode then (otherwise we use the one passed in)
4076       if dev.logical_id[0] == instance.primary_node:
4077         snode = dev.logical_id[1]
4078       else:
4079         snode = dev.logical_id[0]
4080
4081     if snode:
4082       self.cfg.SetDiskID(dev, snode)
4083       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4084     else:
4085       dev_sstatus = None
4086
4087     if dev.children:
4088       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4089                       for child in dev.children]
4090     else:
4091       dev_children = []
4092
4093     data = {
4094       "iv_name": dev.iv_name,
4095       "dev_type": dev.dev_type,
4096       "logical_id": dev.logical_id,
4097       "physical_id": dev.physical_id,
4098       "pstatus": dev_pstatus,
4099       "sstatus": dev_sstatus,
4100       "children": dev_children,
4101       }
4102
4103     return data
4104
4105   def Exec(self, feedback_fn):
4106     """Gather and return data"""
4107     result = {}
4108     for instance in self.wanted_instances:
4109       remote_info = rpc.call_instance_info(instance.primary_node,
4110                                                 instance.name)
4111       if remote_info and "state" in remote_info:
4112         remote_state = "up"
4113       else:
4114         remote_state = "down"
4115       if instance.status == "down":
4116         config_state = "down"
4117       else:
4118         config_state = "up"
4119
4120       disks = [self._ComputeDiskStatus(instance, None, device)
4121                for device in instance.disks]
4122
4123       idict = {
4124         "name": instance.name,
4125         "config_state": config_state,
4126         "run_state": remote_state,
4127         "pnode": instance.primary_node,
4128         "snodes": instance.secondary_nodes,
4129         "os": instance.os,
4130         "memory": instance.memory,
4131         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4132         "disks": disks,
4133         "vcpus": instance.vcpus,
4134         }
4135
4136       htkind = self.sstore.GetHypervisorType()
4137       if htkind == constants.HT_XEN_PVM30:
4138         idict["kernel_path"] = instance.kernel_path
4139         idict["initrd_path"] = instance.initrd_path
4140
4141       if htkind == constants.HT_XEN_HVM31:
4142         idict["hvm_boot_order"] = instance.hvm_boot_order
4143         idict["hvm_acpi"] = instance.hvm_acpi
4144         idict["hvm_pae"] = instance.hvm_pae
4145         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4146         idict["hvm_nic_type"] = instance.hvm_nic_type
4147         idict["hvm_disk_type"] = instance.hvm_disk_type
4148
4149       if htkind in constants.HTS_REQ_PORT:
4150         idict["vnc_bind_address"] = instance.vnc_bind_address
4151         idict["network_port"] = instance.network_port
4152
4153       result[instance.name] = idict
4154
4155     return result
4156
4157
4158 class LUSetInstanceParams(LogicalUnit):
4159   """Modifies an instances's parameters.
4160
4161   """
4162   HPATH = "instance-modify"
4163   HTYPE = constants.HTYPE_INSTANCE
4164   _OP_REQP = ["instance_name"]
4165   REQ_BGL = False
4166
4167   def ExpandNames(self):
4168     self._ExpandAndLockInstance()
4169
4170   def BuildHooksEnv(self):
4171     """Build hooks env.
4172
4173     This runs on the master, primary and secondaries.
4174
4175     """
4176     args = dict()
4177     if self.mem:
4178       args['memory'] = self.mem
4179     if self.vcpus:
4180       args['vcpus'] = self.vcpus
4181     if self.do_ip or self.do_bridge or self.mac:
4182       if self.do_ip:
4183         ip = self.ip
4184       else:
4185         ip = self.instance.nics[0].ip
4186       if self.bridge:
4187         bridge = self.bridge
4188       else:
4189         bridge = self.instance.nics[0].bridge
4190       if self.mac:
4191         mac = self.mac
4192       else:
4193         mac = self.instance.nics[0].mac
4194       args['nics'] = [(ip, bridge, mac)]
4195     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4196     nl = [self.sstore.GetMasterNode(),
4197           self.instance.primary_node] + list(self.instance.secondary_nodes)
4198     return env, nl, nl
4199
4200   def CheckPrereq(self):
4201     """Check prerequisites.
4202
4203     This only checks the instance list against the existing names.
4204
4205     """
4206     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4207     # a separate CheckArguments function, if we implement one, so the operation
4208     # can be aborted without waiting for any lock, should it have an error...
4209     self.mem = getattr(self.op, "mem", None)
4210     self.vcpus = getattr(self.op, "vcpus", None)
4211     self.ip = getattr(self.op, "ip", None)
4212     self.mac = getattr(self.op, "mac", None)
4213     self.bridge = getattr(self.op, "bridge", None)
4214     self.kernel_path = getattr(self.op, "kernel_path", None)
4215     self.initrd_path = getattr(self.op, "initrd_path", None)
4216     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4217     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4218     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4219     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4220     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4221     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4222     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4223     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4224                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4225                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4226                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4227     if all_parms.count(None) == len(all_parms):
4228       raise errors.OpPrereqError("No changes submitted")
4229     if self.mem is not None:
4230       try:
4231         self.mem = int(self.mem)
4232       except ValueError, err:
4233         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4234     if self.vcpus is not None:
4235       try:
4236         self.vcpus = int(self.vcpus)
4237       except ValueError, err:
4238         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4239     if self.ip is not None:
4240       self.do_ip = True
4241       if self.ip.lower() == "none":
4242         self.ip = None
4243       else:
4244         if not utils.IsValidIP(self.ip):
4245           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4246     else:
4247       self.do_ip = False
4248     self.do_bridge = (self.bridge is not None)
4249     if self.mac is not None:
4250       if self.cfg.IsMacInUse(self.mac):
4251         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4252                                    self.mac)
4253       if not utils.IsValidMac(self.mac):
4254         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4255
4256     if self.kernel_path is not None:
4257       self.do_kernel_path = True
4258       if self.kernel_path == constants.VALUE_NONE:
4259         raise errors.OpPrereqError("Can't set instance to no kernel")
4260
4261       if self.kernel_path != constants.VALUE_DEFAULT:
4262         if not os.path.isabs(self.kernel_path):
4263           raise errors.OpPrereqError("The kernel path must be an absolute"
4264                                     " filename")
4265     else:
4266       self.do_kernel_path = False
4267
4268     if self.initrd_path is not None:
4269       self.do_initrd_path = True
4270       if self.initrd_path not in (constants.VALUE_NONE,
4271                                   constants.VALUE_DEFAULT):
4272         if not os.path.isabs(self.initrd_path):
4273           raise errors.OpPrereqError("The initrd path must be an absolute"
4274                                     " filename")
4275     else:
4276       self.do_initrd_path = False
4277
4278     # boot order verification
4279     if self.hvm_boot_order is not None:
4280       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4281         if len(self.hvm_boot_order.strip("acdn")) != 0:
4282           raise errors.OpPrereqError("invalid boot order specified,"
4283                                      " must be one or more of [acdn]"
4284                                      " or 'default'")
4285
4286     # hvm_cdrom_image_path verification
4287     if self.op.hvm_cdrom_image_path is not None:
4288       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4289               self.op.hvm_cdrom_image_path.lower() == "none"):
4290         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4291                                    " be an absolute path or None, not %s" %
4292                                    self.op.hvm_cdrom_image_path)
4293       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4294               self.op.hvm_cdrom_image_path.lower() == "none"):
4295         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4296                                    " regular file or a symlink pointing to"
4297                                    " an existing regular file, not %s" %
4298                                    self.op.hvm_cdrom_image_path)
4299
4300     # vnc_bind_address verification
4301     if self.op.vnc_bind_address is not None:
4302       if not utils.IsValidIP(self.op.vnc_bind_address):
4303         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4304                                    " like a valid IP address" %
4305                                    self.op.vnc_bind_address)
4306
4307     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4308     assert self.instance is not None, \
4309       "Cannot retrieve locked instance %s" % self.op.instance_name
4310     return
4311
4312   def Exec(self, feedback_fn):
4313     """Modifies an instance.
4314
4315     All parameters take effect only at the next restart of the instance.
4316     """
4317     result = []
4318     instance = self.instance
4319     if self.mem:
4320       instance.memory = self.mem
4321       result.append(("mem", self.mem))
4322     if self.vcpus:
4323       instance.vcpus = self.vcpus
4324       result.append(("vcpus",  self.vcpus))
4325     if self.do_ip:
4326       instance.nics[0].ip = self.ip
4327       result.append(("ip", self.ip))
4328     if self.bridge:
4329       instance.nics[0].bridge = self.bridge
4330       result.append(("bridge", self.bridge))
4331     if self.mac:
4332       instance.nics[0].mac = self.mac
4333       result.append(("mac", self.mac))
4334     if self.do_kernel_path:
4335       instance.kernel_path = self.kernel_path
4336       result.append(("kernel_path", self.kernel_path))
4337     if self.do_initrd_path:
4338       instance.initrd_path = self.initrd_path
4339       result.append(("initrd_path", self.initrd_path))
4340     if self.hvm_boot_order:
4341       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4342         instance.hvm_boot_order = None
4343       else:
4344         instance.hvm_boot_order = self.hvm_boot_order
4345       result.append(("hvm_boot_order", self.hvm_boot_order))
4346     if self.hvm_acpi is not None:
4347       instance.hvm_acpi = self.hvm_acpi
4348       result.append(("hvm_acpi", self.hvm_acpi))
4349     if self.hvm_pae is not None:
4350       instance.hvm_pae = self.hvm_pae
4351       result.append(("hvm_pae", self.hvm_pae))
4352     if self.hvm_nic_type is not None:
4353       instance.hvm_nic_type = self.hvm_nic_type
4354       result.append(("hvm_nic_type", self.hvm_nic_type))
4355     if self.hvm_disk_type is not None:
4356       instance.hvm_disk_type = self.hvm_disk_type
4357       result.append(("hvm_disk_type", self.hvm_disk_type))
4358     if self.hvm_cdrom_image_path:
4359       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4360         instance.hvm_cdrom_image_path = None
4361       else:
4362         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4363       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4364     if self.vnc_bind_address:
4365       instance.vnc_bind_address = self.vnc_bind_address
4366       result.append(("vnc_bind_address", self.vnc_bind_address))
4367
4368     self.cfg.Update(instance)
4369
4370     return result
4371
4372
4373 class LUQueryExports(NoHooksLU):
4374   """Query the exports list
4375
4376   """
4377   _OP_REQP = []
4378
4379   def CheckPrereq(self):
4380     """Check that the nodelist contains only existing nodes.
4381
4382     """
4383     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4384
4385   def Exec(self, feedback_fn):
4386     """Compute the list of all the exported system images.
4387
4388     Returns:
4389       a dictionary with the structure node->(export-list)
4390       where export-list is a list of the instances exported on
4391       that node.
4392
4393     """
4394     return rpc.call_export_list(self.nodes)
4395
4396
4397 class LUExportInstance(LogicalUnit):
4398   """Export an instance to an image in the cluster.
4399
4400   """
4401   HPATH = "instance-export"
4402   HTYPE = constants.HTYPE_INSTANCE
4403   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4404
4405   def BuildHooksEnv(self):
4406     """Build hooks env.
4407
4408     This will run on the master, primary node and target node.
4409
4410     """
4411     env = {
4412       "EXPORT_NODE": self.op.target_node,
4413       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4414       }
4415     env.update(_BuildInstanceHookEnvByObject(self.instance))
4416     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4417           self.op.target_node]
4418     return env, nl, nl
4419
4420   def CheckPrereq(self):
4421     """Check prerequisites.
4422
4423     This checks that the instance and node names are valid.
4424
4425     """
4426     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4427     self.instance = self.cfg.GetInstanceInfo(instance_name)
4428     if self.instance is None:
4429       raise errors.OpPrereqError("Instance '%s' not found" %
4430                                  self.op.instance_name)
4431
4432     # node verification
4433     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4434     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4435
4436     if self.dst_node is None:
4437       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4438                                  self.op.target_node)
4439     self.op.target_node = self.dst_node.name
4440
4441     # instance disk type verification
4442     for disk in self.instance.disks:
4443       if disk.dev_type == constants.LD_FILE:
4444         raise errors.OpPrereqError("Export not supported for instances with"
4445                                    " file-based disks")
4446
4447   def Exec(self, feedback_fn):
4448     """Export an instance to an image in the cluster.
4449
4450     """
4451     instance = self.instance
4452     dst_node = self.dst_node
4453     src_node = instance.primary_node
4454     if self.op.shutdown:
4455       # shutdown the instance, but not the disks
4456       if not rpc.call_instance_shutdown(src_node, instance):
4457         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4458                                  (instance.name, src_node))
4459
4460     vgname = self.cfg.GetVGName()
4461
4462     snap_disks = []
4463
4464     try:
4465       for disk in instance.disks:
4466         if disk.iv_name == "sda":
4467           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4468           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4469
4470           if not new_dev_name:
4471             logger.Error("could not snapshot block device %s on node %s" %
4472                          (disk.logical_id[1], src_node))
4473           else:
4474             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4475                                       logical_id=(vgname, new_dev_name),
4476                                       physical_id=(vgname, new_dev_name),
4477                                       iv_name=disk.iv_name)
4478             snap_disks.append(new_dev)
4479
4480     finally:
4481       if self.op.shutdown and instance.status == "up":
4482         if not rpc.call_instance_start(src_node, instance, None):
4483           _ShutdownInstanceDisks(instance, self.cfg)
4484           raise errors.OpExecError("Could not start instance")
4485
4486     # TODO: check for size
4487
4488     for dev in snap_disks:
4489       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4490         logger.Error("could not export block device %s from node %s to node %s"
4491                      % (dev.logical_id[1], src_node, dst_node.name))
4492       if not rpc.call_blockdev_remove(src_node, dev):
4493         logger.Error("could not remove snapshot block device %s from node %s" %
4494                      (dev.logical_id[1], src_node))
4495
4496     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4497       logger.Error("could not finalize export for instance %s on node %s" %
4498                    (instance.name, dst_node.name))
4499
4500     nodelist = self.cfg.GetNodeList()
4501     nodelist.remove(dst_node.name)
4502
4503     # on one-node clusters nodelist will be empty after the removal
4504     # if we proceed the backup would be removed because OpQueryExports
4505     # substitutes an empty list with the full cluster node list.
4506     if nodelist:
4507       op = opcodes.OpQueryExports(nodes=nodelist)
4508       exportlist = self.proc.ChainOpCode(op)
4509       for node in exportlist:
4510         if instance.name in exportlist[node]:
4511           if not rpc.call_export_remove(node, instance.name):
4512             logger.Error("could not remove older export for instance %s"
4513                          " on node %s" % (instance.name, node))
4514
4515
4516 class LURemoveExport(NoHooksLU):
4517   """Remove exports related to the named instance.
4518
4519   """
4520   _OP_REQP = ["instance_name"]
4521
4522   def CheckPrereq(self):
4523     """Check prerequisites.
4524     """
4525     pass
4526
4527   def Exec(self, feedback_fn):
4528     """Remove any export.
4529
4530     """
4531     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4532     # If the instance was not found we'll try with the name that was passed in.
4533     # This will only work if it was an FQDN, though.
4534     fqdn_warn = False
4535     if not instance_name:
4536       fqdn_warn = True
4537       instance_name = self.op.instance_name
4538
4539     op = opcodes.OpQueryExports(nodes=[])
4540     exportlist = self.proc.ChainOpCode(op)
4541     found = False
4542     for node in exportlist:
4543       if instance_name in exportlist[node]:
4544         found = True
4545         if not rpc.call_export_remove(node, instance_name):
4546           logger.Error("could not remove export for instance %s"
4547                        " on node %s" % (instance_name, node))
4548
4549     if fqdn_warn and not found:
4550       feedback_fn("Export not found. If trying to remove an export belonging"
4551                   " to a deleted instance please use its Fully Qualified"
4552                   " Domain Name.")
4553
4554
4555 class TagsLU(NoHooksLU):
4556   """Generic tags LU.
4557
4558   This is an abstract class which is the parent of all the other tags LUs.
4559
4560   """
4561   def CheckPrereq(self):
4562     """Check prerequisites.
4563
4564     """
4565     if self.op.kind == constants.TAG_CLUSTER:
4566       self.target = self.cfg.GetClusterInfo()
4567     elif self.op.kind == constants.TAG_NODE:
4568       name = self.cfg.ExpandNodeName(self.op.name)
4569       if name is None:
4570         raise errors.OpPrereqError("Invalid node name (%s)" %
4571                                    (self.op.name,))
4572       self.op.name = name
4573       self.target = self.cfg.GetNodeInfo(name)
4574     elif self.op.kind == constants.TAG_INSTANCE:
4575       name = self.cfg.ExpandInstanceName(self.op.name)
4576       if name is None:
4577         raise errors.OpPrereqError("Invalid instance name (%s)" %
4578                                    (self.op.name,))
4579       self.op.name = name
4580       self.target = self.cfg.GetInstanceInfo(name)
4581     else:
4582       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4583                                  str(self.op.kind))
4584
4585
4586 class LUGetTags(TagsLU):
4587   """Returns the tags of a given object.
4588
4589   """
4590   _OP_REQP = ["kind", "name"]
4591
4592   def Exec(self, feedback_fn):
4593     """Returns the tag list.
4594
4595     """
4596     return list(self.target.GetTags())
4597
4598
4599 class LUSearchTags(NoHooksLU):
4600   """Searches the tags for a given pattern.
4601
4602   """
4603   _OP_REQP = ["pattern"]
4604
4605   def CheckPrereq(self):
4606     """Check prerequisites.
4607
4608     This checks the pattern passed for validity by compiling it.
4609
4610     """
4611     try:
4612       self.re = re.compile(self.op.pattern)
4613     except re.error, err:
4614       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4615                                  (self.op.pattern, err))
4616
4617   def Exec(self, feedback_fn):
4618     """Returns the tag list.
4619
4620     """
4621     cfg = self.cfg
4622     tgts = [("/cluster", cfg.GetClusterInfo())]
4623     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4624     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4625     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4626     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4627     results = []
4628     for path, target in tgts:
4629       for tag in target.GetTags():
4630         if self.re.search(tag):
4631           results.append((path, tag))
4632     return results
4633
4634
4635 class LUAddTags(TagsLU):
4636   """Sets a tag on a given object.
4637
4638   """
4639   _OP_REQP = ["kind", "name", "tags"]
4640
4641   def CheckPrereq(self):
4642     """Check prerequisites.
4643
4644     This checks the type and length of the tag name and value.
4645
4646     """
4647     TagsLU.CheckPrereq(self)
4648     for tag in self.op.tags:
4649       objects.TaggableObject.ValidateTag(tag)
4650
4651   def Exec(self, feedback_fn):
4652     """Sets the tag.
4653
4654     """
4655     try:
4656       for tag in self.op.tags:
4657         self.target.AddTag(tag)
4658     except errors.TagError, err:
4659       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4660     try:
4661       self.cfg.Update(self.target)
4662     except errors.ConfigurationError:
4663       raise errors.OpRetryError("There has been a modification to the"
4664                                 " config file and the operation has been"
4665                                 " aborted. Please retry.")
4666
4667
4668 class LUDelTags(TagsLU):
4669   """Delete a list of tags from a given object.
4670
4671   """
4672   _OP_REQP = ["kind", "name", "tags"]
4673
4674   def CheckPrereq(self):
4675     """Check prerequisites.
4676
4677     This checks that we have the given tag.
4678
4679     """
4680     TagsLU.CheckPrereq(self)
4681     for tag in self.op.tags:
4682       objects.TaggableObject.ValidateTag(tag)
4683     del_tags = frozenset(self.op.tags)
4684     cur_tags = self.target.GetTags()
4685     if not del_tags <= cur_tags:
4686       diff_tags = del_tags - cur_tags
4687       diff_names = ["'%s'" % tag for tag in diff_tags]
4688       diff_names.sort()
4689       raise errors.OpPrereqError("Tag(s) %s not found" %
4690                                  (",".join(diff_names)))
4691
4692   def Exec(self, feedback_fn):
4693     """Remove the tag from the object.
4694
4695     """
4696     for tag in self.op.tags:
4697       self.target.RemoveTag(tag)
4698     try:
4699       self.cfg.Update(self.target)
4700     except errors.ConfigurationError:
4701       raise errors.OpRetryError("There has been a modification to the"
4702                                 " config file and the operation has been"
4703                                 " aborted. Please retry.")
4704
4705
4706 class LUTestDelay(NoHooksLU):
4707   """Sleep for a specified amount of time.
4708
4709   This LU sleeps on the master and/or nodes for a specified amount of
4710   time.
4711
4712   """
4713   _OP_REQP = ["duration", "on_master", "on_nodes"]
4714   REQ_BGL = False
4715
4716   def ExpandNames(self):
4717     """Expand names and set required locks.
4718
4719     This expands the node list, if any.
4720
4721     """
4722     self.needed_locks = {}
4723     if self.op.on_nodes:
4724       # _GetWantedNodes can be used here, but is not always appropriate to use
4725       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4726       # more information.
4727       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4728       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4729
4730   def CheckPrereq(self):
4731     """Check prerequisites.
4732
4733     """
4734
4735   def Exec(self, feedback_fn):
4736     """Do the actual sleep.
4737
4738     """
4739     if self.op.on_master:
4740       if not utils.TestDelay(self.op.duration):
4741         raise errors.OpExecError("Error during master delay test")
4742     if self.op.on_nodes:
4743       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4744       if not result:
4745         raise errors.OpExecError("Complete failure from rpc call")
4746       for node, node_result in result.items():
4747         if not node_result:
4748           raise errors.OpExecError("Failure during rpc call to node %s,"
4749                                    " result: %s" % (node, node_result))
4750
4751
4752 class IAllocator(object):
4753   """IAllocator framework.
4754
4755   An IAllocator instance has three sets of attributes:
4756     - cfg/sstore that are needed to query the cluster
4757     - input data (all members of the _KEYS class attribute are required)
4758     - four buffer attributes (in|out_data|text), that represent the
4759       input (to the external script) in text and data structure format,
4760       and the output from it, again in two formats
4761     - the result variables from the script (success, info, nodes) for
4762       easy usage
4763
4764   """
4765   _ALLO_KEYS = [
4766     "mem_size", "disks", "disk_template",
4767     "os", "tags", "nics", "vcpus",
4768     ]
4769   _RELO_KEYS = [
4770     "relocate_from",
4771     ]
4772
4773   def __init__(self, cfg, sstore, mode, name, **kwargs):
4774     self.cfg = cfg
4775     self.sstore = sstore
4776     # init buffer variables
4777     self.in_text = self.out_text = self.in_data = self.out_data = None
4778     # init all input fields so that pylint is happy
4779     self.mode = mode
4780     self.name = name
4781     self.mem_size = self.disks = self.disk_template = None
4782     self.os = self.tags = self.nics = self.vcpus = None
4783     self.relocate_from = None
4784     # computed fields
4785     self.required_nodes = None
4786     # init result fields
4787     self.success = self.info = self.nodes = None
4788     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4789       keyset = self._ALLO_KEYS
4790     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4791       keyset = self._RELO_KEYS
4792     else:
4793       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4794                                    " IAllocator" % self.mode)
4795     for key in kwargs:
4796       if key not in keyset:
4797         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4798                                      " IAllocator" % key)
4799       setattr(self, key, kwargs[key])
4800     for key in keyset:
4801       if key not in kwargs:
4802         raise errors.ProgrammerError("Missing input parameter '%s' to"
4803                                      " IAllocator" % key)
4804     self._BuildInputData()
4805
4806   def _ComputeClusterData(self):
4807     """Compute the generic allocator input data.
4808
4809     This is the data that is independent of the actual operation.
4810
4811     """
4812     cfg = self.cfg
4813     # cluster data
4814     data = {
4815       "version": 1,
4816       "cluster_name": self.sstore.GetClusterName(),
4817       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4818       "hypervisor_type": self.sstore.GetHypervisorType(),
4819       # we don't have job IDs
4820       }
4821
4822     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4823
4824     # node data
4825     node_results = {}
4826     node_list = cfg.GetNodeList()
4827     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4828     for nname in node_list:
4829       ninfo = cfg.GetNodeInfo(nname)
4830       if nname not in node_data or not isinstance(node_data[nname], dict):
4831         raise errors.OpExecError("Can't get data for node %s" % nname)
4832       remote_info = node_data[nname]
4833       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4834                    'vg_size', 'vg_free', 'cpu_total']:
4835         if attr not in remote_info:
4836           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4837                                    (nname, attr))
4838         try:
4839           remote_info[attr] = int(remote_info[attr])
4840         except ValueError, err:
4841           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4842                                    " %s" % (nname, attr, str(err)))
4843       # compute memory used by primary instances
4844       i_p_mem = i_p_up_mem = 0
4845       for iinfo in i_list:
4846         if iinfo.primary_node == nname:
4847           i_p_mem += iinfo.memory
4848           if iinfo.status == "up":
4849             i_p_up_mem += iinfo.memory
4850
4851       # compute memory used by instances
4852       pnr = {
4853         "tags": list(ninfo.GetTags()),
4854         "total_memory": remote_info['memory_total'],
4855         "reserved_memory": remote_info['memory_dom0'],
4856         "free_memory": remote_info['memory_free'],
4857         "i_pri_memory": i_p_mem,
4858         "i_pri_up_memory": i_p_up_mem,
4859         "total_disk": remote_info['vg_size'],
4860         "free_disk": remote_info['vg_free'],
4861         "primary_ip": ninfo.primary_ip,
4862         "secondary_ip": ninfo.secondary_ip,
4863         "total_cpus": remote_info['cpu_total'],
4864         }
4865       node_results[nname] = pnr
4866     data["nodes"] = node_results
4867
4868     # instance data
4869     instance_data = {}
4870     for iinfo in i_list:
4871       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4872                   for n in iinfo.nics]
4873       pir = {
4874         "tags": list(iinfo.GetTags()),
4875         "should_run": iinfo.status == "up",
4876         "vcpus": iinfo.vcpus,
4877         "memory": iinfo.memory,
4878         "os": iinfo.os,
4879         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4880         "nics": nic_data,
4881         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4882         "disk_template": iinfo.disk_template,
4883         }
4884       instance_data[iinfo.name] = pir
4885
4886     data["instances"] = instance_data
4887
4888     self.in_data = data
4889
4890   def _AddNewInstance(self):
4891     """Add new instance data to allocator structure.
4892
4893     This in combination with _AllocatorGetClusterData will create the
4894     correct structure needed as input for the allocator.
4895
4896     The checks for the completeness of the opcode must have already been
4897     done.
4898
4899     """
4900     data = self.in_data
4901     if len(self.disks) != 2:
4902       raise errors.OpExecError("Only two-disk configurations supported")
4903
4904     disk_space = _ComputeDiskSize(self.disk_template,
4905                                   self.disks[0]["size"], self.disks[1]["size"])
4906
4907     if self.disk_template in constants.DTS_NET_MIRROR:
4908       self.required_nodes = 2
4909     else:
4910       self.required_nodes = 1
4911     request = {
4912       "type": "allocate",
4913       "name": self.name,
4914       "disk_template": self.disk_template,
4915       "tags": self.tags,
4916       "os": self.os,
4917       "vcpus": self.vcpus,
4918       "memory": self.mem_size,
4919       "disks": self.disks,
4920       "disk_space_total": disk_space,
4921       "nics": self.nics,
4922       "required_nodes": self.required_nodes,
4923       }
4924     data["request"] = request
4925
4926   def _AddRelocateInstance(self):
4927     """Add relocate instance data to allocator structure.
4928
4929     This in combination with _IAllocatorGetClusterData will create the
4930     correct structure needed as input for the allocator.
4931
4932     The checks for the completeness of the opcode must have already been
4933     done.
4934
4935     """
4936     instance = self.cfg.GetInstanceInfo(self.name)
4937     if instance is None:
4938       raise errors.ProgrammerError("Unknown instance '%s' passed to"
4939                                    " IAllocator" % self.name)
4940
4941     if instance.disk_template not in constants.DTS_NET_MIRROR:
4942       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4943
4944     if len(instance.secondary_nodes) != 1:
4945       raise errors.OpPrereqError("Instance has not exactly one secondary node")
4946
4947     self.required_nodes = 1
4948
4949     disk_space = _ComputeDiskSize(instance.disk_template,
4950                                   instance.disks[0].size,
4951                                   instance.disks[1].size)
4952
4953     request = {
4954       "type": "relocate",
4955       "name": self.name,
4956       "disk_space_total": disk_space,
4957       "required_nodes": self.required_nodes,
4958       "relocate_from": self.relocate_from,
4959       }
4960     self.in_data["request"] = request
4961
4962   def _BuildInputData(self):
4963     """Build input data structures.
4964
4965     """
4966     self._ComputeClusterData()
4967
4968     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4969       self._AddNewInstance()
4970     else:
4971       self._AddRelocateInstance()
4972
4973     self.in_text = serializer.Dump(self.in_data)
4974
4975   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4976     """Run an instance allocator and return the results.
4977
4978     """
4979     data = self.in_text
4980
4981     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4982
4983     if not isinstance(result, tuple) or len(result) != 4:
4984       raise errors.OpExecError("Invalid result from master iallocator runner")
4985
4986     rcode, stdout, stderr, fail = result
4987
4988     if rcode == constants.IARUN_NOTFOUND:
4989       raise errors.OpExecError("Can't find allocator '%s'" % name)
4990     elif rcode == constants.IARUN_FAILURE:
4991       raise errors.OpExecError("Instance allocator call failed: %s,"
4992                                " output: %s" % (fail, stdout+stderr))
4993     self.out_text = stdout
4994     if validate:
4995       self._ValidateResult()
4996
4997   def _ValidateResult(self):
4998     """Process the allocator results.
4999
5000     This will process and if successful save the result in
5001     self.out_data and the other parameters.
5002
5003     """
5004     try:
5005       rdict = serializer.Load(self.out_text)
5006     except Exception, err:
5007       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5008
5009     if not isinstance(rdict, dict):
5010       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5011
5012     for key in "success", "info", "nodes":
5013       if key not in rdict:
5014         raise errors.OpExecError("Can't parse iallocator results:"
5015                                  " missing key '%s'" % key)
5016       setattr(self, key, rdict[key])
5017
5018     if not isinstance(rdict["nodes"], list):
5019       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5020                                " is not a list")
5021     self.out_data = rdict
5022
5023
5024 class LUTestAllocator(NoHooksLU):
5025   """Run allocator tests.
5026
5027   This LU runs the allocator tests
5028
5029   """
5030   _OP_REQP = ["direction", "mode", "name"]
5031
5032   def CheckPrereq(self):
5033     """Check prerequisites.
5034
5035     This checks the opcode parameters depending on the director and mode test.
5036
5037     """
5038     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5039       for attr in ["name", "mem_size", "disks", "disk_template",
5040                    "os", "tags", "nics", "vcpus"]:
5041         if not hasattr(self.op, attr):
5042           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5043                                      attr)
5044       iname = self.cfg.ExpandInstanceName(self.op.name)
5045       if iname is not None:
5046         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5047                                    iname)
5048       if not isinstance(self.op.nics, list):
5049         raise errors.OpPrereqError("Invalid parameter 'nics'")
5050       for row in self.op.nics:
5051         if (not isinstance(row, dict) or
5052             "mac" not in row or
5053             "ip" not in row or
5054             "bridge" not in row):
5055           raise errors.OpPrereqError("Invalid contents of the"
5056                                      " 'nics' parameter")
5057       if not isinstance(self.op.disks, list):
5058         raise errors.OpPrereqError("Invalid parameter 'disks'")
5059       if len(self.op.disks) != 2:
5060         raise errors.OpPrereqError("Only two-disk configurations supported")
5061       for row in self.op.disks:
5062         if (not isinstance(row, dict) or
5063             "size" not in row or
5064             not isinstance(row["size"], int) or
5065             "mode" not in row or
5066             row["mode"] not in ['r', 'w']):
5067           raise errors.OpPrereqError("Invalid contents of the"
5068                                      " 'disks' parameter")
5069     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5070       if not hasattr(self.op, "name"):
5071         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5072       fname = self.cfg.ExpandInstanceName(self.op.name)
5073       if fname is None:
5074         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5075                                    self.op.name)
5076       self.op.name = fname
5077       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5078     else:
5079       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5080                                  self.op.mode)
5081
5082     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5083       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5084         raise errors.OpPrereqError("Missing allocator name")
5085     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5086       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5087                                  self.op.direction)
5088
5089   def Exec(self, feedback_fn):
5090     """Run the allocator test.
5091
5092     """
5093     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5094       ial = IAllocator(self.cfg, self.sstore,
5095                        mode=self.op.mode,
5096                        name=self.op.name,
5097                        mem_size=self.op.mem_size,
5098                        disks=self.op.disks,
5099                        disk_template=self.op.disk_template,
5100                        os=self.op.os,
5101                        tags=self.op.tags,
5102                        nics=self.op.nics,
5103                        vcpus=self.op.vcpus,
5104                        )
5105     else:
5106       ial = IAllocator(self.cfg, self.sstore,
5107                        mode=self.op.mode,
5108                        name=self.op.name,
5109                        relocate_from=list(self.relocate_from),
5110                        )
5111
5112     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5113       result = ial.in_text
5114     else:
5115       ial.Run(self.op.allocator, validate=False)
5116       result = ial.out_text
5117     return result