Change LUQueryNodes to return raw values and support selective listing
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded node names.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted = []
179
180     for name in nodes:
181       node = lu.cfg.ExpandNodeName(name)
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184       wanted.append(node)
185
186   else:
187     wanted = lu.cfg.GetNodeList()
188   return utils.NiceSort(wanted)
189
190
191 def _GetWantedInstances(lu, instances):
192   """Returns list of checked and expanded instance names.
193
194   Args:
195     instances: List of instances (strings) or None for all
196
197   """
198   if not isinstance(instances, list):
199     raise errors.OpPrereqError("Invalid argument type 'instances'")
200
201   if instances:
202     wanted = []
203
204     for name in instances:
205       instance = lu.cfg.ExpandInstanceName(name)
206       if instance is None:
207         raise errors.OpPrereqError("No such instance name '%s'" % name)
208       wanted.append(instance)
209
210   else:
211     wanted = lu.cfg.GetInstanceList()
212   return utils.NiceSort(wanted)
213
214
215 def _CheckOutputFields(static, dynamic, selected):
216   """Checks whether all selected fields are valid.
217
218   Args:
219     static: Static fields
220     dynamic: Dynamic fields
221
222   """
223   static_fields = frozenset(static)
224   dynamic_fields = frozenset(dynamic)
225
226   all_fields = static_fields | dynamic_fields
227
228   if not all_fields.issuperset(selected):
229     raise errors.OpPrereqError("Unknown output fields selected: %s"
230                                % ",".join(frozenset(selected).
231                                           difference(all_fields)))
232
233
234 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235                           memory, vcpus, nics):
236   """Builds instance related env variables for hooks from single variables.
237
238   Args:
239     secondary_nodes: List of secondary nodes as strings
240   """
241   env = {
242     "INSTANCE_NAME": name,
243     "INSTANCE_PRIMARY": primary_node,
244     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245     "INSTANCE_OS_TYPE": os_type,
246     "INSTANCE_STATUS": status,
247     "INSTANCE_MEMORY": memory,
248     "INSTANCE_VCPUS": vcpus,
249   }
250
251   if nics:
252     nic_count = len(nics)
253     for idx, (ip, bridge) in enumerate(nics):
254       if ip is None:
255         ip = ""
256       env["INSTANCE_NIC%d_IP" % idx] = ip
257       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258   else:
259     nic_count = 0
260
261   env["INSTANCE_NIC_COUNT"] = nic_count
262
263   return env
264
265
266 def _BuildInstanceHookEnvByObject(instance, override=None):
267   """Builds instance related env variables for hooks from an object.
268
269   Args:
270     instance: objects.Instance object of instance
271     override: dict of values to override
272   """
273   args = {
274     'name': instance.name,
275     'primary_node': instance.primary_node,
276     'secondary_nodes': instance.secondary_nodes,
277     'os_type': instance.os,
278     'status': instance.os,
279     'memory': instance.memory,
280     'vcpus': instance.vcpus,
281     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282   }
283   if override:
284     args.update(override)
285   return _BuildInstanceHookEnv(**args)
286
287
288 def _UpdateEtcHosts(fullnode, ip):
289   """Ensure a node has a correct entry in /etc/hosts.
290
291   Args:
292     fullnode - Fully qualified domain name of host. (str)
293     ip       - IPv4 address of host (str)
294
295   """
296   node = fullnode.split(".", 1)[0]
297
298   f = open('/etc/hosts', 'r+')
299
300   inthere = False
301
302   save_lines = []
303   add_lines = []
304   removed = False
305
306   while True:
307     rawline = f.readline()
308
309     if not rawline:
310       # End of file
311       break
312
313     line = rawline.split('\n')[0]
314
315     # Strip off comments
316     line = line.split('#')[0]
317
318     if not line:
319       # Entire line was comment, skip
320       save_lines.append(rawline)
321       continue
322
323     fields = line.split()
324
325     haveall = True
326     havesome = False
327     for spec in [ ip, fullnode, node ]:
328       if spec not in fields:
329         haveall = False
330       if spec in fields:
331         havesome = True
332
333     if haveall:
334       inthere = True
335       save_lines.append(rawline)
336       continue
337
338     if havesome and not haveall:
339       # Line (old, or manual?) which is missing some.  Remove.
340       removed = True
341       continue
342
343     save_lines.append(rawline)
344
345   if not inthere:
346     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347
348   if removed:
349     if add_lines:
350       save_lines = save_lines + add_lines
351
352     # We removed a line, write a new file and replace old.
353     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354     newfile = os.fdopen(fd, 'w')
355     newfile.write(''.join(save_lines))
356     newfile.close()
357     os.rename(tmpname, '/etc/hosts')
358
359   elif add_lines:
360     # Simply appending a new line will do the trick.
361     f.seek(0, 2)
362     for add in add_lines:
363       f.write(add)
364
365   f.close()
366
367
368 def _UpdateKnownHosts(fullnode, ip, pubkey):
369   """Ensure a node has a correct known_hosts entry.
370
371   Args:
372     fullnode - Fully qualified domain name of host. (str)
373     ip       - IPv4 address of host (str)
374     pubkey   - the public key of the cluster
375
376   """
377   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379   else:
380     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381
382   inthere = False
383
384   save_lines = []
385   add_lines = []
386   removed = False
387
388   while True:
389     rawline = f.readline()
390     logger.Debug('read %s' % (repr(rawline),))
391
392     if not rawline:
393       # End of file
394       break
395
396     line = rawline.split('\n')[0]
397
398     parts = line.split(' ')
399     fields = parts[0].split(',')
400     key = parts[2]
401
402     haveall = True
403     havesome = False
404     for spec in [ ip, fullnode ]:
405       if spec not in fields:
406         haveall = False
407       if spec in fields:
408         havesome = True
409
410     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411     if haveall and key == pubkey:
412       inthere = True
413       save_lines.append(rawline)
414       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415       continue
416
417     if havesome and (not haveall or key != pubkey):
418       removed = True
419       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420       continue
421
422     save_lines.append(rawline)
423
424   if not inthere:
425     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427
428   if removed:
429     save_lines = save_lines + add_lines
430
431     # Write a new file and replace old.
432     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433                                    constants.DATA_DIR)
434     newfile = os.fdopen(fd, 'w')
435     try:
436       newfile.write(''.join(save_lines))
437     finally:
438       newfile.close()
439     logger.Debug("Wrote new known_hosts.")
440     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441
442   elif add_lines:
443     # Simply appending a new line will do the trick.
444     f.seek(0, 2)
445     for add in add_lines:
446       f.write(add)
447
448   f.close()
449
450
451 def _HasValidVG(vglist, vgname):
452   """Checks if the volume group list is valid.
453
454   A non-None return value means there's an error, and the return value
455   is the error message.
456
457   """
458   vgsize = vglist.get(vgname, None)
459   if vgsize is None:
460     return "volume group '%s' missing" % vgname
461   elif vgsize < 20480:
462     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
463             (vgname, vgsize))
464   return None
465
466
467 def _InitSSHSetup(node):
468   """Setup the SSH configuration for the cluster.
469
470
471   This generates a dsa keypair for root, adds the pub key to the
472   permitted hosts and adds the hostkey to its own known hosts.
473
474   Args:
475     node: the name of this host as a fqdn
476
477   """
478   if os.path.exists('/root/.ssh/id_dsa'):
479     utils.CreateBackup('/root/.ssh/id_dsa')
480   if os.path.exists('/root/.ssh/id_dsa.pub'):
481     utils.CreateBackup('/root/.ssh/id_dsa.pub')
482
483   utils.RemoveFile('/root/.ssh/id_dsa')
484   utils.RemoveFile('/root/.ssh/id_dsa.pub')
485
486   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487                          "-f", "/root/.ssh/id_dsa",
488                          "-q", "-N", ""])
489   if result.failed:
490     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491                              result.output)
492
493   f = open('/root/.ssh/id_dsa.pub', 'r')
494   try:
495     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496   finally:
497     f.close()
498
499
500 def _InitGanetiServerSetup(ss):
501   """Setup the necessary configuration for the initial node daemon.
502
503   This creates the nodepass file containing the shared password for
504   the cluster and also generates the SSL certificate.
505
506   """
507   # Create pseudo random password
508   randpass = sha.new(os.urandom(64)).hexdigest()
509   # and write it into sstore
510   ss.SetKey(ss.SS_NODED_PASS, randpass)
511
512   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513                          "-days", str(365*5), "-nodes", "-x509",
514                          "-keyout", constants.SSL_CERT_FILE,
515                          "-out", constants.SSL_CERT_FILE, "-batch"])
516   if result.failed:
517     raise errors.OpExecError("could not generate server ssl cert, command"
518                              " %s had exitcode %s and error message %s" %
519                              (result.cmd, result.exit_code, result.output))
520
521   os.chmod(constants.SSL_CERT_FILE, 0400)
522
523   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524
525   if result.failed:
526     raise errors.OpExecError("Could not start the node daemon, command %s"
527                              " had exitcode %s and error %s" %
528                              (result.cmd, result.exit_code, result.output))
529
530
531 class LUInitCluster(LogicalUnit):
532   """Initialise the cluster.
533
534   """
535   HPATH = "cluster-init"
536   HTYPE = constants.HTYPE_CLUSTER
537   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538               "def_bridge", "master_netdev"]
539   REQ_CLUSTER = False
540
541   def BuildHooksEnv(self):
542     """Build hooks env.
543
544     Notes: Since we don't require a cluster, we must manually add
545     ourselves in the post-run node list.
546
547     """
548     env = {
549       "CLUSTER": self.op.cluster_name,
550       "MASTER": self.hostname['hostname_full'],
551       }
552     return env, [], [self.hostname['hostname_full']]
553
554   def CheckPrereq(self):
555     """Verify that the passed name is a valid one.
556
557     """
558     if config.ConfigWriter.IsCluster():
559       raise errors.OpPrereqError("Cluster is already initialised")
560
561     hostname_local = socket.gethostname()
562     self.hostname = hostname = utils.LookupHostname(hostname_local)
563     if not hostname:
564       raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
565                                  hostname_local)
566
567     if hostname["hostname_full"] != hostname_local:
568       raise errors.OpPrereqError("My own hostname (%s) does not match the"
569                                  " resolver (%s): probably not using FQDN"
570                                  " for hostname." %
571                                  (hostname_local, hostname["hostname_full"]))
572
573     if hostname["ip"].startswith("127."):
574       raise errors.OpPrereqError("This host's IP resolves to the private"
575                                  " range (%s). Please fix DNS or /etc/hosts." %
576                                  (hostname["ip"],))
577
578     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
579     if not clustername:
580       raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
581                                  % self.op.cluster_name)
582
583     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
584     if result.failed:
585       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
586                                  " to %s,\nbut this ip address does not"
587                                  " belong to this host."
588                                  " Aborting." % hostname['ip'])
589
590     secondary_ip = getattr(self.op, "secondary_ip", None)
591     if secondary_ip and not utils.IsValidIP(secondary_ip):
592       raise errors.OpPrereqError("Invalid secondary ip given")
593     if secondary_ip and secondary_ip != hostname['ip']:
594       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
595       if result.failed:
596         raise errors.OpPrereqError("You gave %s as secondary IP,\n"
597                                    "but it does not belong to this host." %
598                                    secondary_ip)
599     self.secondary_ip = secondary_ip
600
601     # checks presence of the volume group given
602     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
603
604     if vgstatus:
605       raise errors.OpPrereqError("Error: %s" % vgstatus)
606
607     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
608                     self.op.mac_prefix):
609       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
610                                  self.op.mac_prefix)
611
612     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
613       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
614                                  self.op.hypervisor_type)
615
616     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
617     if result.failed:
618       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
619                                  (self.op.master_netdev,
620                                   result.output.strip()))
621
622   def Exec(self, feedback_fn):
623     """Initialize the cluster.
624
625     """
626     clustername = self.clustername
627     hostname = self.hostname
628
629     # set up the simple store
630     ss = ssconf.SimpleStore()
631     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
632     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
633     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
634     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
635     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
636
637     # set up the inter-node password and certificate
638     _InitGanetiServerSetup(ss)
639
640     # start the master ip
641     rpc.call_node_start_master(hostname['hostname_full'])
642
643     # set up ssh config and /etc/hosts
644     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
645     try:
646       sshline = f.read()
647     finally:
648       f.close()
649     sshkey = sshline.split(" ")[1]
650
651     _UpdateEtcHosts(hostname['hostname_full'],
652                     hostname['ip'],
653                     )
654
655     _UpdateKnownHosts(hostname['hostname_full'],
656                       hostname['ip'],
657                       sshkey,
658                       )
659
660     _InitSSHSetup(hostname['hostname'])
661
662     # init of cluster config file
663     cfgw = config.ConfigWriter()
664     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
665                     sshkey, self.op.mac_prefix,
666                     self.op.vg_name, self.op.def_bridge)
667
668
669 class LUDestroyCluster(NoHooksLU):
670   """Logical unit for destroying the cluster.
671
672   """
673   _OP_REQP = []
674
675   def CheckPrereq(self):
676     """Check prerequisites.
677
678     This checks whether the cluster is empty.
679
680     Any errors are signalled by raising errors.OpPrereqError.
681
682     """
683     master = self.sstore.GetMasterNode()
684
685     nodelist = self.cfg.GetNodeList()
686     if len(nodelist) != 1 or nodelist[0] != master:
687       raise errors.OpPrereqError("There are still %d node(s) in"
688                                  " this cluster." % (len(nodelist) - 1))
689     instancelist = self.cfg.GetInstanceList()
690     if instancelist:
691       raise errors.OpPrereqError("There are still %d instance(s) in"
692                                  " this cluster." % len(instancelist))
693
694   def Exec(self, feedback_fn):
695     """Destroys the cluster.
696
697     """
698     utils.CreateBackup('/root/.ssh/id_dsa')
699     utils.CreateBackup('/root/.ssh/id_dsa.pub')
700     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
701
702
703 class LUVerifyCluster(NoHooksLU):
704   """Verifies the cluster status.
705
706   """
707   _OP_REQP = []
708
709   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
710                   remote_version, feedback_fn):
711     """Run multiple tests against a node.
712
713     Test list:
714       - compares ganeti version
715       - checks vg existance and size > 20G
716       - checks config file checksum
717       - checks ssh to other nodes
718
719     Args:
720       node: name of the node to check
721       file_list: required list of files
722       local_cksum: dictionary of local files and their checksums
723
724     """
725     # compares ganeti version
726     local_version = constants.PROTOCOL_VERSION
727     if not remote_version:
728       feedback_fn(" - ERROR: connection to %s failed" % (node))
729       return True
730
731     if local_version != remote_version:
732       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
733                       (local_version, node, remote_version))
734       return True
735
736     # checks vg existance and size > 20G
737
738     bad = False
739     if not vglist:
740       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
741                       (node,))
742       bad = True
743     else:
744       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
745       if vgstatus:
746         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
747         bad = True
748
749     # checks config file checksum
750     # checks ssh to any
751
752     if 'filelist' not in node_result:
753       bad = True
754       feedback_fn("  - ERROR: node hasn't returned file checksum data")
755     else:
756       remote_cksum = node_result['filelist']
757       for file_name in file_list:
758         if file_name not in remote_cksum:
759           bad = True
760           feedback_fn("  - ERROR: file '%s' missing" % file_name)
761         elif remote_cksum[file_name] != local_cksum[file_name]:
762           bad = True
763           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
764
765     if 'nodelist' not in node_result:
766       bad = True
767       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
768     else:
769       if node_result['nodelist']:
770         bad = True
771         for node in node_result['nodelist']:
772           feedback_fn("  - ERROR: communication with node '%s': %s" %
773                           (node, node_result['nodelist'][node]))
774     hyp_result = node_result.get('hypervisor', None)
775     if hyp_result is not None:
776       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
777     return bad
778
779   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
780     """Verify an instance.
781
782     This function checks to see if the required block devices are
783     available on the instance's node.
784
785     """
786     bad = False
787
788     instancelist = self.cfg.GetInstanceList()
789     if not instance in instancelist:
790       feedback_fn("  - ERROR: instance %s not in instance list %s" %
791                       (instance, instancelist))
792       bad = True
793
794     instanceconfig = self.cfg.GetInstanceInfo(instance)
795     node_current = instanceconfig.primary_node
796
797     node_vol_should = {}
798     instanceconfig.MapLVsByNode(node_vol_should)
799
800     for node in node_vol_should:
801       for volume in node_vol_should[node]:
802         if node not in node_vol_is or volume not in node_vol_is[node]:
803           feedback_fn("  - ERROR: volume %s missing on node %s" %
804                           (volume, node))
805           bad = True
806
807     if not instanceconfig.status == 'down':
808       if not instance in node_instance[node_current]:
809         feedback_fn("  - ERROR: instance %s not running on node %s" %
810                         (instance, node_current))
811         bad = True
812
813     for node in node_instance:
814       if (not node == node_current):
815         if instance in node_instance[node]:
816           feedback_fn("  - ERROR: instance %s should not run on node %s" %
817                           (instance, node))
818           bad = True
819
820     return not bad
821
822   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
823     """Verify if there are any unknown volumes in the cluster.
824
825     The .os, .swap and backup volumes are ignored. All other volumes are
826     reported as unknown.
827
828     """
829     bad = False
830
831     for node in node_vol_is:
832       for volume in node_vol_is[node]:
833         if node not in node_vol_should or volume not in node_vol_should[node]:
834           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
835                       (volume, node))
836           bad = True
837     return bad
838
839   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
840     """Verify the list of running instances.
841
842     This checks what instances are running but unknown to the cluster.
843
844     """
845     bad = False
846     for node in node_instance:
847       for runninginstance in node_instance[node]:
848         if runninginstance not in instancelist:
849           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
850                           (runninginstance, node))
851           bad = True
852     return bad
853
854   def CheckPrereq(self):
855     """Check prerequisites.
856
857     This has no prerequisites.
858
859     """
860     pass
861
862   def Exec(self, feedback_fn):
863     """Verify integrity of cluster, performing various test on nodes.
864
865     """
866     bad = False
867     feedback_fn("* Verifying global settings")
868     self.cfg.VerifyConfig()
869
870     master = self.sstore.GetMasterNode()
871     vg_name = self.cfg.GetVGName()
872     nodelist = utils.NiceSort(self.cfg.GetNodeList())
873     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
874     node_volume = {}
875     node_instance = {}
876
877     # FIXME: verify OS list
878     # do local checksums
879     file_names = list(self.sstore.GetFileList())
880     file_names.append(constants.SSL_CERT_FILE)
881     file_names.append(constants.CLUSTER_CONF_FILE)
882     local_checksums = utils.FingerprintFiles(file_names)
883
884     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
885     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
886     all_instanceinfo = rpc.call_instance_list(nodelist)
887     all_vglist = rpc.call_vg_list(nodelist)
888     node_verify_param = {
889       'filelist': file_names,
890       'nodelist': nodelist,
891       'hypervisor': None,
892       }
893     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
894     all_rversion = rpc.call_version(nodelist)
895
896     for node in nodelist:
897       feedback_fn("* Verifying node %s" % node)
898       result = self._VerifyNode(node, file_names, local_checksums,
899                                 all_vglist[node], all_nvinfo[node],
900                                 all_rversion[node], feedback_fn)
901       bad = bad or result
902
903       # node_volume
904       volumeinfo = all_volumeinfo[node]
905
906       if type(volumeinfo) != dict:
907         feedback_fn("  - ERROR: connection to %s failed" % (node,))
908         bad = True
909         continue
910
911       node_volume[node] = volumeinfo
912
913       # node_instance
914       nodeinstance = all_instanceinfo[node]
915       if type(nodeinstance) != list:
916         feedback_fn("  - ERROR: connection to %s failed" % (node,))
917         bad = True
918         continue
919
920       node_instance[node] = nodeinstance
921
922     node_vol_should = {}
923
924     for instance in instancelist:
925       feedback_fn("* Verifying instance %s" % instance)
926       result =  self._VerifyInstance(instance, node_volume, node_instance,
927                                      feedback_fn)
928       bad = bad or result
929
930       inst_config = self.cfg.GetInstanceInfo(instance)
931
932       inst_config.MapLVsByNode(node_vol_should)
933
934     feedback_fn("* Verifying orphan volumes")
935     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
936                                        feedback_fn)
937     bad = bad or result
938
939     feedback_fn("* Verifying remaining instances")
940     result = self._VerifyOrphanInstances(instancelist, node_instance,
941                                          feedback_fn)
942     bad = bad or result
943
944     return int(bad)
945
946
947 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
948   """Sleep and poll for an instance's disk to sync.
949
950   """
951   if not instance.disks:
952     return True
953
954   if not oneshot:
955     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
956
957   node = instance.primary_node
958
959   for dev in instance.disks:
960     cfgw.SetDiskID(dev, node)
961
962   retries = 0
963   while True:
964     max_time = 0
965     done = True
966     cumul_degraded = False
967     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
968     if not rstats:
969       logger.ToStderr("Can't get any data from node %s" % node)
970       retries += 1
971       if retries >= 10:
972         raise errors.RemoteError("Can't contact node %s for mirror data,"
973                                  " aborting." % node)
974       time.sleep(6)
975       continue
976     retries = 0
977     for i in range(len(rstats)):
978       mstat = rstats[i]
979       if mstat is None:
980         logger.ToStderr("Can't compute data for node %s/%s" %
981                         (node, instance.disks[i].iv_name))
982         continue
983       perc_done, est_time, is_degraded = mstat
984       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
985       if perc_done is not None:
986         done = False
987         if est_time is not None:
988           rem_time = "%d estimated seconds remaining" % est_time
989           max_time = est_time
990         else:
991           rem_time = "no time estimate"
992         logger.ToStdout("- device %s: %5.2f%% done, %s" %
993                         (instance.disks[i].iv_name, perc_done, rem_time))
994     if done or oneshot:
995       break
996
997     if unlock:
998       utils.Unlock('cmd')
999     try:
1000       time.sleep(min(60, max_time))
1001     finally:
1002       if unlock:
1003         utils.Lock('cmd')
1004
1005   if done:
1006     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1007   return not cumul_degraded
1008
1009
1010 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1011   """Check that mirrors are not degraded.
1012
1013   """
1014   cfgw.SetDiskID(dev, node)
1015
1016   result = True
1017   if on_primary or dev.AssembleOnSecondary():
1018     rstats = rpc.call_blockdev_find(node, dev)
1019     if not rstats:
1020       logger.ToStderr("Can't get any data from node %s" % node)
1021       result = False
1022     else:
1023       result = result and (not rstats[5])
1024   if dev.children:
1025     for child in dev.children:
1026       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1027
1028   return result
1029
1030
1031 class LUDiagnoseOS(NoHooksLU):
1032   """Logical unit for OS diagnose/query.
1033
1034   """
1035   _OP_REQP = []
1036
1037   def CheckPrereq(self):
1038     """Check prerequisites.
1039
1040     This always succeeds, since this is a pure query LU.
1041
1042     """
1043     return
1044
1045   def Exec(self, feedback_fn):
1046     """Compute the list of OSes.
1047
1048     """
1049     node_list = self.cfg.GetNodeList()
1050     node_data = rpc.call_os_diagnose(node_list)
1051     if node_data == False:
1052       raise errors.OpExecError("Can't gather the list of OSes")
1053     return node_data
1054
1055
1056 class LURemoveNode(LogicalUnit):
1057   """Logical unit for removing a node.
1058
1059   """
1060   HPATH = "node-remove"
1061   HTYPE = constants.HTYPE_NODE
1062   _OP_REQP = ["node_name"]
1063
1064   def BuildHooksEnv(self):
1065     """Build hooks env.
1066
1067     This doesn't run on the target node in the pre phase as a failed
1068     node would not allows itself to run.
1069
1070     """
1071     env = {
1072       "NODE_NAME": self.op.node_name,
1073       }
1074     all_nodes = self.cfg.GetNodeList()
1075     all_nodes.remove(self.op.node_name)
1076     return env, all_nodes, all_nodes
1077
1078   def CheckPrereq(self):
1079     """Check prerequisites.
1080
1081     This checks:
1082      - the node exists in the configuration
1083      - it does not have primary or secondary instances
1084      - it's not the master
1085
1086     Any errors are signalled by raising errors.OpPrereqError.
1087
1088     """
1089     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1090     if node is None:
1091       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1092
1093     instance_list = self.cfg.GetInstanceList()
1094
1095     masternode = self.sstore.GetMasterNode()
1096     if node.name == masternode:
1097       raise errors.OpPrereqError("Node is the master node,"
1098                                  " you need to failover first.")
1099
1100     for instance_name in instance_list:
1101       instance = self.cfg.GetInstanceInfo(instance_name)
1102       if node.name == instance.primary_node:
1103         raise errors.OpPrereqError("Instance %s still running on the node,"
1104                                    " please remove first." % instance_name)
1105       if node.name in instance.secondary_nodes:
1106         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1107                                    " please remove first." % instance_name)
1108     self.op.node_name = node.name
1109     self.node = node
1110
1111   def Exec(self, feedback_fn):
1112     """Removes the node from the cluster.
1113
1114     """
1115     node = self.node
1116     logger.Info("stopping the node daemon and removing configs from node %s" %
1117                 node.name)
1118
1119     rpc.call_node_leave_cluster(node.name)
1120
1121     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1122
1123     logger.Info("Removing node %s from config" % node.name)
1124
1125     self.cfg.RemoveNode(node.name)
1126
1127
1128 class LUQueryNodes(NoHooksLU):
1129   """Logical unit for querying nodes.
1130
1131   """
1132   _OP_REQP = ["output_fields", "nodes"]
1133
1134   def CheckPrereq(self):
1135     """Check prerequisites.
1136
1137     This checks that the fields required are valid output fields.
1138
1139     """
1140     self.dynamic_fields = frozenset(["dtotal", "dfree",
1141                                      "mtotal", "mnode", "mfree"])
1142
1143     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1144                                "pinst_list", "sinst_list",
1145                                "pip", "sip"],
1146                        dynamic=self.dynamic_fields,
1147                        selected=self.op.output_fields)
1148
1149     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
1150
1151   def Exec(self, feedback_fn):
1152     """Computes the list of nodes and their attributes.
1153
1154     """
1155     nodenames = self.wanted_nodes
1156     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1157
1158     # begin data gathering
1159
1160     if self.dynamic_fields.intersection(self.op.output_fields):
1161       live_data = {}
1162       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1163       for name in nodenames:
1164         nodeinfo = node_data.get(name, None)
1165         if nodeinfo:
1166           live_data[name] = {
1167             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1168             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1169             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1170             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1171             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1172             }
1173         else:
1174           live_data[name] = {}
1175     else:
1176       live_data = dict.fromkeys(nodenames, {})
1177
1178     node_to_primary = dict([(name, set()) for name in nodenames])
1179     node_to_secondary = dict([(name, set()) for name in nodenames])
1180
1181     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1182                              "sinst_cnt", "sinst_list"))
1183     if inst_fields & frozenset(self.op.output_fields):
1184       instancelist = self.cfg.GetInstanceList()
1185
1186       for instance_name in instancelist:
1187         inst = self.cfg.GetInstanceInfo(instance_name)
1188         if inst.primary_node in node_to_primary:
1189           node_to_primary[inst.primary_node].add(inst.name)
1190         for secnode in inst.secondary_nodes:
1191           if secnode in node_to_secondary:
1192             node_to_secondary[secnode].add(inst.name)
1193
1194     # end data gathering
1195
1196     output = []
1197     for node in nodelist:
1198       node_output = []
1199       for field in self.op.output_fields:
1200         if field == "name":
1201           val = node.name
1202         elif field == "pinst_list":
1203           val = list(node_to_primary[node.name])
1204         elif field == "sinst_list":
1205           val = list(node_to_secondary[node.name])
1206         elif field == "pinst_cnt":
1207           val = len(node_to_primary[node.name])
1208         elif field == "sinst_cnt":
1209           val = len(node_to_secondary[node.name])
1210         elif field == "pip":
1211           val = node.primary_ip
1212         elif field == "sip":
1213           val = node.secondary_ip
1214         elif field in self.dynamic_fields:
1215           val = live_data[node.name].get(field, None)
1216         else:
1217           raise errors.ParameterError(field)
1218         node_output.append(val)
1219       output.append(node_output)
1220
1221     return output
1222
1223
1224 class LUQueryNodeVolumes(NoHooksLU):
1225   """Logical unit for getting volumes on node(s).
1226
1227   """
1228   _OP_REQP = ["nodes", "output_fields"]
1229
1230   def CheckPrereq(self):
1231     """Check prerequisites.
1232
1233     This checks that the fields required are valid output fields.
1234
1235     """
1236     self.nodes = _GetWantedNodes(self, self.op.nodes)
1237
1238     _CheckOutputFields(static=["node"],
1239                        dynamic=["phys", "vg", "name", "size", "instance"],
1240                        selected=self.op.output_fields)
1241
1242
1243   def Exec(self, feedback_fn):
1244     """Computes the list of nodes and their attributes.
1245
1246     """
1247     nodenames = self.nodes
1248     volumes = rpc.call_node_volumes(nodenames)
1249
1250     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1251              in self.cfg.GetInstanceList()]
1252
1253     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1254
1255     output = []
1256     for node in nodenames:
1257       if node not in volumes or not volumes[node]:
1258         continue
1259
1260       node_vols = volumes[node][:]
1261       node_vols.sort(key=lambda vol: vol['dev'])
1262
1263       for vol in node_vols:
1264         node_output = []
1265         for field in self.op.output_fields:
1266           if field == "node":
1267             val = node
1268           elif field == "phys":
1269             val = vol['dev']
1270           elif field == "vg":
1271             val = vol['vg']
1272           elif field == "name":
1273             val = vol['name']
1274           elif field == "size":
1275             val = int(float(vol['size']))
1276           elif field == "instance":
1277             for inst in ilist:
1278               if node not in lv_by_node[inst]:
1279                 continue
1280               if vol['name'] in lv_by_node[inst][node]:
1281                 val = inst.name
1282                 break
1283             else:
1284               val = '-'
1285           else:
1286             raise errors.ParameterError(field)
1287           node_output.append(str(val))
1288
1289         output.append(node_output)
1290
1291     return output
1292
1293
1294 class LUAddNode(LogicalUnit):
1295   """Logical unit for adding node to the cluster.
1296
1297   """
1298   HPATH = "node-add"
1299   HTYPE = constants.HTYPE_NODE
1300   _OP_REQP = ["node_name"]
1301
1302   def BuildHooksEnv(self):
1303     """Build hooks env.
1304
1305     This will run on all nodes before, and on all nodes + the new node after.
1306
1307     """
1308     env = {
1309       "NODE_NAME": self.op.node_name,
1310       "NODE_PIP": self.op.primary_ip,
1311       "NODE_SIP": self.op.secondary_ip,
1312       }
1313     nodes_0 = self.cfg.GetNodeList()
1314     nodes_1 = nodes_0 + [self.op.node_name, ]
1315     return env, nodes_0, nodes_1
1316
1317   def CheckPrereq(self):
1318     """Check prerequisites.
1319
1320     This checks:
1321      - the new node is not already in the config
1322      - it is resolvable
1323      - its parameters (single/dual homed) matches the cluster
1324
1325     Any errors are signalled by raising errors.OpPrereqError.
1326
1327     """
1328     node_name = self.op.node_name
1329     cfg = self.cfg
1330
1331     dns_data = utils.LookupHostname(node_name)
1332     if not dns_data:
1333       raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1334
1335     node = dns_data['hostname']
1336     primary_ip = self.op.primary_ip = dns_data['ip']
1337     secondary_ip = getattr(self.op, "secondary_ip", None)
1338     if secondary_ip is None:
1339       secondary_ip = primary_ip
1340     if not utils.IsValidIP(secondary_ip):
1341       raise errors.OpPrereqError("Invalid secondary IP given")
1342     self.op.secondary_ip = secondary_ip
1343     node_list = cfg.GetNodeList()
1344     if node in node_list:
1345       raise errors.OpPrereqError("Node %s is already in the configuration"
1346                                  % node)
1347
1348     for existing_node_name in node_list:
1349       existing_node = cfg.GetNodeInfo(existing_node_name)
1350       if (existing_node.primary_ip == primary_ip or
1351           existing_node.secondary_ip == primary_ip or
1352           existing_node.primary_ip == secondary_ip or
1353           existing_node.secondary_ip == secondary_ip):
1354         raise errors.OpPrereqError("New node ip address(es) conflict with"
1355                                    " existing node %s" % existing_node.name)
1356
1357     # check that the type of the node (single versus dual homed) is the
1358     # same as for the master
1359     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1360     master_singlehomed = myself.secondary_ip == myself.primary_ip
1361     newbie_singlehomed = secondary_ip == primary_ip
1362     if master_singlehomed != newbie_singlehomed:
1363       if master_singlehomed:
1364         raise errors.OpPrereqError("The master has no private ip but the"
1365                                    " new node has one")
1366       else:
1367         raise errors.OpPrereqError("The master has a private ip but the"
1368                                    " new node doesn't have one")
1369
1370     # checks reachablity
1371     command = ["fping", "-q", primary_ip]
1372     result = utils.RunCmd(command)
1373     if result.failed:
1374       raise errors.OpPrereqError("Node not reachable by ping")
1375
1376     if not newbie_singlehomed:
1377       # check reachability from my secondary ip to newbie's secondary ip
1378       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1379       result = utils.RunCmd(command)
1380       if result.failed:
1381         raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1382
1383     self.new_node = objects.Node(name=node,
1384                                  primary_ip=primary_ip,
1385                                  secondary_ip=secondary_ip)
1386
1387   def Exec(self, feedback_fn):
1388     """Adds the new node to the cluster.
1389
1390     """
1391     new_node = self.new_node
1392     node = new_node.name
1393
1394     # set up inter-node password and certificate and restarts the node daemon
1395     gntpass = self.sstore.GetNodeDaemonPassword()
1396     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1397       raise errors.OpExecError("ganeti password corruption detected")
1398     f = open(constants.SSL_CERT_FILE)
1399     try:
1400       gntpem = f.read(8192)
1401     finally:
1402       f.close()
1403     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1404     # so we use this to detect an invalid certificate; as long as the
1405     # cert doesn't contain this, the here-document will be correctly
1406     # parsed by the shell sequence below
1407     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1408       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1409     if not gntpem.endswith("\n"):
1410       raise errors.OpExecError("PEM must end with newline")
1411     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1412
1413     # and then connect with ssh to set password and start ganeti-noded
1414     # note that all the below variables are sanitized at this point,
1415     # either by being constants or by the checks above
1416     ss = self.sstore
1417     mycommand = ("umask 077 && "
1418                  "echo '%s' > '%s' && "
1419                  "cat > '%s' << '!EOF.' && \n"
1420                  "%s!EOF.\n%s restart" %
1421                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1422                   constants.SSL_CERT_FILE, gntpem,
1423                   constants.NODE_INITD_SCRIPT))
1424
1425     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1426     if result.failed:
1427       raise errors.OpExecError("Remote command on node %s, error: %s,"
1428                                " output: %s" %
1429                                (node, result.fail_reason, result.output))
1430
1431     # check connectivity
1432     time.sleep(4)
1433
1434     result = rpc.call_version([node])[node]
1435     if result:
1436       if constants.PROTOCOL_VERSION == result:
1437         logger.Info("communication to node %s fine, sw version %s match" %
1438                     (node, result))
1439       else:
1440         raise errors.OpExecError("Version mismatch master version %s,"
1441                                  " node version %s" %
1442                                  (constants.PROTOCOL_VERSION, result))
1443     else:
1444       raise errors.OpExecError("Cannot get version from the new node")
1445
1446     # setup ssh on node
1447     logger.Info("copy ssh key to node %s" % node)
1448     keyarray = []
1449     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1450                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1451                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1452
1453     for i in keyfiles:
1454       f = open(i, 'r')
1455       try:
1456         keyarray.append(f.read())
1457       finally:
1458         f.close()
1459
1460     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1461                                keyarray[3], keyarray[4], keyarray[5])
1462
1463     if not result:
1464       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1465
1466     # Add node to our /etc/hosts, and add key to known_hosts
1467     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1468     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1469                       self.cfg.GetHostKey())
1470
1471     if new_node.secondary_ip != new_node.primary_ip:
1472       result = ssh.SSHCall(node, "root",
1473                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1474       if result.failed:
1475         raise errors.OpExecError("Node claims it doesn't have the"
1476                                  " secondary ip you gave (%s).\n"
1477                                  "Please fix and re-run this command." %
1478                                  new_node.secondary_ip)
1479
1480     success, msg = ssh.VerifyNodeHostname(node)
1481     if not success:
1482       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1483                                " than the one the resolver gives: %s.\n"
1484                                "Please fix and re-run this command." %
1485                                (node, msg))
1486
1487     # Distribute updated /etc/hosts and known_hosts to all nodes,
1488     # including the node just added
1489     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1490     dist_nodes = self.cfg.GetNodeList() + [node]
1491     if myself.name in dist_nodes:
1492       dist_nodes.remove(myself.name)
1493
1494     logger.Debug("Copying hosts and known_hosts to all nodes")
1495     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1496       result = rpc.call_upload_file(dist_nodes, fname)
1497       for to_node in dist_nodes:
1498         if not result[to_node]:
1499           logger.Error("copy of file %s to node %s failed" %
1500                        (fname, to_node))
1501
1502     to_copy = ss.GetFileList()
1503     for fname in to_copy:
1504       if not ssh.CopyFileToNode(node, fname):
1505         logger.Error("could not copy file %s to node %s" % (fname, node))
1506
1507     logger.Info("adding node %s to cluster.conf" % node)
1508     self.cfg.AddNode(new_node)
1509
1510
1511 class LUMasterFailover(LogicalUnit):
1512   """Failover the master node to the current node.
1513
1514   This is a special LU in that it must run on a non-master node.
1515
1516   """
1517   HPATH = "master-failover"
1518   HTYPE = constants.HTYPE_CLUSTER
1519   REQ_MASTER = False
1520   _OP_REQP = []
1521
1522   def BuildHooksEnv(self):
1523     """Build hooks env.
1524
1525     This will run on the new master only in the pre phase, and on all
1526     the nodes in the post phase.
1527
1528     """
1529     env = {
1530       "NEW_MASTER": self.new_master,
1531       "OLD_MASTER": self.old_master,
1532       }
1533     return env, [self.new_master], self.cfg.GetNodeList()
1534
1535   def CheckPrereq(self):
1536     """Check prerequisites.
1537
1538     This checks that we are not already the master.
1539
1540     """
1541     self.new_master = socket.gethostname()
1542
1543     self.old_master = self.sstore.GetMasterNode()
1544
1545     if self.old_master == self.new_master:
1546       raise errors.OpPrereqError("This commands must be run on the node"
1547                                  " where you want the new master to be.\n"
1548                                  "%s is already the master" %
1549                                  self.old_master)
1550
1551   def Exec(self, feedback_fn):
1552     """Failover the master node.
1553
1554     This command, when run on a non-master node, will cause the current
1555     master to cease being master, and the non-master to become new
1556     master.
1557
1558     """
1559     #TODO: do not rely on gethostname returning the FQDN
1560     logger.Info("setting master to %s, old master: %s" %
1561                 (self.new_master, self.old_master))
1562
1563     if not rpc.call_node_stop_master(self.old_master):
1564       logger.Error("could disable the master role on the old master"
1565                    " %s, please disable manually" % self.old_master)
1566
1567     ss = self.sstore
1568     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1569     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1570                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1571       logger.Error("could not distribute the new simple store master file"
1572                    " to the other nodes, please check.")
1573
1574     if not rpc.call_node_start_master(self.new_master):
1575       logger.Error("could not start the master role on the new master"
1576                    " %s, please check" % self.new_master)
1577       feedback_fn("Error in activating the master IP on the new master,\n"
1578                   "please fix manually.")
1579
1580
1581
1582 class LUQueryClusterInfo(NoHooksLU):
1583   """Query cluster configuration.
1584
1585   """
1586   _OP_REQP = []
1587   REQ_MASTER = False
1588
1589   def CheckPrereq(self):
1590     """No prerequsites needed for this LU.
1591
1592     """
1593     pass
1594
1595   def Exec(self, feedback_fn):
1596     """Return cluster config.
1597
1598     """
1599     result = {
1600       "name": self.sstore.GetClusterName(),
1601       "software_version": constants.RELEASE_VERSION,
1602       "protocol_version": constants.PROTOCOL_VERSION,
1603       "config_version": constants.CONFIG_VERSION,
1604       "os_api_version": constants.OS_API_VERSION,
1605       "export_version": constants.EXPORT_VERSION,
1606       "master": self.sstore.GetMasterNode(),
1607       "architecture": (platform.architecture()[0], platform.machine()),
1608       }
1609
1610     return result
1611
1612
1613 class LUClusterCopyFile(NoHooksLU):
1614   """Copy file to cluster.
1615
1616   """
1617   _OP_REQP = ["nodes", "filename"]
1618
1619   def CheckPrereq(self):
1620     """Check prerequisites.
1621
1622     It should check that the named file exists and that the given list
1623     of nodes is valid.
1624
1625     """
1626     if not os.path.exists(self.op.filename):
1627       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1628
1629     self.nodes = _GetWantedNodes(self, self.op.nodes)
1630
1631   def Exec(self, feedback_fn):
1632     """Copy a file from master to some nodes.
1633
1634     Args:
1635       opts - class with options as members
1636       args - list containing a single element, the file name
1637     Opts used:
1638       nodes - list containing the name of target nodes; if empty, all nodes
1639
1640     """
1641     filename = self.op.filename
1642
1643     myname = socket.gethostname()
1644
1645     for node in self.nodes:
1646       if node == myname:
1647         continue
1648       if not ssh.CopyFileToNode(node, filename):
1649         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1650
1651
1652 class LUDumpClusterConfig(NoHooksLU):
1653   """Return a text-representation of the cluster-config.
1654
1655   """
1656   _OP_REQP = []
1657
1658   def CheckPrereq(self):
1659     """No prerequisites.
1660
1661     """
1662     pass
1663
1664   def Exec(self, feedback_fn):
1665     """Dump a representation of the cluster config to the standard output.
1666
1667     """
1668     return self.cfg.DumpConfig()
1669
1670
1671 class LURunClusterCommand(NoHooksLU):
1672   """Run a command on some nodes.
1673
1674   """
1675   _OP_REQP = ["command", "nodes"]
1676
1677   def CheckPrereq(self):
1678     """Check prerequisites.
1679
1680     It checks that the given list of nodes is valid.
1681
1682     """
1683     self.nodes = _GetWantedNodes(self, self.op.nodes)
1684
1685   def Exec(self, feedback_fn):
1686     """Run a command on some nodes.
1687
1688     """
1689     data = []
1690     for node in self.nodes:
1691       result = ssh.SSHCall(node, "root", self.op.command)
1692       data.append((node, result.output, result.exit_code))
1693
1694     return data
1695
1696
1697 class LUActivateInstanceDisks(NoHooksLU):
1698   """Bring up an instance's disks.
1699
1700   """
1701   _OP_REQP = ["instance_name"]
1702
1703   def CheckPrereq(self):
1704     """Check prerequisites.
1705
1706     This checks that the instance is in the cluster.
1707
1708     """
1709     instance = self.cfg.GetInstanceInfo(
1710       self.cfg.ExpandInstanceName(self.op.instance_name))
1711     if instance is None:
1712       raise errors.OpPrereqError("Instance '%s' not known" %
1713                                  self.op.instance_name)
1714     self.instance = instance
1715
1716
1717   def Exec(self, feedback_fn):
1718     """Activate the disks.
1719
1720     """
1721     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1722     if not disks_ok:
1723       raise errors.OpExecError("Cannot activate block devices")
1724
1725     return disks_info
1726
1727
1728 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1729   """Prepare the block devices for an instance.
1730
1731   This sets up the block devices on all nodes.
1732
1733   Args:
1734     instance: a ganeti.objects.Instance object
1735     ignore_secondaries: if true, errors on secondary nodes won't result
1736                         in an error return from the function
1737
1738   Returns:
1739     false if the operation failed
1740     list of (host, instance_visible_name, node_visible_name) if the operation
1741          suceeded with the mapping from node devices to instance devices
1742   """
1743   device_info = []
1744   disks_ok = True
1745   for inst_disk in instance.disks:
1746     master_result = None
1747     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1748       cfg.SetDiskID(node_disk, node)
1749       is_primary = node == instance.primary_node
1750       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1751       if not result:
1752         logger.Error("could not prepare block device %s on node %s (is_pri"
1753                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1754         if is_primary or not ignore_secondaries:
1755           disks_ok = False
1756       if is_primary:
1757         master_result = result
1758     device_info.append((instance.primary_node, inst_disk.iv_name,
1759                         master_result))
1760
1761   return disks_ok, device_info
1762
1763
1764 def _StartInstanceDisks(cfg, instance, force):
1765   """Start the disks of an instance.
1766
1767   """
1768   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1769                                            ignore_secondaries=force)
1770   if not disks_ok:
1771     _ShutdownInstanceDisks(instance, cfg)
1772     if force is not None and not force:
1773       logger.Error("If the message above refers to a secondary node,"
1774                    " you can retry the operation using '--force'.")
1775     raise errors.OpExecError("Disk consistency error")
1776
1777
1778 class LUDeactivateInstanceDisks(NoHooksLU):
1779   """Shutdown an instance's disks.
1780
1781   """
1782   _OP_REQP = ["instance_name"]
1783
1784   def CheckPrereq(self):
1785     """Check prerequisites.
1786
1787     This checks that the instance is in the cluster.
1788
1789     """
1790     instance = self.cfg.GetInstanceInfo(
1791       self.cfg.ExpandInstanceName(self.op.instance_name))
1792     if instance is None:
1793       raise errors.OpPrereqError("Instance '%s' not known" %
1794                                  self.op.instance_name)
1795     self.instance = instance
1796
1797   def Exec(self, feedback_fn):
1798     """Deactivate the disks
1799
1800     """
1801     instance = self.instance
1802     ins_l = rpc.call_instance_list([instance.primary_node])
1803     ins_l = ins_l[instance.primary_node]
1804     if not type(ins_l) is list:
1805       raise errors.OpExecError("Can't contact node '%s'" %
1806                                instance.primary_node)
1807
1808     if self.instance.name in ins_l:
1809       raise errors.OpExecError("Instance is running, can't shutdown"
1810                                " block devices.")
1811
1812     _ShutdownInstanceDisks(instance, self.cfg)
1813
1814
1815 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1816   """Shutdown block devices of an instance.
1817
1818   This does the shutdown on all nodes of the instance.
1819
1820   If the ignore_primary is false, errors on the primary node are
1821   ignored.
1822
1823   """
1824   result = True
1825   for disk in instance.disks:
1826     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1827       cfg.SetDiskID(top_disk, node)
1828       if not rpc.call_blockdev_shutdown(node, top_disk):
1829         logger.Error("could not shutdown block device %s on node %s" %
1830                      (disk.iv_name, node))
1831         if not ignore_primary or node != instance.primary_node:
1832           result = False
1833   return result
1834
1835
1836 class LUStartupInstance(LogicalUnit):
1837   """Starts an instance.
1838
1839   """
1840   HPATH = "instance-start"
1841   HTYPE = constants.HTYPE_INSTANCE
1842   _OP_REQP = ["instance_name", "force"]
1843
1844   def BuildHooksEnv(self):
1845     """Build hooks env.
1846
1847     This runs on master, primary and secondary nodes of the instance.
1848
1849     """
1850     env = {
1851       "FORCE": self.op.force,
1852       }
1853     env.update(_BuildInstanceHookEnvByObject(self.instance))
1854     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1855           list(self.instance.secondary_nodes))
1856     return env, nl, nl
1857
1858   def CheckPrereq(self):
1859     """Check prerequisites.
1860
1861     This checks that the instance is in the cluster.
1862
1863     """
1864     instance = self.cfg.GetInstanceInfo(
1865       self.cfg.ExpandInstanceName(self.op.instance_name))
1866     if instance is None:
1867       raise errors.OpPrereqError("Instance '%s' not known" %
1868                                  self.op.instance_name)
1869
1870     # check bridges existance
1871     brlist = [nic.bridge for nic in instance.nics]
1872     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1873       raise errors.OpPrereqError("one or more target bridges %s does not"
1874                                  " exist on destination node '%s'" %
1875                                  (brlist, instance.primary_node))
1876
1877     self.instance = instance
1878     self.op.instance_name = instance.name
1879
1880   def Exec(self, feedback_fn):
1881     """Start the instance.
1882
1883     """
1884     instance = self.instance
1885     force = self.op.force
1886     extra_args = getattr(self.op, "extra_args", "")
1887
1888     node_current = instance.primary_node
1889
1890     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1891     if not nodeinfo:
1892       raise errors.OpExecError("Could not contact node %s for infos" %
1893                                (node_current))
1894
1895     freememory = nodeinfo[node_current]['memory_free']
1896     memory = instance.memory
1897     if memory > freememory:
1898       raise errors.OpExecError("Not enough memory to start instance"
1899                                " %s on node %s"
1900                                " needed %s MiB, available %s MiB" %
1901                                (instance.name, node_current, memory,
1902                                 freememory))
1903
1904     _StartInstanceDisks(self.cfg, instance, force)
1905
1906     if not rpc.call_instance_start(node_current, instance, extra_args):
1907       _ShutdownInstanceDisks(instance, self.cfg)
1908       raise errors.OpExecError("Could not start instance")
1909
1910     self.cfg.MarkInstanceUp(instance.name)
1911
1912
1913 class LUShutdownInstance(LogicalUnit):
1914   """Shutdown an instance.
1915
1916   """
1917   HPATH = "instance-stop"
1918   HTYPE = constants.HTYPE_INSTANCE
1919   _OP_REQP = ["instance_name"]
1920
1921   def BuildHooksEnv(self):
1922     """Build hooks env.
1923
1924     This runs on master, primary and secondary nodes of the instance.
1925
1926     """
1927     env = _BuildInstanceHookEnvByObject(self.instance)
1928     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1929           list(self.instance.secondary_nodes))
1930     return env, nl, nl
1931
1932   def CheckPrereq(self):
1933     """Check prerequisites.
1934
1935     This checks that the instance is in the cluster.
1936
1937     """
1938     instance = self.cfg.GetInstanceInfo(
1939       self.cfg.ExpandInstanceName(self.op.instance_name))
1940     if instance is None:
1941       raise errors.OpPrereqError("Instance '%s' not known" %
1942                                  self.op.instance_name)
1943     self.instance = instance
1944
1945   def Exec(self, feedback_fn):
1946     """Shutdown the instance.
1947
1948     """
1949     instance = self.instance
1950     node_current = instance.primary_node
1951     if not rpc.call_instance_shutdown(node_current, instance):
1952       logger.Error("could not shutdown instance")
1953
1954     self.cfg.MarkInstanceDown(instance.name)
1955     _ShutdownInstanceDisks(instance, self.cfg)
1956
1957
1958 class LUReinstallInstance(LogicalUnit):
1959   """Reinstall an instance.
1960
1961   """
1962   HPATH = "instance-reinstall"
1963   HTYPE = constants.HTYPE_INSTANCE
1964   _OP_REQP = ["instance_name"]
1965
1966   def BuildHooksEnv(self):
1967     """Build hooks env.
1968
1969     This runs on master, primary and secondary nodes of the instance.
1970
1971     """
1972     env = _BuildInstanceHookEnvByObject(self.instance)
1973     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1974           list(self.instance.secondary_nodes))
1975     return env, nl, nl
1976
1977   def CheckPrereq(self):
1978     """Check prerequisites.
1979
1980     This checks that the instance is in the cluster and is not running.
1981
1982     """
1983     instance = self.cfg.GetInstanceInfo(
1984       self.cfg.ExpandInstanceName(self.op.instance_name))
1985     if instance is None:
1986       raise errors.OpPrereqError("Instance '%s' not known" %
1987                                  self.op.instance_name)
1988     if instance.disk_template == constants.DT_DISKLESS:
1989       raise errors.OpPrereqError("Instance '%s' has no disks" %
1990                                  self.op.instance_name)
1991     if instance.status != "down":
1992       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1993                                  self.op.instance_name)
1994     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1995     if remote_info:
1996       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1997                                  (self.op.instance_name,
1998                                   instance.primary_node))
1999
2000     self.op.os_type = getattr(self.op, "os_type", None)
2001     if self.op.os_type is not None:
2002       # OS verification
2003       pnode = self.cfg.GetNodeInfo(
2004         self.cfg.ExpandNodeName(instance.primary_node))
2005       if pnode is None:
2006         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2007                                    self.op.pnode)
2008       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2009       if not isinstance(os_obj, objects.OS):
2010         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2011                                    " primary node"  % self.op.os_type)
2012
2013     self.instance = instance
2014
2015   def Exec(self, feedback_fn):
2016     """Reinstall the instance.
2017
2018     """
2019     inst = self.instance
2020
2021     if self.op.os_type is not None:
2022       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2023       inst.os = self.op.os_type
2024       self.cfg.AddInstance(inst)
2025
2026     _StartInstanceDisks(self.cfg, inst, None)
2027     try:
2028       feedback_fn("Running the instance OS create scripts...")
2029       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2030         raise errors.OpExecError("Could not install OS for instance %s "
2031                                  "on node %s" %
2032                                  (inst.name, inst.primary_node))
2033     finally:
2034       _ShutdownInstanceDisks(inst, self.cfg)
2035
2036
2037 class LURemoveInstance(LogicalUnit):
2038   """Remove an instance.
2039
2040   """
2041   HPATH = "instance-remove"
2042   HTYPE = constants.HTYPE_INSTANCE
2043   _OP_REQP = ["instance_name"]
2044
2045   def BuildHooksEnv(self):
2046     """Build hooks env.
2047
2048     This runs on master, primary and secondary nodes of the instance.
2049
2050     """
2051     env = _BuildInstanceHookEnvByObject(self.instance)
2052     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2053           list(self.instance.secondary_nodes))
2054     return env, nl, nl
2055
2056   def CheckPrereq(self):
2057     """Check prerequisites.
2058
2059     This checks that the instance is in the cluster.
2060
2061     """
2062     instance = self.cfg.GetInstanceInfo(
2063       self.cfg.ExpandInstanceName(self.op.instance_name))
2064     if instance is None:
2065       raise errors.OpPrereqError("Instance '%s' not known" %
2066                                  self.op.instance_name)
2067     self.instance = instance
2068
2069   def Exec(self, feedback_fn):
2070     """Remove the instance.
2071
2072     """
2073     instance = self.instance
2074     logger.Info("shutting down instance %s on node %s" %
2075                 (instance.name, instance.primary_node))
2076
2077     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2078       raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2079                                (instance.name, instance.primary_node))
2080
2081     logger.Info("removing block devices for instance %s" % instance.name)
2082
2083     _RemoveDisks(instance, self.cfg)
2084
2085     logger.Info("removing instance %s out of cluster config" % instance.name)
2086
2087     self.cfg.RemoveInstance(instance.name)
2088
2089
2090 class LUQueryInstances(NoHooksLU):
2091   """Logical unit for querying instances.
2092
2093   """
2094   _OP_REQP = ["output_fields"]
2095
2096   def CheckPrereq(self):
2097     """Check prerequisites.
2098
2099     This checks that the fields required are valid output fields.
2100
2101     """
2102     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2103     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2104                                "admin_state", "admin_ram",
2105                                "disk_template", "ip", "mac", "bridge",
2106                                "sda_size", "sdb_size"],
2107                        dynamic=self.dynamic_fields,
2108                        selected=self.op.output_fields)
2109
2110   def Exec(self, feedback_fn):
2111     """Computes the list of nodes and their attributes.
2112
2113     """
2114     instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2115     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2116                      in instance_names]
2117
2118     # begin data gathering
2119
2120     nodes = frozenset([inst.primary_node for inst in instance_list])
2121
2122     bad_nodes = []
2123     if self.dynamic_fields.intersection(self.op.output_fields):
2124       live_data = {}
2125       node_data = rpc.call_all_instances_info(nodes)
2126       for name in nodes:
2127         result = node_data[name]
2128         if result:
2129           live_data.update(result)
2130         elif result == False:
2131           bad_nodes.append(name)
2132         # else no instance is alive
2133     else:
2134       live_data = dict([(name, {}) for name in instance_names])
2135
2136     # end data gathering
2137
2138     output = []
2139     for instance in instance_list:
2140       iout = []
2141       for field in self.op.output_fields:
2142         if field == "name":
2143           val = instance.name
2144         elif field == "os":
2145           val = instance.os
2146         elif field == "pnode":
2147           val = instance.primary_node
2148         elif field == "snodes":
2149           val = list(instance.secondary_nodes)
2150         elif field == "admin_state":
2151           val = (instance.status != "down")
2152         elif field == "oper_state":
2153           if instance.primary_node in bad_nodes:
2154             val = None
2155           else:
2156             val = bool(live_data.get(instance.name))
2157         elif field == "admin_ram":
2158           val = instance.memory
2159         elif field == "oper_ram":
2160           if instance.primary_node in bad_nodes:
2161             val = None
2162           elif instance.name in live_data:
2163             val = live_data[instance.name].get("memory", "?")
2164           else:
2165             val = "-"
2166         elif field == "disk_template":
2167           val = instance.disk_template
2168         elif field == "ip":
2169           val = instance.nics[0].ip
2170         elif field == "bridge":
2171           val = instance.nics[0].bridge
2172         elif field == "mac":
2173           val = instance.nics[0].mac
2174         elif field == "sda_size" or field == "sdb_size":
2175           disk = instance.FindDisk(field[:3])
2176           if disk is None:
2177             val = None
2178           else:
2179             val = disk.size
2180         else:
2181           raise errors.ParameterError(field)
2182         iout.append(val)
2183       output.append(iout)
2184
2185     return output
2186
2187
2188 class LUFailoverInstance(LogicalUnit):
2189   """Failover an instance.
2190
2191   """
2192   HPATH = "instance-failover"
2193   HTYPE = constants.HTYPE_INSTANCE
2194   _OP_REQP = ["instance_name", "ignore_consistency"]
2195
2196   def BuildHooksEnv(self):
2197     """Build hooks env.
2198
2199     This runs on master, primary and secondary nodes of the instance.
2200
2201     """
2202     env = {
2203       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2204       }
2205     env.update(_BuildInstanceHookEnvByObject(self.instance))
2206     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2207     return env, nl, nl
2208
2209   def CheckPrereq(self):
2210     """Check prerequisites.
2211
2212     This checks that the instance is in the cluster.
2213
2214     """
2215     instance = self.cfg.GetInstanceInfo(
2216       self.cfg.ExpandInstanceName(self.op.instance_name))
2217     if instance is None:
2218       raise errors.OpPrereqError("Instance '%s' not known" %
2219                                  self.op.instance_name)
2220
2221     if instance.disk_template != constants.DT_REMOTE_RAID1:
2222       raise errors.OpPrereqError("Instance's disk layout is not"
2223                                  " remote_raid1.")
2224
2225     secondary_nodes = instance.secondary_nodes
2226     if not secondary_nodes:
2227       raise errors.ProgrammerError("no secondary node but using "
2228                                    "DT_REMOTE_RAID1 template")
2229
2230     # check memory requirements on the secondary node
2231     target_node = secondary_nodes[0]
2232     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2233     info = nodeinfo.get(target_node, None)
2234     if not info:
2235       raise errors.OpPrereqError("Cannot get current information"
2236                                  " from node '%s'" % nodeinfo)
2237     if instance.memory > info['memory_free']:
2238       raise errors.OpPrereqError("Not enough memory on target node %s."
2239                                  " %d MB available, %d MB required" %
2240                                  (target_node, info['memory_free'],
2241                                   instance.memory))
2242
2243     # check bridge existance
2244     brlist = [nic.bridge for nic in instance.nics]
2245     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2246       raise errors.OpPrereqError("One or more target bridges %s does not"
2247                                  " exist on destination node '%s'" %
2248                                  (brlist, instance.primary_node))
2249
2250     self.instance = instance
2251
2252   def Exec(self, feedback_fn):
2253     """Failover an instance.
2254
2255     The failover is done by shutting it down on its present node and
2256     starting it on the secondary.
2257
2258     """
2259     instance = self.instance
2260
2261     source_node = instance.primary_node
2262     target_node = instance.secondary_nodes[0]
2263
2264     feedback_fn("* checking disk consistency between source and target")
2265     for dev in instance.disks:
2266       # for remote_raid1, these are md over drbd
2267       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2268         if not self.op.ignore_consistency:
2269           raise errors.OpExecError("Disk %s is degraded on target node,"
2270                                    " aborting failover." % dev.iv_name)
2271
2272     feedback_fn("* checking target node resource availability")
2273     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2274
2275     if not nodeinfo:
2276       raise errors.OpExecError("Could not contact target node %s." %
2277                                target_node)
2278
2279     free_memory = int(nodeinfo[target_node]['memory_free'])
2280     memory = instance.memory
2281     if memory > free_memory:
2282       raise errors.OpExecError("Not enough memory to create instance %s on"
2283                                " node %s. needed %s MiB, available %s MiB" %
2284                                (instance.name, target_node, memory,
2285                                 free_memory))
2286
2287     feedback_fn("* shutting down instance on source node")
2288     logger.Info("Shutting down instance %s on node %s" %
2289                 (instance.name, source_node))
2290
2291     if not rpc.call_instance_shutdown(source_node, instance):
2292       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2293                    " anyway. Please make sure node %s is down"  %
2294                    (instance.name, source_node, source_node))
2295
2296     feedback_fn("* deactivating the instance's disks on source node")
2297     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2298       raise errors.OpExecError("Can't shut down the instance's disks.")
2299
2300     instance.primary_node = target_node
2301     # distribute new instance config to the other nodes
2302     self.cfg.AddInstance(instance)
2303
2304     feedback_fn("* activating the instance's disks on target node")
2305     logger.Info("Starting instance %s on node %s" %
2306                 (instance.name, target_node))
2307
2308     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2309                                              ignore_secondaries=True)
2310     if not disks_ok:
2311       _ShutdownInstanceDisks(instance, self.cfg)
2312       raise errors.OpExecError("Can't activate the instance's disks")
2313
2314     feedback_fn("* starting the instance on the target node")
2315     if not rpc.call_instance_start(target_node, instance, None):
2316       _ShutdownInstanceDisks(instance, self.cfg)
2317       raise errors.OpExecError("Could not start instance %s on node %s." %
2318                                (instance.name, target_node))
2319
2320
2321 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2322   """Create a tree of block devices on the primary node.
2323
2324   This always creates all devices.
2325
2326   """
2327   if device.children:
2328     for child in device.children:
2329       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2330         return False
2331
2332   cfg.SetDiskID(device, node)
2333   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2334   if not new_id:
2335     return False
2336   if device.physical_id is None:
2337     device.physical_id = new_id
2338   return True
2339
2340
2341 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2342   """Create a tree of block devices on a secondary node.
2343
2344   If this device type has to be created on secondaries, create it and
2345   all its children.
2346
2347   If not, just recurse to children keeping the same 'force' value.
2348
2349   """
2350   if device.CreateOnSecondary():
2351     force = True
2352   if device.children:
2353     for child in device.children:
2354       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2355         return False
2356
2357   if not force:
2358     return True
2359   cfg.SetDiskID(device, node)
2360   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2361   if not new_id:
2362     return False
2363   if device.physical_id is None:
2364     device.physical_id = new_id
2365   return True
2366
2367
2368 def _GenerateUniqueNames(cfg, exts):
2369   """Generate a suitable LV name.
2370
2371   This will generate a logical volume name for the given instance.
2372
2373   """
2374   results = []
2375   for val in exts:
2376     new_id = cfg.GenerateUniqueID()
2377     results.append("%s%s" % (new_id, val))
2378   return results
2379
2380
2381 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2382   """Generate a drbd device complete with its children.
2383
2384   """
2385   port = cfg.AllocatePort()
2386   vgname = cfg.GetVGName()
2387   dev_data = objects.Disk(dev_type="lvm", size=size,
2388                           logical_id=(vgname, names[0]))
2389   dev_meta = objects.Disk(dev_type="lvm", size=128,
2390                           logical_id=(vgname, names[1]))
2391   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2392                           logical_id = (primary, secondary, port),
2393                           children = [dev_data, dev_meta])
2394   return drbd_dev
2395
2396
2397 def _GenerateDiskTemplate(cfg, template_name,
2398                           instance_name, primary_node,
2399                           secondary_nodes, disk_sz, swap_sz):
2400   """Generate the entire disk layout for a given template type.
2401
2402   """
2403   #TODO: compute space requirements
2404
2405   vgname = cfg.GetVGName()
2406   if template_name == "diskless":
2407     disks = []
2408   elif template_name == "plain":
2409     if len(secondary_nodes) != 0:
2410       raise errors.ProgrammerError("Wrong template configuration")
2411
2412     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2413     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2414                            logical_id=(vgname, names[0]),
2415                            iv_name = "sda")
2416     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2417                            logical_id=(vgname, names[1]),
2418                            iv_name = "sdb")
2419     disks = [sda_dev, sdb_dev]
2420   elif template_name == "local_raid1":
2421     if len(secondary_nodes) != 0:
2422       raise errors.ProgrammerError("Wrong template configuration")
2423
2424
2425     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2426                                        ".sdb_m1", ".sdb_m2"])
2427     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2428                               logical_id=(vgname, names[0]))
2429     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2430                               logical_id=(vgname, names[1]))
2431     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2432                               size=disk_sz,
2433                               children = [sda_dev_m1, sda_dev_m2])
2434     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2435                               logical_id=(vgname, names[2]))
2436     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2437                               logical_id=(vgname, names[3]))
2438     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2439                               size=swap_sz,
2440                               children = [sdb_dev_m1, sdb_dev_m2])
2441     disks = [md_sda_dev, md_sdb_dev]
2442   elif template_name == constants.DT_REMOTE_RAID1:
2443     if len(secondary_nodes) != 1:
2444       raise errors.ProgrammerError("Wrong template configuration")
2445     remote_node = secondary_nodes[0]
2446     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2447                                        ".sdb_data", ".sdb_meta"])
2448     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2449                                          disk_sz, names[0:2])
2450     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2451                               children = [drbd_sda_dev], size=disk_sz)
2452     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2453                                          swap_sz, names[2:4])
2454     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2455                               children = [drbd_sdb_dev], size=swap_sz)
2456     disks = [md_sda_dev, md_sdb_dev]
2457   else:
2458     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2459   return disks
2460
2461
2462 def _GetInstanceInfoText(instance):
2463   """Compute that text that should be added to the disk's metadata.
2464
2465   """
2466   return "originstname+%s" % instance.name
2467
2468
2469 def _CreateDisks(cfg, instance):
2470   """Create all disks for an instance.
2471
2472   This abstracts away some work from AddInstance.
2473
2474   Args:
2475     instance: the instance object
2476
2477   Returns:
2478     True or False showing the success of the creation process
2479
2480   """
2481   info = _GetInstanceInfoText(instance)
2482
2483   for device in instance.disks:
2484     logger.Info("creating volume %s for instance %s" %
2485               (device.iv_name, instance.name))
2486     #HARDCODE
2487     for secondary_node in instance.secondary_nodes:
2488       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2489                                         info):
2490         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2491                      (device.iv_name, device, secondary_node))
2492         return False
2493     #HARDCODE
2494     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2495       logger.Error("failed to create volume %s on primary!" %
2496                    device.iv_name)
2497       return False
2498   return True
2499
2500
2501 def _RemoveDisks(instance, cfg):
2502   """Remove all disks for an instance.
2503
2504   This abstracts away some work from `AddInstance()` and
2505   `RemoveInstance()`. Note that in case some of the devices couldn't
2506   be remove, the removal will continue with the other ones (compare
2507   with `_CreateDisks()`).
2508
2509   Args:
2510     instance: the instance object
2511
2512   Returns:
2513     True or False showing the success of the removal proces
2514
2515   """
2516   logger.Info("removing block devices for instance %s" % instance.name)
2517
2518   result = True
2519   for device in instance.disks:
2520     for node, disk in device.ComputeNodeTree(instance.primary_node):
2521       cfg.SetDiskID(disk, node)
2522       if not rpc.call_blockdev_remove(node, disk):
2523         logger.Error("could not remove block device %s on node %s,"
2524                      " continuing anyway" %
2525                      (device.iv_name, node))
2526         result = False
2527   return result
2528
2529
2530 class LUCreateInstance(LogicalUnit):
2531   """Create an instance.
2532
2533   """
2534   HPATH = "instance-add"
2535   HTYPE = constants.HTYPE_INSTANCE
2536   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2537               "disk_template", "swap_size", "mode", "start", "vcpus",
2538               "wait_for_sync"]
2539
2540   def BuildHooksEnv(self):
2541     """Build hooks env.
2542
2543     This runs on master, primary and secondary nodes of the instance.
2544
2545     """
2546     env = {
2547       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2548       "INSTANCE_DISK_SIZE": self.op.disk_size,
2549       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2550       "INSTANCE_ADD_MODE": self.op.mode,
2551       }
2552     if self.op.mode == constants.INSTANCE_IMPORT:
2553       env["INSTANCE_SRC_NODE"] = self.op.src_node
2554       env["INSTANCE_SRC_PATH"] = self.op.src_path
2555       env["INSTANCE_SRC_IMAGE"] = self.src_image
2556
2557     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2558       primary_node=self.op.pnode,
2559       secondary_nodes=self.secondaries,
2560       status=self.instance_status,
2561       os_type=self.op.os_type,
2562       memory=self.op.mem_size,
2563       vcpus=self.op.vcpus,
2564       nics=[(self.inst_ip, self.op.bridge)],
2565     ))
2566
2567     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2568           self.secondaries)
2569     return env, nl, nl
2570
2571
2572   def CheckPrereq(self):
2573     """Check prerequisites.
2574
2575     """
2576     if self.op.mode not in (constants.INSTANCE_CREATE,
2577                             constants.INSTANCE_IMPORT):
2578       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2579                                  self.op.mode)
2580
2581     if self.op.mode == constants.INSTANCE_IMPORT:
2582       src_node = getattr(self.op, "src_node", None)
2583       src_path = getattr(self.op, "src_path", None)
2584       if src_node is None or src_path is None:
2585         raise errors.OpPrereqError("Importing an instance requires source"
2586                                    " node and path options")
2587       src_node_full = self.cfg.ExpandNodeName(src_node)
2588       if src_node_full is None:
2589         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2590       self.op.src_node = src_node = src_node_full
2591
2592       if not os.path.isabs(src_path):
2593         raise errors.OpPrereqError("The source path must be absolute")
2594
2595       export_info = rpc.call_export_info(src_node, src_path)
2596
2597       if not export_info:
2598         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2599
2600       if not export_info.has_section(constants.INISECT_EXP):
2601         raise errors.ProgrammerError("Corrupted export config")
2602
2603       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2604       if (int(ei_version) != constants.EXPORT_VERSION):
2605         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2606                                    (ei_version, constants.EXPORT_VERSION))
2607
2608       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2609         raise errors.OpPrereqError("Can't import instance with more than"
2610                                    " one data disk")
2611
2612       # FIXME: are the old os-es, disk sizes, etc. useful?
2613       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2614       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2615                                                          'disk0_dump'))
2616       self.src_image = diskimage
2617     else: # INSTANCE_CREATE
2618       if getattr(self.op, "os_type", None) is None:
2619         raise errors.OpPrereqError("No guest OS specified")
2620
2621     # check primary node
2622     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2623     if pnode is None:
2624       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2625                                  self.op.pnode)
2626     self.op.pnode = pnode.name
2627     self.pnode = pnode
2628     self.secondaries = []
2629     # disk template and mirror node verification
2630     if self.op.disk_template not in constants.DISK_TEMPLATES:
2631       raise errors.OpPrereqError("Invalid disk template name")
2632
2633     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2634       if getattr(self.op, "snode", None) is None:
2635         raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2636                                    " a mirror node")
2637
2638       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2639       if snode_name is None:
2640         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2641                                    self.op.snode)
2642       elif snode_name == pnode.name:
2643         raise errors.OpPrereqError("The secondary node cannot be"
2644                                    " the primary node.")
2645       self.secondaries.append(snode_name)
2646
2647     # Check lv size requirements
2648     nodenames = [pnode.name] + self.secondaries
2649     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2650
2651     # Required free disk space as a function of disk and swap space
2652     req_size_dict = {
2653       constants.DT_DISKLESS: 0,
2654       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2655       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2656       # 256 MB are added for drbd metadata, 128MB for each drbd device
2657       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2658     }
2659
2660     if self.op.disk_template not in req_size_dict:
2661       raise errors.ProgrammerError("Disk template '%s' size requirement"
2662                                    " is unknown" %  self.op.disk_template)
2663
2664     req_size = req_size_dict[self.op.disk_template]
2665
2666     for node in nodenames:
2667       info = nodeinfo.get(node, None)
2668       if not info:
2669         raise errors.OpPrereqError("Cannot get current information"
2670                                    " from node '%s'" % nodeinfo)
2671       if req_size > info['vg_free']:
2672         raise errors.OpPrereqError("Not enough disk space on target node %s."
2673                                    " %d MB available, %d MB required" %
2674                                    (node, info['vg_free'], req_size))
2675
2676     # os verification
2677     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2678     if not isinstance(os_obj, objects.OS):
2679       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2680                                  " primary node"  % self.op.os_type)
2681
2682     # instance verification
2683     hostname1 = utils.LookupHostname(self.op.instance_name)
2684     if not hostname1:
2685       raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2686                                  self.op.instance_name)
2687
2688     self.op.instance_name = instance_name = hostname1['hostname']
2689     instance_list = self.cfg.GetInstanceList()
2690     if instance_name in instance_list:
2691       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2692                                  instance_name)
2693
2694     ip = getattr(self.op, "ip", None)
2695     if ip is None or ip.lower() == "none":
2696       inst_ip = None
2697     elif ip.lower() == "auto":
2698       inst_ip = hostname1['ip']
2699     else:
2700       if not utils.IsValidIP(ip):
2701         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2702                                    " like a valid IP" % ip)
2703       inst_ip = ip
2704     self.inst_ip = inst_ip
2705
2706     command = ["fping", "-q", hostname1['ip']]
2707     result = utils.RunCmd(command)
2708     if not result.failed:
2709       raise errors.OpPrereqError("IP %s of instance %s already in use" %
2710                                  (hostname1['ip'], instance_name))
2711
2712     # bridge verification
2713     bridge = getattr(self.op, "bridge", None)
2714     if bridge is None:
2715       self.op.bridge = self.cfg.GetDefBridge()
2716     else:
2717       self.op.bridge = bridge
2718
2719     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2720       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2721                                  " destination node '%s'" %
2722                                  (self.op.bridge, pnode.name))
2723
2724     if self.op.start:
2725       self.instance_status = 'up'
2726     else:
2727       self.instance_status = 'down'
2728
2729   def Exec(self, feedback_fn):
2730     """Create and add the instance to the cluster.
2731
2732     """
2733     instance = self.op.instance_name
2734     pnode_name = self.pnode.name
2735
2736     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2737     if self.inst_ip is not None:
2738       nic.ip = self.inst_ip
2739
2740     disks = _GenerateDiskTemplate(self.cfg,
2741                                   self.op.disk_template,
2742                                   instance, pnode_name,
2743                                   self.secondaries, self.op.disk_size,
2744                                   self.op.swap_size)
2745
2746     iobj = objects.Instance(name=instance, os=self.op.os_type,
2747                             primary_node=pnode_name,
2748                             memory=self.op.mem_size,
2749                             vcpus=self.op.vcpus,
2750                             nics=[nic], disks=disks,
2751                             disk_template=self.op.disk_template,
2752                             status=self.instance_status,
2753                             )
2754
2755     feedback_fn("* creating instance disks...")
2756     if not _CreateDisks(self.cfg, iobj):
2757       _RemoveDisks(iobj, self.cfg)
2758       raise errors.OpExecError("Device creation failed, reverting...")
2759
2760     feedback_fn("adding instance %s to cluster config" % instance)
2761
2762     self.cfg.AddInstance(iobj)
2763
2764     if self.op.wait_for_sync:
2765       disk_abort = not _WaitForSync(self.cfg, iobj)
2766     elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2767       # make sure the disks are not degraded (still sync-ing is ok)
2768       time.sleep(15)
2769       feedback_fn("* checking mirrors status")
2770       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2771     else:
2772       disk_abort = False
2773
2774     if disk_abort:
2775       _RemoveDisks(iobj, self.cfg)
2776       self.cfg.RemoveInstance(iobj.name)
2777       raise errors.OpExecError("There are some degraded disks for"
2778                                " this instance")
2779
2780     feedback_fn("creating os for instance %s on node %s" %
2781                 (instance, pnode_name))
2782
2783     if iobj.disk_template != constants.DT_DISKLESS:
2784       if self.op.mode == constants.INSTANCE_CREATE:
2785         feedback_fn("* running the instance OS create scripts...")
2786         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2787           raise errors.OpExecError("could not add os for instance %s"
2788                                    " on node %s" %
2789                                    (instance, pnode_name))
2790
2791       elif self.op.mode == constants.INSTANCE_IMPORT:
2792         feedback_fn("* running the instance OS import scripts...")
2793         src_node = self.op.src_node
2794         src_image = self.src_image
2795         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2796                                                 src_node, src_image):
2797           raise errors.OpExecError("Could not import os for instance"
2798                                    " %s on node %s" %
2799                                    (instance, pnode_name))
2800       else:
2801         # also checked in the prereq part
2802         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2803                                      % self.op.mode)
2804
2805     if self.op.start:
2806       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2807       feedback_fn("* starting instance...")
2808       if not rpc.call_instance_start(pnode_name, iobj, None):
2809         raise errors.OpExecError("Could not start instance")
2810
2811
2812 class LUConnectConsole(NoHooksLU):
2813   """Connect to an instance's console.
2814
2815   This is somewhat special in that it returns the command line that
2816   you need to run on the master node in order to connect to the
2817   console.
2818
2819   """
2820   _OP_REQP = ["instance_name"]
2821
2822   def CheckPrereq(self):
2823     """Check prerequisites.
2824
2825     This checks that the instance is in the cluster.
2826
2827     """
2828     instance = self.cfg.GetInstanceInfo(
2829       self.cfg.ExpandInstanceName(self.op.instance_name))
2830     if instance is None:
2831       raise errors.OpPrereqError("Instance '%s' not known" %
2832                                  self.op.instance_name)
2833     self.instance = instance
2834
2835   def Exec(self, feedback_fn):
2836     """Connect to the console of an instance
2837
2838     """
2839     instance = self.instance
2840     node = instance.primary_node
2841
2842     node_insts = rpc.call_instance_list([node])[node]
2843     if node_insts is False:
2844       raise errors.OpExecError("Can't connect to node %s." % node)
2845
2846     if instance.name not in node_insts:
2847       raise errors.OpExecError("Instance %s is not running." % instance.name)
2848
2849     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2850
2851     hyper = hypervisor.GetHypervisor()
2852     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2853     # build ssh cmdline
2854     argv = ["ssh", "-q", "-t"]
2855     argv.extend(ssh.KNOWN_HOSTS_OPTS)
2856     argv.extend(ssh.BATCH_MODE_OPTS)
2857     argv.append(node)
2858     argv.append(console_cmd)
2859     return "ssh", argv
2860
2861
2862 class LUAddMDDRBDComponent(LogicalUnit):
2863   """Adda new mirror member to an instance's disk.
2864
2865   """
2866   HPATH = "mirror-add"
2867   HTYPE = constants.HTYPE_INSTANCE
2868   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2869
2870   def BuildHooksEnv(self):
2871     """Build hooks env.
2872
2873     This runs on the master, the primary and all the secondaries.
2874
2875     """
2876     env = {
2877       "NEW_SECONDARY": self.op.remote_node,
2878       "DISK_NAME": self.op.disk_name,
2879       }
2880     env.update(_BuildInstanceHookEnvByObject(self.instance))
2881     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2882           self.op.remote_node,] + list(self.instance.secondary_nodes)
2883     return env, nl, nl
2884
2885   def CheckPrereq(self):
2886     """Check prerequisites.
2887
2888     This checks that the instance is in the cluster.
2889
2890     """
2891     instance = self.cfg.GetInstanceInfo(
2892       self.cfg.ExpandInstanceName(self.op.instance_name))
2893     if instance is None:
2894       raise errors.OpPrereqError("Instance '%s' not known" %
2895                                  self.op.instance_name)
2896     self.instance = instance
2897
2898     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2899     if remote_node is None:
2900       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2901     self.remote_node = remote_node
2902
2903     if remote_node == instance.primary_node:
2904       raise errors.OpPrereqError("The specified node is the primary node of"
2905                                  " the instance.")
2906
2907     if instance.disk_template != constants.DT_REMOTE_RAID1:
2908       raise errors.OpPrereqError("Instance's disk layout is not"
2909                                  " remote_raid1.")
2910     for disk in instance.disks:
2911       if disk.iv_name == self.op.disk_name:
2912         break
2913     else:
2914       raise errors.OpPrereqError("Can't find this device ('%s') in the"
2915                                  " instance." % self.op.disk_name)
2916     if len(disk.children) > 1:
2917       raise errors.OpPrereqError("The device already has two slave"
2918                                  " devices.\n"
2919                                  "This would create a 3-disk raid1"
2920                                  " which we don't allow.")
2921     self.disk = disk
2922
2923   def Exec(self, feedback_fn):
2924     """Add the mirror component
2925
2926     """
2927     disk = self.disk
2928     instance = self.instance
2929
2930     remote_node = self.remote_node
2931     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2932     names = _GenerateUniqueNames(self.cfg, lv_names)
2933     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2934                                      remote_node, disk.size, names)
2935
2936     logger.Info("adding new mirror component on secondary")
2937     #HARDCODE
2938     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2939                                       _GetInstanceInfoText(instance)):
2940       raise errors.OpExecError("Failed to create new component on secondary"
2941                                " node %s" % remote_node)
2942
2943     logger.Info("adding new mirror component on primary")
2944     #HARDCODE
2945     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2946                                     _GetInstanceInfoText(instance)):
2947       # remove secondary dev
2948       self.cfg.SetDiskID(new_drbd, remote_node)
2949       rpc.call_blockdev_remove(remote_node, new_drbd)
2950       raise errors.OpExecError("Failed to create volume on primary")
2951
2952     # the device exists now
2953     # call the primary node to add the mirror to md
2954     logger.Info("adding new mirror component to md")
2955     if not rpc.call_blockdev_addchild(instance.primary_node,
2956                                            disk, new_drbd):
2957       logger.Error("Can't add mirror compoment to md!")
2958       self.cfg.SetDiskID(new_drbd, remote_node)
2959       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2960         logger.Error("Can't rollback on secondary")
2961       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2962       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2963         logger.Error("Can't rollback on primary")
2964       raise errors.OpExecError("Can't add mirror component to md array")
2965
2966     disk.children.append(new_drbd)
2967
2968     self.cfg.AddInstance(instance)
2969
2970     _WaitForSync(self.cfg, instance)
2971
2972     return 0
2973
2974
2975 class LURemoveMDDRBDComponent(LogicalUnit):
2976   """Remove a component from a remote_raid1 disk.
2977
2978   """
2979   HPATH = "mirror-remove"
2980   HTYPE = constants.HTYPE_INSTANCE
2981   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2982
2983   def BuildHooksEnv(self):
2984     """Build hooks env.
2985
2986     This runs on the master, the primary and all the secondaries.
2987
2988     """
2989     env = {
2990       "DISK_NAME": self.op.disk_name,
2991       "DISK_ID": self.op.disk_id,
2992       "OLD_SECONDARY": self.old_secondary,
2993       }
2994     env.update(_BuildInstanceHookEnvByObject(self.instance))
2995     nl = [self.sstore.GetMasterNode(),
2996           self.instance.primary_node] + list(self.instance.secondary_nodes)
2997     return env, nl, nl
2998
2999   def CheckPrereq(self):
3000     """Check prerequisites.
3001
3002     This checks that the instance is in the cluster.
3003
3004     """
3005     instance = self.cfg.GetInstanceInfo(
3006       self.cfg.ExpandInstanceName(self.op.instance_name))
3007     if instance is None:
3008       raise errors.OpPrereqError("Instance '%s' not known" %
3009                                  self.op.instance_name)
3010     self.instance = instance
3011
3012     if instance.disk_template != constants.DT_REMOTE_RAID1:
3013       raise errors.OpPrereqError("Instance's disk layout is not"
3014                                  " remote_raid1.")
3015     for disk in instance.disks:
3016       if disk.iv_name == self.op.disk_name:
3017         break
3018     else:
3019       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3020                                  " instance." % self.op.disk_name)
3021     for child in disk.children:
3022       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
3023         break
3024     else:
3025       raise errors.OpPrereqError("Can't find the device with this port.")
3026
3027     if len(disk.children) < 2:
3028       raise errors.OpPrereqError("Cannot remove the last component from"
3029                                  " a mirror.")
3030     self.disk = disk
3031     self.child = child
3032     if self.child.logical_id[0] == instance.primary_node:
3033       oid = 1
3034     else:
3035       oid = 0
3036     self.old_secondary = self.child.logical_id[oid]
3037
3038   def Exec(self, feedback_fn):
3039     """Remove the mirror component
3040
3041     """
3042     instance = self.instance
3043     disk = self.disk
3044     child = self.child
3045     logger.Info("remove mirror component")
3046     self.cfg.SetDiskID(disk, instance.primary_node)
3047     if not rpc.call_blockdev_removechild(instance.primary_node,
3048                                               disk, child):
3049       raise errors.OpExecError("Can't remove child from mirror.")
3050
3051     for node in child.logical_id[:2]:
3052       self.cfg.SetDiskID(child, node)
3053       if not rpc.call_blockdev_remove(node, child):
3054         logger.Error("Warning: failed to remove device from node %s,"
3055                      " continuing operation." % node)
3056
3057     disk.children.remove(child)
3058     self.cfg.AddInstance(instance)
3059
3060
3061 class LUReplaceDisks(LogicalUnit):
3062   """Replace the disks of an instance.
3063
3064   """
3065   HPATH = "mirrors-replace"
3066   HTYPE = constants.HTYPE_INSTANCE
3067   _OP_REQP = ["instance_name"]
3068
3069   def BuildHooksEnv(self):
3070     """Build hooks env.
3071
3072     This runs on the master, the primary and all the secondaries.
3073
3074     """
3075     env = {
3076       "NEW_SECONDARY": self.op.remote_node,
3077       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3078       }
3079     env.update(_BuildInstanceHookEnvByObject(self.instance))
3080     nl = [self.sstore.GetMasterNode(),
3081           self.instance.primary_node] + list(self.instance.secondary_nodes)
3082     return env, nl, nl
3083
3084   def CheckPrereq(self):
3085     """Check prerequisites.
3086
3087     This checks that the instance is in the cluster.
3088
3089     """
3090     instance = self.cfg.GetInstanceInfo(
3091       self.cfg.ExpandInstanceName(self.op.instance_name))
3092     if instance is None:
3093       raise errors.OpPrereqError("Instance '%s' not known" %
3094                                  self.op.instance_name)
3095     self.instance = instance
3096
3097     if instance.disk_template != constants.DT_REMOTE_RAID1:
3098       raise errors.OpPrereqError("Instance's disk layout is not"
3099                                  " remote_raid1.")
3100
3101     if len(instance.secondary_nodes) != 1:
3102       raise errors.OpPrereqError("The instance has a strange layout,"
3103                                  " expected one secondary but found %d" %
3104                                  len(instance.secondary_nodes))
3105
3106     remote_node = getattr(self.op, "remote_node", None)
3107     if remote_node is None:
3108       remote_node = instance.secondary_nodes[0]
3109     else:
3110       remote_node = self.cfg.ExpandNodeName(remote_node)
3111       if remote_node is None:
3112         raise errors.OpPrereqError("Node '%s' not known" %
3113                                    self.op.remote_node)
3114     if remote_node == instance.primary_node:
3115       raise errors.OpPrereqError("The specified node is the primary node of"
3116                                  " the instance.")
3117     self.op.remote_node = remote_node
3118
3119   def Exec(self, feedback_fn):
3120     """Replace the disks of an instance.
3121
3122     """
3123     instance = self.instance
3124     iv_names = {}
3125     # start of work
3126     remote_node = self.op.remote_node
3127     cfg = self.cfg
3128     for dev in instance.disks:
3129       size = dev.size
3130       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3131       names = _GenerateUniqueNames(cfg, lv_names)
3132       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3133                                        remote_node, size, names)
3134       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3135       logger.Info("adding new mirror component on secondary for %s" %
3136                   dev.iv_name)
3137       #HARDCODE
3138       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3139                                         _GetInstanceInfoText(instance)):
3140         raise errors.OpExecError("Failed to create new component on"
3141                                  " secondary node %s\n"
3142                                  "Full abort, cleanup manually!" %
3143                                  remote_node)
3144
3145       logger.Info("adding new mirror component on primary")
3146       #HARDCODE
3147       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3148                                       _GetInstanceInfoText(instance)):
3149         # remove secondary dev
3150         cfg.SetDiskID(new_drbd, remote_node)
3151         rpc.call_blockdev_remove(remote_node, new_drbd)
3152         raise errors.OpExecError("Failed to create volume on primary!\n"
3153                                  "Full abort, cleanup manually!!")
3154
3155       # the device exists now
3156       # call the primary node to add the mirror to md
3157       logger.Info("adding new mirror component to md")
3158       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3159                                         new_drbd):
3160         logger.Error("Can't add mirror compoment to md!")
3161         cfg.SetDiskID(new_drbd, remote_node)
3162         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3163           logger.Error("Can't rollback on secondary")
3164         cfg.SetDiskID(new_drbd, instance.primary_node)
3165         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3166           logger.Error("Can't rollback on primary")
3167         raise errors.OpExecError("Full abort, cleanup manually!!")
3168
3169       dev.children.append(new_drbd)
3170       cfg.AddInstance(instance)
3171
3172     # this can fail as the old devices are degraded and _WaitForSync
3173     # does a combined result over all disks, so we don't check its
3174     # return value
3175     _WaitForSync(cfg, instance, unlock=True)
3176
3177     # so check manually all the devices
3178     for name in iv_names:
3179       dev, child, new_drbd = iv_names[name]
3180       cfg.SetDiskID(dev, instance.primary_node)
3181       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3182       if is_degr:
3183         raise errors.OpExecError("MD device %s is degraded!" % name)
3184       cfg.SetDiskID(new_drbd, instance.primary_node)
3185       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3186       if is_degr:
3187         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3188
3189     for name in iv_names:
3190       dev, child, new_drbd = iv_names[name]
3191       logger.Info("remove mirror %s component" % name)
3192       cfg.SetDiskID(dev, instance.primary_node)
3193       if not rpc.call_blockdev_removechild(instance.primary_node,
3194                                                 dev, child):
3195         logger.Error("Can't remove child from mirror, aborting"
3196                      " *this device cleanup*.\nYou need to cleanup manually!!")
3197         continue
3198
3199       for node in child.logical_id[:2]:
3200         logger.Info("remove child device on %s" % node)
3201         cfg.SetDiskID(child, node)
3202         if not rpc.call_blockdev_remove(node, child):
3203           logger.Error("Warning: failed to remove device from node %s,"
3204                        " continuing operation." % node)
3205
3206       dev.children.remove(child)
3207
3208       cfg.AddInstance(instance)
3209
3210
3211 class LUQueryInstanceData(NoHooksLU):
3212   """Query runtime instance data.
3213
3214   """
3215   _OP_REQP = ["instances"]
3216
3217   def CheckPrereq(self):
3218     """Check prerequisites.
3219
3220     This only checks the optional instance list against the existing names.
3221
3222     """
3223     if not isinstance(self.op.instances, list):
3224       raise errors.OpPrereqError("Invalid argument type 'instances'")
3225     if self.op.instances:
3226       self.wanted_instances = []
3227       names = self.op.instances
3228       for name in names:
3229         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3230         if instance is None:
3231           raise errors.OpPrereqError("No such instance name '%s'" % name)
3232       self.wanted_instances.append(instance)
3233     else:
3234       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3235                                in self.cfg.GetInstanceList()]
3236     return
3237
3238
3239   def _ComputeDiskStatus(self, instance, snode, dev):
3240     """Compute block device status.
3241
3242     """
3243     self.cfg.SetDiskID(dev, instance.primary_node)
3244     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3245     if dev.dev_type == "drbd":
3246       # we change the snode then (otherwise we use the one passed in)
3247       if dev.logical_id[0] == instance.primary_node:
3248         snode = dev.logical_id[1]
3249       else:
3250         snode = dev.logical_id[0]
3251
3252     if snode:
3253       self.cfg.SetDiskID(dev, snode)
3254       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3255     else:
3256       dev_sstatus = None
3257
3258     if dev.children:
3259       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3260                       for child in dev.children]
3261     else:
3262       dev_children = []
3263
3264     data = {
3265       "iv_name": dev.iv_name,
3266       "dev_type": dev.dev_type,
3267       "logical_id": dev.logical_id,
3268       "physical_id": dev.physical_id,
3269       "pstatus": dev_pstatus,
3270       "sstatus": dev_sstatus,
3271       "children": dev_children,
3272       }
3273
3274     return data
3275
3276   def Exec(self, feedback_fn):
3277     """Gather and return data"""
3278     result = {}
3279     for instance in self.wanted_instances:
3280       remote_info = rpc.call_instance_info(instance.primary_node,
3281                                                 instance.name)
3282       if remote_info and "state" in remote_info:
3283         remote_state = "up"
3284       else:
3285         remote_state = "down"
3286       if instance.status == "down":
3287         config_state = "down"
3288       else:
3289         config_state = "up"
3290
3291       disks = [self._ComputeDiskStatus(instance, None, device)
3292                for device in instance.disks]
3293
3294       idict = {
3295         "name": instance.name,
3296         "config_state": config_state,
3297         "run_state": remote_state,
3298         "pnode": instance.primary_node,
3299         "snodes": instance.secondary_nodes,
3300         "os": instance.os,
3301         "memory": instance.memory,
3302         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3303         "disks": disks,
3304         }
3305
3306       result[instance.name] = idict
3307
3308     return result
3309
3310
3311 class LUQueryNodeData(NoHooksLU):
3312   """Logical unit for querying node data.
3313
3314   """
3315   _OP_REQP = ["nodes"]
3316
3317   def CheckPrereq(self):
3318     """Check prerequisites.
3319
3320     This only checks the optional node list against the existing names.
3321
3322     """
3323     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3324
3325   def Exec(self, feedback_fn):
3326     """Compute and return the list of nodes.
3327
3328     """
3329     ilist = [self.cfg.GetInstanceInfo(iname) for iname
3330              in self.cfg.GetInstanceList()]
3331     result = []
3332     for node in [self.cfg.GetNodeInfo(name) for name in self.wanted_nodes]:
3333       result.append((node.name, node.primary_ip, node.secondary_ip,
3334                      [inst.name for inst in ilist
3335                       if inst.primary_node == node.name],
3336                      [inst.name for inst in ilist
3337                       if node.name in inst.secondary_nodes],
3338                      ))
3339     return result
3340
3341
3342 class LUSetInstanceParms(LogicalUnit):
3343   """Modifies an instances's parameters.
3344
3345   """
3346   HPATH = "instance-modify"
3347   HTYPE = constants.HTYPE_INSTANCE
3348   _OP_REQP = ["instance_name"]
3349
3350   def BuildHooksEnv(self):
3351     """Build hooks env.
3352
3353     This runs on the master, primary and secondaries.
3354
3355     """
3356     args = dict()
3357     if self.mem:
3358       args['memory'] = self.mem
3359     if self.vcpus:
3360       args['vcpus'] = self.vcpus
3361     if self.do_ip or self.do_bridge:
3362       if self.do_ip:
3363         ip = self.ip
3364       else:
3365         ip = self.instance.nics[0].ip
3366       if self.bridge:
3367         bridge = self.bridge
3368       else:
3369         bridge = self.instance.nics[0].bridge
3370       args['nics'] = [(ip, bridge)]
3371     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3372     nl = [self.sstore.GetMasterNode(),
3373           self.instance.primary_node] + list(self.instance.secondary_nodes)
3374     return env, nl, nl
3375
3376   def CheckPrereq(self):
3377     """Check prerequisites.
3378
3379     This only checks the instance list against the existing names.
3380
3381     """
3382     self.mem = getattr(self.op, "mem", None)
3383     self.vcpus = getattr(self.op, "vcpus", None)
3384     self.ip = getattr(self.op, "ip", None)
3385     self.bridge = getattr(self.op, "bridge", None)
3386     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3387       raise errors.OpPrereqError("No changes submitted")
3388     if self.mem is not None:
3389       try:
3390         self.mem = int(self.mem)
3391       except ValueError, err:
3392         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3393     if self.vcpus is not None:
3394       try:
3395         self.vcpus = int(self.vcpus)
3396       except ValueError, err:
3397         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3398     if self.ip is not None:
3399       self.do_ip = True
3400       if self.ip.lower() == "none":
3401         self.ip = None
3402       else:
3403         if not utils.IsValidIP(self.ip):
3404           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3405     else:
3406       self.do_ip = False
3407     self.do_bridge = (self.bridge is not None)
3408
3409     instance = self.cfg.GetInstanceInfo(
3410       self.cfg.ExpandInstanceName(self.op.instance_name))
3411     if instance is None:
3412       raise errors.OpPrereqError("No such instance name '%s'" %
3413                                  self.op.instance_name)
3414     self.op.instance_name = instance.name
3415     self.instance = instance
3416     return
3417
3418   def Exec(self, feedback_fn):
3419     """Modifies an instance.
3420
3421     All parameters take effect only at the next restart of the instance.
3422     """
3423     result = []
3424     instance = self.instance
3425     if self.mem:
3426       instance.memory = self.mem
3427       result.append(("mem", self.mem))
3428     if self.vcpus:
3429       instance.vcpus = self.vcpus
3430       result.append(("vcpus",  self.vcpus))
3431     if self.do_ip:
3432       instance.nics[0].ip = self.ip
3433       result.append(("ip", self.ip))
3434     if self.bridge:
3435       instance.nics[0].bridge = self.bridge
3436       result.append(("bridge", self.bridge))
3437
3438     self.cfg.AddInstance(instance)
3439
3440     return result
3441
3442
3443 class LUQueryExports(NoHooksLU):
3444   """Query the exports list
3445
3446   """
3447   _OP_REQP = []
3448
3449   def CheckPrereq(self):
3450     """Check that the nodelist contains only existing nodes.
3451
3452     """
3453     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3454
3455   def Exec(self, feedback_fn):
3456     """Compute the list of all the exported system images.
3457
3458     Returns:
3459       a dictionary with the structure node->(export-list)
3460       where export-list is a list of the instances exported on
3461       that node.
3462
3463     """
3464     return rpc.call_export_list(self.nodes)
3465
3466
3467 class LUExportInstance(LogicalUnit):
3468   """Export an instance to an image in the cluster.
3469
3470   """
3471   HPATH = "instance-export"
3472   HTYPE = constants.HTYPE_INSTANCE
3473   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3474
3475   def BuildHooksEnv(self):
3476     """Build hooks env.
3477
3478     This will run on the master, primary node and target node.
3479
3480     """
3481     env = {
3482       "EXPORT_NODE": self.op.target_node,
3483       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3484       }
3485     env.update(_BuildInstanceHookEnvByObject(self.instance))
3486     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3487           self.op.target_node]
3488     return env, nl, nl
3489
3490   def CheckPrereq(self):
3491     """Check prerequisites.
3492
3493     This checks that the instance name is a valid one.
3494
3495     """
3496     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3497     self.instance = self.cfg.GetInstanceInfo(instance_name)
3498     if self.instance is None:
3499       raise errors.OpPrereqError("Instance '%s' not found" %
3500                                  self.op.instance_name)
3501
3502     # node verification
3503     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3504     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3505
3506     if self.dst_node is None:
3507       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3508                                  self.op.target_node)
3509     self.op.target_node = self.dst_node.name
3510
3511   def Exec(self, feedback_fn):
3512     """Export an instance to an image in the cluster.
3513
3514     """
3515     instance = self.instance
3516     dst_node = self.dst_node
3517     src_node = instance.primary_node
3518     # shutdown the instance, unless requested not to do so
3519     if self.op.shutdown:
3520       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3521       self.processor.ChainOpCode(op, feedback_fn)
3522
3523     vgname = self.cfg.GetVGName()
3524
3525     snap_disks = []
3526
3527     try:
3528       for disk in instance.disks:
3529         if disk.iv_name == "sda":
3530           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3531           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3532
3533           if not new_dev_name:
3534             logger.Error("could not snapshot block device %s on node %s" %
3535                          (disk.logical_id[1], src_node))
3536           else:
3537             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3538                                       logical_id=(vgname, new_dev_name),
3539                                       physical_id=(vgname, new_dev_name),
3540                                       iv_name=disk.iv_name)
3541             snap_disks.append(new_dev)
3542
3543     finally:
3544       if self.op.shutdown:
3545         op = opcodes.OpStartupInstance(instance_name=instance.name,
3546                                        force=False)
3547         self.processor.ChainOpCode(op, feedback_fn)
3548
3549     # TODO: check for size
3550
3551     for dev in snap_disks:
3552       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3553                                            instance):
3554         logger.Error("could not export block device %s from node"
3555                      " %s to node %s" %
3556                      (dev.logical_id[1], src_node, dst_node.name))
3557       if not rpc.call_blockdev_remove(src_node, dev):
3558         logger.Error("could not remove snapshot block device %s from"
3559                      " node %s" % (dev.logical_id[1], src_node))
3560
3561     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3562       logger.Error("could not finalize export for instance %s on node %s" %
3563                    (instance.name, dst_node.name))
3564
3565     nodelist = self.cfg.GetNodeList()
3566     nodelist.remove(dst_node.name)
3567
3568     # on one-node clusters nodelist will be empty after the removal
3569     # if we proceed the backup would be removed because OpQueryExports
3570     # substitutes an empty list with the full cluster node list.
3571     if nodelist:
3572       op = opcodes.OpQueryExports(nodes=nodelist)
3573       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3574       for node in exportlist:
3575         if instance.name in exportlist[node]:
3576           if not rpc.call_export_remove(node, instance.name):
3577             logger.Error("could not remove older export for instance %s"
3578                          " on node %s" % (instance.name, node))
3579
3580
3581 class TagsLU(NoHooksLU):
3582   """Generic tags LU.
3583
3584   This is an abstract class which is the parent of all the other tags LUs.
3585
3586   """
3587   def CheckPrereq(self):
3588     """Check prerequisites.
3589
3590     """
3591     if self.op.kind == constants.TAG_CLUSTER:
3592       self.target = self.cfg.GetClusterInfo()
3593     elif self.op.kind == constants.TAG_NODE:
3594       name = self.cfg.ExpandNodeName(self.op.name)
3595       if name is None:
3596         raise errors.OpPrereqError("Invalid node name (%s)" %
3597                                    (self.op.name,))
3598       self.op.name = name
3599       self.target = self.cfg.GetNodeInfo(name)
3600     elif self.op.kind == constants.TAG_INSTANCE:
3601       name = self.cfg.ExpandInstanceName(name)
3602       if name is None:
3603         raise errors.OpPrereqError("Invalid instance name (%s)" %
3604                                    (self.op.name,))
3605       self.op.name = name
3606       self.target = self.cfg.GetInstanceInfo(name)
3607     else:
3608       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3609                                  str(self.op.kind))
3610
3611
3612 class LUGetTags(TagsLU):
3613   """Returns the tags of a given object.
3614
3615   """
3616   _OP_REQP = ["kind", "name"]
3617
3618   def Exec(self, feedback_fn):
3619     """Returns the tag list.
3620
3621     """
3622     return self.target.GetTags()
3623
3624
3625 class LUAddTag(TagsLU):
3626   """Sets a tag on a given object.
3627
3628   """
3629   _OP_REQP = ["kind", "name", "tag"]
3630
3631   def CheckPrereq(self):
3632     """Check prerequisites.
3633
3634     This checks the type and length of the tag name and value.
3635
3636     """
3637     TagsLU.CheckPrereq(self)
3638     objects.TaggableObject.ValidateTag(self.op.tag)
3639
3640   def Exec(self, feedback_fn):
3641     """Sets the tag.
3642
3643     """
3644     try:
3645       self.target.AddTag(self.op.tag)
3646     except errors.TagError, err:
3647       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3648     try:
3649       self.cfg.Update(self.target)
3650     except errors.ConfigurationError:
3651       raise errors.OpRetryError("There has been a modification to the"
3652                                 " config file and the operation has been"
3653                                 " aborted. Please retry.")
3654
3655
3656 class LUDelTag(TagsLU):
3657   """Delete a tag from a given object.
3658
3659   """
3660   _OP_REQP = ["kind", "name", "tag"]
3661
3662   def CheckPrereq(self):
3663     """Check prerequisites.
3664
3665     This checks that we have the given tag.
3666
3667     """
3668     TagsLU.CheckPrereq(self)
3669     objects.TaggableObject.ValidateTag(self.op.tag)
3670     if self.op.tag not in self.target.GetTags():
3671       raise errors.OpPrereqError("Tag not found")
3672
3673   def Exec(self, feedback_fn):
3674     """Remove the tag from the object.
3675
3676     """
3677     self.target.RemoveTag(self.op.tag)
3678     try:
3679       self.cfg.Update(self.target)
3680     except errors.ConfigurationError:
3681       raise errors.OpRetryError("There has been a modification to the"
3682                                 " config file and the operation has been"
3683                                 " aborted. Please retry.")