Change OpQueryNodes nodes attribute to names
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded node names.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted = []
179
180     for name in nodes:
181       node = lu.cfg.ExpandNodeName(name)
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184       wanted.append(node)
185
186   else:
187     wanted = lu.cfg.GetNodeList()
188   return utils.NiceSort(wanted)
189
190
191 def _GetWantedInstances(lu, instances):
192   """Returns list of checked and expanded instance names.
193
194   Args:
195     instances: List of instances (strings) or None for all
196
197   """
198   if not isinstance(instances, list):
199     raise errors.OpPrereqError("Invalid argument type 'instances'")
200
201   if instances:
202     wanted = []
203
204     for name in instances:
205       instance = lu.cfg.ExpandInstanceName(name)
206       if instance is None:
207         raise errors.OpPrereqError("No such instance name '%s'" % name)
208       wanted.append(instance)
209
210   else:
211     wanted = lu.cfg.GetInstanceList()
212   return utils.NiceSort(wanted)
213
214
215 def _CheckOutputFields(static, dynamic, selected):
216   """Checks whether all selected fields are valid.
217
218   Args:
219     static: Static fields
220     dynamic: Dynamic fields
221
222   """
223   static_fields = frozenset(static)
224   dynamic_fields = frozenset(dynamic)
225
226   all_fields = static_fields | dynamic_fields
227
228   if not all_fields.issuperset(selected):
229     raise errors.OpPrereqError("Unknown output fields selected: %s"
230                                % ",".join(frozenset(selected).
231                                           difference(all_fields)))
232
233
234 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235                           memory, vcpus, nics):
236   """Builds instance related env variables for hooks from single variables.
237
238   Args:
239     secondary_nodes: List of secondary nodes as strings
240   """
241   env = {
242     "INSTANCE_NAME": name,
243     "INSTANCE_PRIMARY": primary_node,
244     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245     "INSTANCE_OS_TYPE": os_type,
246     "INSTANCE_STATUS": status,
247     "INSTANCE_MEMORY": memory,
248     "INSTANCE_VCPUS": vcpus,
249   }
250
251   if nics:
252     nic_count = len(nics)
253     for idx, (ip, bridge) in enumerate(nics):
254       if ip is None:
255         ip = ""
256       env["INSTANCE_NIC%d_IP" % idx] = ip
257       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258   else:
259     nic_count = 0
260
261   env["INSTANCE_NIC_COUNT"] = nic_count
262
263   return env
264
265
266 def _BuildInstanceHookEnvByObject(instance, override=None):
267   """Builds instance related env variables for hooks from an object.
268
269   Args:
270     instance: objects.Instance object of instance
271     override: dict of values to override
272   """
273   args = {
274     'name': instance.name,
275     'primary_node': instance.primary_node,
276     'secondary_nodes': instance.secondary_nodes,
277     'os_type': instance.os,
278     'status': instance.os,
279     'memory': instance.memory,
280     'vcpus': instance.vcpus,
281     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282   }
283   if override:
284     args.update(override)
285   return _BuildInstanceHookEnv(**args)
286
287
288 def _UpdateEtcHosts(fullnode, ip):
289   """Ensure a node has a correct entry in /etc/hosts.
290
291   Args:
292     fullnode - Fully qualified domain name of host. (str)
293     ip       - IPv4 address of host (str)
294
295   """
296   node = fullnode.split(".", 1)[0]
297
298   f = open('/etc/hosts', 'r+')
299
300   inthere = False
301
302   save_lines = []
303   add_lines = []
304   removed = False
305
306   while True:
307     rawline = f.readline()
308
309     if not rawline:
310       # End of file
311       break
312
313     line = rawline.split('\n')[0]
314
315     # Strip off comments
316     line = line.split('#')[0]
317
318     if not line:
319       # Entire line was comment, skip
320       save_lines.append(rawline)
321       continue
322
323     fields = line.split()
324
325     haveall = True
326     havesome = False
327     for spec in [ ip, fullnode, node ]:
328       if spec not in fields:
329         haveall = False
330       if spec in fields:
331         havesome = True
332
333     if haveall:
334       inthere = True
335       save_lines.append(rawline)
336       continue
337
338     if havesome and not haveall:
339       # Line (old, or manual?) which is missing some.  Remove.
340       removed = True
341       continue
342
343     save_lines.append(rawline)
344
345   if not inthere:
346     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347
348   if removed:
349     if add_lines:
350       save_lines = save_lines + add_lines
351
352     # We removed a line, write a new file and replace old.
353     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354     newfile = os.fdopen(fd, 'w')
355     newfile.write(''.join(save_lines))
356     newfile.close()
357     os.rename(tmpname, '/etc/hosts')
358
359   elif add_lines:
360     # Simply appending a new line will do the trick.
361     f.seek(0, 2)
362     for add in add_lines:
363       f.write(add)
364
365   f.close()
366
367
368 def _UpdateKnownHosts(fullnode, ip, pubkey):
369   """Ensure a node has a correct known_hosts entry.
370
371   Args:
372     fullnode - Fully qualified domain name of host. (str)
373     ip       - IPv4 address of host (str)
374     pubkey   - the public key of the cluster
375
376   """
377   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379   else:
380     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381
382   inthere = False
383
384   save_lines = []
385   add_lines = []
386   removed = False
387
388   while True:
389     rawline = f.readline()
390     logger.Debug('read %s' % (repr(rawline),))
391
392     if not rawline:
393       # End of file
394       break
395
396     line = rawline.split('\n')[0]
397
398     parts = line.split(' ')
399     fields = parts[0].split(',')
400     key = parts[2]
401
402     haveall = True
403     havesome = False
404     for spec in [ ip, fullnode ]:
405       if spec not in fields:
406         haveall = False
407       if spec in fields:
408         havesome = True
409
410     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411     if haveall and key == pubkey:
412       inthere = True
413       save_lines.append(rawline)
414       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415       continue
416
417     if havesome and (not haveall or key != pubkey):
418       removed = True
419       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420       continue
421
422     save_lines.append(rawline)
423
424   if not inthere:
425     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427
428   if removed:
429     save_lines = save_lines + add_lines
430
431     # Write a new file and replace old.
432     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433                                    constants.DATA_DIR)
434     newfile = os.fdopen(fd, 'w')
435     try:
436       newfile.write(''.join(save_lines))
437     finally:
438       newfile.close()
439     logger.Debug("Wrote new known_hosts.")
440     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441
442   elif add_lines:
443     # Simply appending a new line will do the trick.
444     f.seek(0, 2)
445     for add in add_lines:
446       f.write(add)
447
448   f.close()
449
450
451 def _HasValidVG(vglist, vgname):
452   """Checks if the volume group list is valid.
453
454   A non-None return value means there's an error, and the return value
455   is the error message.
456
457   """
458   vgsize = vglist.get(vgname, None)
459   if vgsize is None:
460     return "volume group '%s' missing" % vgname
461   elif vgsize < 20480:
462     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
463             (vgname, vgsize))
464   return None
465
466
467 def _InitSSHSetup(node):
468   """Setup the SSH configuration for the cluster.
469
470
471   This generates a dsa keypair for root, adds the pub key to the
472   permitted hosts and adds the hostkey to its own known hosts.
473
474   Args:
475     node: the name of this host as a fqdn
476
477   """
478   if os.path.exists('/root/.ssh/id_dsa'):
479     utils.CreateBackup('/root/.ssh/id_dsa')
480   if os.path.exists('/root/.ssh/id_dsa.pub'):
481     utils.CreateBackup('/root/.ssh/id_dsa.pub')
482
483   utils.RemoveFile('/root/.ssh/id_dsa')
484   utils.RemoveFile('/root/.ssh/id_dsa.pub')
485
486   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487                          "-f", "/root/.ssh/id_dsa",
488                          "-q", "-N", ""])
489   if result.failed:
490     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491                              result.output)
492
493   f = open('/root/.ssh/id_dsa.pub', 'r')
494   try:
495     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496   finally:
497     f.close()
498
499
500 def _InitGanetiServerSetup(ss):
501   """Setup the necessary configuration for the initial node daemon.
502
503   This creates the nodepass file containing the shared password for
504   the cluster and also generates the SSL certificate.
505
506   """
507   # Create pseudo random password
508   randpass = sha.new(os.urandom(64)).hexdigest()
509   # and write it into sstore
510   ss.SetKey(ss.SS_NODED_PASS, randpass)
511
512   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513                          "-days", str(365*5), "-nodes", "-x509",
514                          "-keyout", constants.SSL_CERT_FILE,
515                          "-out", constants.SSL_CERT_FILE, "-batch"])
516   if result.failed:
517     raise errors.OpExecError("could not generate server ssl cert, command"
518                              " %s had exitcode %s and error message %s" %
519                              (result.cmd, result.exit_code, result.output))
520
521   os.chmod(constants.SSL_CERT_FILE, 0400)
522
523   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524
525   if result.failed:
526     raise errors.OpExecError("Could not start the node daemon, command %s"
527                              " had exitcode %s and error %s" %
528                              (result.cmd, result.exit_code, result.output))
529
530
531 class LUInitCluster(LogicalUnit):
532   """Initialise the cluster.
533
534   """
535   HPATH = "cluster-init"
536   HTYPE = constants.HTYPE_CLUSTER
537   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538               "def_bridge", "master_netdev"]
539   REQ_CLUSTER = False
540
541   def BuildHooksEnv(self):
542     """Build hooks env.
543
544     Notes: Since we don't require a cluster, we must manually add
545     ourselves in the post-run node list.
546
547     """
548     env = {
549       "CLUSTER": self.op.cluster_name,
550       "MASTER": self.hostname['hostname_full'],
551       }
552     return env, [], [self.hostname['hostname_full']]
553
554   def CheckPrereq(self):
555     """Verify that the passed name is a valid one.
556
557     """
558     if config.ConfigWriter.IsCluster():
559       raise errors.OpPrereqError("Cluster is already initialised")
560
561     hostname_local = socket.gethostname()
562     self.hostname = hostname = utils.LookupHostname(hostname_local)
563     if not hostname:
564       raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
565                                  hostname_local)
566
567     if hostname["hostname_full"] != hostname_local:
568       raise errors.OpPrereqError("My own hostname (%s) does not match the"
569                                  " resolver (%s): probably not using FQDN"
570                                  " for hostname." %
571                                  (hostname_local, hostname["hostname_full"]))
572
573     if hostname["ip"].startswith("127."):
574       raise errors.OpPrereqError("This host's IP resolves to the private"
575                                  " range (%s). Please fix DNS or /etc/hosts." %
576                                  (hostname["ip"],))
577
578     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
579     if not clustername:
580       raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
581                                  % self.op.cluster_name)
582
583     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
584     if result.failed:
585       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
586                                  " to %s,\nbut this ip address does not"
587                                  " belong to this host."
588                                  " Aborting." % hostname['ip'])
589
590     secondary_ip = getattr(self.op, "secondary_ip", None)
591     if secondary_ip and not utils.IsValidIP(secondary_ip):
592       raise errors.OpPrereqError("Invalid secondary ip given")
593     if secondary_ip and secondary_ip != hostname['ip']:
594       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
595       if result.failed:
596         raise errors.OpPrereqError("You gave %s as secondary IP,\n"
597                                    "but it does not belong to this host." %
598                                    secondary_ip)
599     self.secondary_ip = secondary_ip
600
601     # checks presence of the volume group given
602     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
603
604     if vgstatus:
605       raise errors.OpPrereqError("Error: %s" % vgstatus)
606
607     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
608                     self.op.mac_prefix):
609       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
610                                  self.op.mac_prefix)
611
612     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
613       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
614                                  self.op.hypervisor_type)
615
616     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
617     if result.failed:
618       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
619                                  (self.op.master_netdev,
620                                   result.output.strip()))
621
622   def Exec(self, feedback_fn):
623     """Initialize the cluster.
624
625     """
626     clustername = self.clustername
627     hostname = self.hostname
628
629     # set up the simple store
630     ss = ssconf.SimpleStore()
631     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
632     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
633     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
634     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
635     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
636
637     # set up the inter-node password and certificate
638     _InitGanetiServerSetup(ss)
639
640     # start the master ip
641     rpc.call_node_start_master(hostname['hostname_full'])
642
643     # set up ssh config and /etc/hosts
644     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
645     try:
646       sshline = f.read()
647     finally:
648       f.close()
649     sshkey = sshline.split(" ")[1]
650
651     _UpdateEtcHosts(hostname['hostname_full'],
652                     hostname['ip'],
653                     )
654
655     _UpdateKnownHosts(hostname['hostname_full'],
656                       hostname['ip'],
657                       sshkey,
658                       )
659
660     _InitSSHSetup(hostname['hostname'])
661
662     # init of cluster config file
663     cfgw = config.ConfigWriter()
664     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
665                     sshkey, self.op.mac_prefix,
666                     self.op.vg_name, self.op.def_bridge)
667
668
669 class LUDestroyCluster(NoHooksLU):
670   """Logical unit for destroying the cluster.
671
672   """
673   _OP_REQP = []
674
675   def CheckPrereq(self):
676     """Check prerequisites.
677
678     This checks whether the cluster is empty.
679
680     Any errors are signalled by raising errors.OpPrereqError.
681
682     """
683     master = self.sstore.GetMasterNode()
684
685     nodelist = self.cfg.GetNodeList()
686     if len(nodelist) != 1 or nodelist[0] != master:
687       raise errors.OpPrereqError("There are still %d node(s) in"
688                                  " this cluster." % (len(nodelist) - 1))
689     instancelist = self.cfg.GetInstanceList()
690     if instancelist:
691       raise errors.OpPrereqError("There are still %d instance(s) in"
692                                  " this cluster." % len(instancelist))
693
694   def Exec(self, feedback_fn):
695     """Destroys the cluster.
696
697     """
698     utils.CreateBackup('/root/.ssh/id_dsa')
699     utils.CreateBackup('/root/.ssh/id_dsa.pub')
700     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
701
702
703 class LUVerifyCluster(NoHooksLU):
704   """Verifies the cluster status.
705
706   """
707   _OP_REQP = []
708
709   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
710                   remote_version, feedback_fn):
711     """Run multiple tests against a node.
712
713     Test list:
714       - compares ganeti version
715       - checks vg existance and size > 20G
716       - checks config file checksum
717       - checks ssh to other nodes
718
719     Args:
720       node: name of the node to check
721       file_list: required list of files
722       local_cksum: dictionary of local files and their checksums
723
724     """
725     # compares ganeti version
726     local_version = constants.PROTOCOL_VERSION
727     if not remote_version:
728       feedback_fn(" - ERROR: connection to %s failed" % (node))
729       return True
730
731     if local_version != remote_version:
732       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
733                       (local_version, node, remote_version))
734       return True
735
736     # checks vg existance and size > 20G
737
738     bad = False
739     if not vglist:
740       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
741                       (node,))
742       bad = True
743     else:
744       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
745       if vgstatus:
746         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
747         bad = True
748
749     # checks config file checksum
750     # checks ssh to any
751
752     if 'filelist' not in node_result:
753       bad = True
754       feedback_fn("  - ERROR: node hasn't returned file checksum data")
755     else:
756       remote_cksum = node_result['filelist']
757       for file_name in file_list:
758         if file_name not in remote_cksum:
759           bad = True
760           feedback_fn("  - ERROR: file '%s' missing" % file_name)
761         elif remote_cksum[file_name] != local_cksum[file_name]:
762           bad = True
763           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
764
765     if 'nodelist' not in node_result:
766       bad = True
767       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
768     else:
769       if node_result['nodelist']:
770         bad = True
771         for node in node_result['nodelist']:
772           feedback_fn("  - ERROR: communication with node '%s': %s" %
773                           (node, node_result['nodelist'][node]))
774     hyp_result = node_result.get('hypervisor', None)
775     if hyp_result is not None:
776       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
777     return bad
778
779   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
780     """Verify an instance.
781
782     This function checks to see if the required block devices are
783     available on the instance's node.
784
785     """
786     bad = False
787
788     instancelist = self.cfg.GetInstanceList()
789     if not instance in instancelist:
790       feedback_fn("  - ERROR: instance %s not in instance list %s" %
791                       (instance, instancelist))
792       bad = True
793
794     instanceconfig = self.cfg.GetInstanceInfo(instance)
795     node_current = instanceconfig.primary_node
796
797     node_vol_should = {}
798     instanceconfig.MapLVsByNode(node_vol_should)
799
800     for node in node_vol_should:
801       for volume in node_vol_should[node]:
802         if node not in node_vol_is or volume not in node_vol_is[node]:
803           feedback_fn("  - ERROR: volume %s missing on node %s" %
804                           (volume, node))
805           bad = True
806
807     if not instanceconfig.status == 'down':
808       if not instance in node_instance[node_current]:
809         feedback_fn("  - ERROR: instance %s not running on node %s" %
810                         (instance, node_current))
811         bad = True
812
813     for node in node_instance:
814       if (not node == node_current):
815         if instance in node_instance[node]:
816           feedback_fn("  - ERROR: instance %s should not run on node %s" %
817                           (instance, node))
818           bad = True
819
820     return not bad
821
822   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
823     """Verify if there are any unknown volumes in the cluster.
824
825     The .os, .swap and backup volumes are ignored. All other volumes are
826     reported as unknown.
827
828     """
829     bad = False
830
831     for node in node_vol_is:
832       for volume in node_vol_is[node]:
833         if node not in node_vol_should or volume not in node_vol_should[node]:
834           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
835                       (volume, node))
836           bad = True
837     return bad
838
839   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
840     """Verify the list of running instances.
841
842     This checks what instances are running but unknown to the cluster.
843
844     """
845     bad = False
846     for node in node_instance:
847       for runninginstance in node_instance[node]:
848         if runninginstance not in instancelist:
849           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
850                           (runninginstance, node))
851           bad = True
852     return bad
853
854   def CheckPrereq(self):
855     """Check prerequisites.
856
857     This has no prerequisites.
858
859     """
860     pass
861
862   def Exec(self, feedback_fn):
863     """Verify integrity of cluster, performing various test on nodes.
864
865     """
866     bad = False
867     feedback_fn("* Verifying global settings")
868     self.cfg.VerifyConfig()
869
870     master = self.sstore.GetMasterNode()
871     vg_name = self.cfg.GetVGName()
872     nodelist = utils.NiceSort(self.cfg.GetNodeList())
873     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
874     node_volume = {}
875     node_instance = {}
876
877     # FIXME: verify OS list
878     # do local checksums
879     file_names = list(self.sstore.GetFileList())
880     file_names.append(constants.SSL_CERT_FILE)
881     file_names.append(constants.CLUSTER_CONF_FILE)
882     local_checksums = utils.FingerprintFiles(file_names)
883
884     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
885     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
886     all_instanceinfo = rpc.call_instance_list(nodelist)
887     all_vglist = rpc.call_vg_list(nodelist)
888     node_verify_param = {
889       'filelist': file_names,
890       'nodelist': nodelist,
891       'hypervisor': None,
892       }
893     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
894     all_rversion = rpc.call_version(nodelist)
895
896     for node in nodelist:
897       feedback_fn("* Verifying node %s" % node)
898       result = self._VerifyNode(node, file_names, local_checksums,
899                                 all_vglist[node], all_nvinfo[node],
900                                 all_rversion[node], feedback_fn)
901       bad = bad or result
902
903       # node_volume
904       volumeinfo = all_volumeinfo[node]
905
906       if type(volumeinfo) != dict:
907         feedback_fn("  - ERROR: connection to %s failed" % (node,))
908         bad = True
909         continue
910
911       node_volume[node] = volumeinfo
912
913       # node_instance
914       nodeinstance = all_instanceinfo[node]
915       if type(nodeinstance) != list:
916         feedback_fn("  - ERROR: connection to %s failed" % (node,))
917         bad = True
918         continue
919
920       node_instance[node] = nodeinstance
921
922     node_vol_should = {}
923
924     for instance in instancelist:
925       feedback_fn("* Verifying instance %s" % instance)
926       result =  self._VerifyInstance(instance, node_volume, node_instance,
927                                      feedback_fn)
928       bad = bad or result
929
930       inst_config = self.cfg.GetInstanceInfo(instance)
931
932       inst_config.MapLVsByNode(node_vol_should)
933
934     feedback_fn("* Verifying orphan volumes")
935     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
936                                        feedback_fn)
937     bad = bad or result
938
939     feedback_fn("* Verifying remaining instances")
940     result = self._VerifyOrphanInstances(instancelist, node_instance,
941                                          feedback_fn)
942     bad = bad or result
943
944     return int(bad)
945
946
947 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
948   """Sleep and poll for an instance's disk to sync.
949
950   """
951   if not instance.disks:
952     return True
953
954   if not oneshot:
955     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
956
957   node = instance.primary_node
958
959   for dev in instance.disks:
960     cfgw.SetDiskID(dev, node)
961
962   retries = 0
963   while True:
964     max_time = 0
965     done = True
966     cumul_degraded = False
967     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
968     if not rstats:
969       logger.ToStderr("Can't get any data from node %s" % node)
970       retries += 1
971       if retries >= 10:
972         raise errors.RemoteError("Can't contact node %s for mirror data,"
973                                  " aborting." % node)
974       time.sleep(6)
975       continue
976     retries = 0
977     for i in range(len(rstats)):
978       mstat = rstats[i]
979       if mstat is None:
980         logger.ToStderr("Can't compute data for node %s/%s" %
981                         (node, instance.disks[i].iv_name))
982         continue
983       perc_done, est_time, is_degraded = mstat
984       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
985       if perc_done is not None:
986         done = False
987         if est_time is not None:
988           rem_time = "%d estimated seconds remaining" % est_time
989           max_time = est_time
990         else:
991           rem_time = "no time estimate"
992         logger.ToStdout("- device %s: %5.2f%% done, %s" %
993                         (instance.disks[i].iv_name, perc_done, rem_time))
994     if done or oneshot:
995       break
996
997     if unlock:
998       utils.Unlock('cmd')
999     try:
1000       time.sleep(min(60, max_time))
1001     finally:
1002       if unlock:
1003         utils.Lock('cmd')
1004
1005   if done:
1006     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1007   return not cumul_degraded
1008
1009
1010 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1011   """Check that mirrors are not degraded.
1012
1013   """
1014   cfgw.SetDiskID(dev, node)
1015
1016   result = True
1017   if on_primary or dev.AssembleOnSecondary():
1018     rstats = rpc.call_blockdev_find(node, dev)
1019     if not rstats:
1020       logger.ToStderr("Can't get any data from node %s" % node)
1021       result = False
1022     else:
1023       result = result and (not rstats[5])
1024   if dev.children:
1025     for child in dev.children:
1026       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1027
1028   return result
1029
1030
1031 class LUDiagnoseOS(NoHooksLU):
1032   """Logical unit for OS diagnose/query.
1033
1034   """
1035   _OP_REQP = []
1036
1037   def CheckPrereq(self):
1038     """Check prerequisites.
1039
1040     This always succeeds, since this is a pure query LU.
1041
1042     """
1043     return
1044
1045   def Exec(self, feedback_fn):
1046     """Compute the list of OSes.
1047
1048     """
1049     node_list = self.cfg.GetNodeList()
1050     node_data = rpc.call_os_diagnose(node_list)
1051     if node_data == False:
1052       raise errors.OpExecError("Can't gather the list of OSes")
1053     return node_data
1054
1055
1056 class LURemoveNode(LogicalUnit):
1057   """Logical unit for removing a node.
1058
1059   """
1060   HPATH = "node-remove"
1061   HTYPE = constants.HTYPE_NODE
1062   _OP_REQP = ["node_name"]
1063
1064   def BuildHooksEnv(self):
1065     """Build hooks env.
1066
1067     This doesn't run on the target node in the pre phase as a failed
1068     node would not allows itself to run.
1069
1070     """
1071     env = {
1072       "NODE_NAME": self.op.node_name,
1073       }
1074     all_nodes = self.cfg.GetNodeList()
1075     all_nodes.remove(self.op.node_name)
1076     return env, all_nodes, all_nodes
1077
1078   def CheckPrereq(self):
1079     """Check prerequisites.
1080
1081     This checks:
1082      - the node exists in the configuration
1083      - it does not have primary or secondary instances
1084      - it's not the master
1085
1086     Any errors are signalled by raising errors.OpPrereqError.
1087
1088     """
1089     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1090     if node is None:
1091       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1092
1093     instance_list = self.cfg.GetInstanceList()
1094
1095     masternode = self.sstore.GetMasterNode()
1096     if node.name == masternode:
1097       raise errors.OpPrereqError("Node is the master node,"
1098                                  " you need to failover first.")
1099
1100     for instance_name in instance_list:
1101       instance = self.cfg.GetInstanceInfo(instance_name)
1102       if node.name == instance.primary_node:
1103         raise errors.OpPrereqError("Instance %s still running on the node,"
1104                                    " please remove first." % instance_name)
1105       if node.name in instance.secondary_nodes:
1106         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1107                                    " please remove first." % instance_name)
1108     self.op.node_name = node.name
1109     self.node = node
1110
1111   def Exec(self, feedback_fn):
1112     """Removes the node from the cluster.
1113
1114     """
1115     node = self.node
1116     logger.Info("stopping the node daemon and removing configs from node %s" %
1117                 node.name)
1118
1119     rpc.call_node_leave_cluster(node.name)
1120
1121     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1122
1123     logger.Info("Removing node %s from config" % node.name)
1124
1125     self.cfg.RemoveNode(node.name)
1126
1127
1128 class LUQueryNodes(NoHooksLU):
1129   """Logical unit for querying nodes.
1130
1131   """
1132   _OP_REQP = ["output_fields", "names"]
1133
1134   def CheckPrereq(self):
1135     """Check prerequisites.
1136
1137     This checks that the fields required are valid output fields.
1138
1139     """
1140     self.dynamic_fields = frozenset(["dtotal", "dfree",
1141                                      "mtotal", "mnode", "mfree"])
1142
1143     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1144                                "pinst_list", "sinst_list",
1145                                "pip", "sip"],
1146                        dynamic=self.dynamic_fields,
1147                        selected=self.op.output_fields)
1148
1149     self.wanted = _GetWantedNodes(self, self.op.names)
1150
1151   def Exec(self, feedback_fn):
1152     """Computes the list of nodes and their attributes.
1153
1154     """
1155     nodenames = self.wanted
1156     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1157
1158     # begin data gathering
1159
1160     if self.dynamic_fields.intersection(self.op.output_fields):
1161       live_data = {}
1162       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1163       for name in nodenames:
1164         nodeinfo = node_data.get(name, None)
1165         if nodeinfo:
1166           live_data[name] = {
1167             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1168             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1169             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1170             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1171             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1172             }
1173         else:
1174           live_data[name] = {}
1175     else:
1176       live_data = dict.fromkeys(nodenames, {})
1177
1178     node_to_primary = dict([(name, set()) for name in nodenames])
1179     node_to_secondary = dict([(name, set()) for name in nodenames])
1180
1181     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1182                              "sinst_cnt", "sinst_list"))
1183     if inst_fields & frozenset(self.op.output_fields):
1184       instancelist = self.cfg.GetInstanceList()
1185
1186       for instance_name in instancelist:
1187         inst = self.cfg.GetInstanceInfo(instance_name)
1188         if inst.primary_node in node_to_primary:
1189           node_to_primary[inst.primary_node].add(inst.name)
1190         for secnode in inst.secondary_nodes:
1191           if secnode in node_to_secondary:
1192             node_to_secondary[secnode].add(inst.name)
1193
1194     # end data gathering
1195
1196     output = []
1197     for node in nodelist:
1198       node_output = []
1199       for field in self.op.output_fields:
1200         if field == "name":
1201           val = node.name
1202         elif field == "pinst_list":
1203           val = list(node_to_primary[node.name])
1204         elif field == "sinst_list":
1205           val = list(node_to_secondary[node.name])
1206         elif field == "pinst_cnt":
1207           val = len(node_to_primary[node.name])
1208         elif field == "sinst_cnt":
1209           val = len(node_to_secondary[node.name])
1210         elif field == "pip":
1211           val = node.primary_ip
1212         elif field == "sip":
1213           val = node.secondary_ip
1214         elif field in self.dynamic_fields:
1215           val = live_data[node.name].get(field, None)
1216         else:
1217           raise errors.ParameterError(field)
1218         node_output.append(val)
1219       output.append(node_output)
1220
1221     return output
1222
1223
1224 class LUQueryNodeVolumes(NoHooksLU):
1225   """Logical unit for getting volumes on node(s).
1226
1227   """
1228   _OP_REQP = ["nodes", "output_fields"]
1229
1230   def CheckPrereq(self):
1231     """Check prerequisites.
1232
1233     This checks that the fields required are valid output fields.
1234
1235     """
1236     self.nodes = _GetWantedNodes(self, self.op.nodes)
1237
1238     _CheckOutputFields(static=["node"],
1239                        dynamic=["phys", "vg", "name", "size", "instance"],
1240                        selected=self.op.output_fields)
1241
1242
1243   def Exec(self, feedback_fn):
1244     """Computes the list of nodes and their attributes.
1245
1246     """
1247     nodenames = self.nodes
1248     volumes = rpc.call_node_volumes(nodenames)
1249
1250     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1251              in self.cfg.GetInstanceList()]
1252
1253     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1254
1255     output = []
1256     for node in nodenames:
1257       if node not in volumes or not volumes[node]:
1258         continue
1259
1260       node_vols = volumes[node][:]
1261       node_vols.sort(key=lambda vol: vol['dev'])
1262
1263       for vol in node_vols:
1264         node_output = []
1265         for field in self.op.output_fields:
1266           if field == "node":
1267             val = node
1268           elif field == "phys":
1269             val = vol['dev']
1270           elif field == "vg":
1271             val = vol['vg']
1272           elif field == "name":
1273             val = vol['name']
1274           elif field == "size":
1275             val = int(float(vol['size']))
1276           elif field == "instance":
1277             for inst in ilist:
1278               if node not in lv_by_node[inst]:
1279                 continue
1280               if vol['name'] in lv_by_node[inst][node]:
1281                 val = inst.name
1282                 break
1283             else:
1284               val = '-'
1285           else:
1286             raise errors.ParameterError(field)
1287           node_output.append(str(val))
1288
1289         output.append(node_output)
1290
1291     return output
1292
1293
1294 class LUAddNode(LogicalUnit):
1295   """Logical unit for adding node to the cluster.
1296
1297   """
1298   HPATH = "node-add"
1299   HTYPE = constants.HTYPE_NODE
1300   _OP_REQP = ["node_name"]
1301
1302   def BuildHooksEnv(self):
1303     """Build hooks env.
1304
1305     This will run on all nodes before, and on all nodes + the new node after.
1306
1307     """
1308     env = {
1309       "NODE_NAME": self.op.node_name,
1310       "NODE_PIP": self.op.primary_ip,
1311       "NODE_SIP": self.op.secondary_ip,
1312       }
1313     nodes_0 = self.cfg.GetNodeList()
1314     nodes_1 = nodes_0 + [self.op.node_name, ]
1315     return env, nodes_0, nodes_1
1316
1317   def CheckPrereq(self):
1318     """Check prerequisites.
1319
1320     This checks:
1321      - the new node is not already in the config
1322      - it is resolvable
1323      - its parameters (single/dual homed) matches the cluster
1324
1325     Any errors are signalled by raising errors.OpPrereqError.
1326
1327     """
1328     node_name = self.op.node_name
1329     cfg = self.cfg
1330
1331     dns_data = utils.LookupHostname(node_name)
1332     if not dns_data:
1333       raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1334
1335     node = dns_data['hostname']
1336     primary_ip = self.op.primary_ip = dns_data['ip']
1337     secondary_ip = getattr(self.op, "secondary_ip", None)
1338     if secondary_ip is None:
1339       secondary_ip = primary_ip
1340     if not utils.IsValidIP(secondary_ip):
1341       raise errors.OpPrereqError("Invalid secondary IP given")
1342     self.op.secondary_ip = secondary_ip
1343     node_list = cfg.GetNodeList()
1344     if node in node_list:
1345       raise errors.OpPrereqError("Node %s is already in the configuration"
1346                                  % node)
1347
1348     for existing_node_name in node_list:
1349       existing_node = cfg.GetNodeInfo(existing_node_name)
1350       if (existing_node.primary_ip == primary_ip or
1351           existing_node.secondary_ip == primary_ip or
1352           existing_node.primary_ip == secondary_ip or
1353           existing_node.secondary_ip == secondary_ip):
1354         raise errors.OpPrereqError("New node ip address(es) conflict with"
1355                                    " existing node %s" % existing_node.name)
1356
1357     # check that the type of the node (single versus dual homed) is the
1358     # same as for the master
1359     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1360     master_singlehomed = myself.secondary_ip == myself.primary_ip
1361     newbie_singlehomed = secondary_ip == primary_ip
1362     if master_singlehomed != newbie_singlehomed:
1363       if master_singlehomed:
1364         raise errors.OpPrereqError("The master has no private ip but the"
1365                                    " new node has one")
1366       else:
1367         raise errors.OpPrereqError("The master has a private ip but the"
1368                                    " new node doesn't have one")
1369
1370     # checks reachablity
1371     command = ["fping", "-q", primary_ip]
1372     result = utils.RunCmd(command)
1373     if result.failed:
1374       raise errors.OpPrereqError("Node not reachable by ping")
1375
1376     if not newbie_singlehomed:
1377       # check reachability from my secondary ip to newbie's secondary ip
1378       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1379       result = utils.RunCmd(command)
1380       if result.failed:
1381         raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1382
1383     self.new_node = objects.Node(name=node,
1384                                  primary_ip=primary_ip,
1385                                  secondary_ip=secondary_ip)
1386
1387   def Exec(self, feedback_fn):
1388     """Adds the new node to the cluster.
1389
1390     """
1391     new_node = self.new_node
1392     node = new_node.name
1393
1394     # set up inter-node password and certificate and restarts the node daemon
1395     gntpass = self.sstore.GetNodeDaemonPassword()
1396     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1397       raise errors.OpExecError("ganeti password corruption detected")
1398     f = open(constants.SSL_CERT_FILE)
1399     try:
1400       gntpem = f.read(8192)
1401     finally:
1402       f.close()
1403     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1404     # so we use this to detect an invalid certificate; as long as the
1405     # cert doesn't contain this, the here-document will be correctly
1406     # parsed by the shell sequence below
1407     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1408       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1409     if not gntpem.endswith("\n"):
1410       raise errors.OpExecError("PEM must end with newline")
1411     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1412
1413     # and then connect with ssh to set password and start ganeti-noded
1414     # note that all the below variables are sanitized at this point,
1415     # either by being constants or by the checks above
1416     ss = self.sstore
1417     mycommand = ("umask 077 && "
1418                  "echo '%s' > '%s' && "
1419                  "cat > '%s' << '!EOF.' && \n"
1420                  "%s!EOF.\n%s restart" %
1421                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1422                   constants.SSL_CERT_FILE, gntpem,
1423                   constants.NODE_INITD_SCRIPT))
1424
1425     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1426     if result.failed:
1427       raise errors.OpExecError("Remote command on node %s, error: %s,"
1428                                " output: %s" %
1429                                (node, result.fail_reason, result.output))
1430
1431     # check connectivity
1432     time.sleep(4)
1433
1434     result = rpc.call_version([node])[node]
1435     if result:
1436       if constants.PROTOCOL_VERSION == result:
1437         logger.Info("communication to node %s fine, sw version %s match" %
1438                     (node, result))
1439       else:
1440         raise errors.OpExecError("Version mismatch master version %s,"
1441                                  " node version %s" %
1442                                  (constants.PROTOCOL_VERSION, result))
1443     else:
1444       raise errors.OpExecError("Cannot get version from the new node")
1445
1446     # setup ssh on node
1447     logger.Info("copy ssh key to node %s" % node)
1448     keyarray = []
1449     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1450                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1451                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1452
1453     for i in keyfiles:
1454       f = open(i, 'r')
1455       try:
1456         keyarray.append(f.read())
1457       finally:
1458         f.close()
1459
1460     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1461                                keyarray[3], keyarray[4], keyarray[5])
1462
1463     if not result:
1464       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1465
1466     # Add node to our /etc/hosts, and add key to known_hosts
1467     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1468     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1469                       self.cfg.GetHostKey())
1470
1471     if new_node.secondary_ip != new_node.primary_ip:
1472       result = ssh.SSHCall(node, "root",
1473                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1474       if result.failed:
1475         raise errors.OpExecError("Node claims it doesn't have the"
1476                                  " secondary ip you gave (%s).\n"
1477                                  "Please fix and re-run this command." %
1478                                  new_node.secondary_ip)
1479
1480     success, msg = ssh.VerifyNodeHostname(node)
1481     if not success:
1482       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1483                                " than the one the resolver gives: %s.\n"
1484                                "Please fix and re-run this command." %
1485                                (node, msg))
1486
1487     # Distribute updated /etc/hosts and known_hosts to all nodes,
1488     # including the node just added
1489     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1490     dist_nodes = self.cfg.GetNodeList() + [node]
1491     if myself.name in dist_nodes:
1492       dist_nodes.remove(myself.name)
1493
1494     logger.Debug("Copying hosts and known_hosts to all nodes")
1495     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1496       result = rpc.call_upload_file(dist_nodes, fname)
1497       for to_node in dist_nodes:
1498         if not result[to_node]:
1499           logger.Error("copy of file %s to node %s failed" %
1500                        (fname, to_node))
1501
1502     to_copy = ss.GetFileList()
1503     for fname in to_copy:
1504       if not ssh.CopyFileToNode(node, fname):
1505         logger.Error("could not copy file %s to node %s" % (fname, node))
1506
1507     logger.Info("adding node %s to cluster.conf" % node)
1508     self.cfg.AddNode(new_node)
1509
1510
1511 class LUMasterFailover(LogicalUnit):
1512   """Failover the master node to the current node.
1513
1514   This is a special LU in that it must run on a non-master node.
1515
1516   """
1517   HPATH = "master-failover"
1518   HTYPE = constants.HTYPE_CLUSTER
1519   REQ_MASTER = False
1520   _OP_REQP = []
1521
1522   def BuildHooksEnv(self):
1523     """Build hooks env.
1524
1525     This will run on the new master only in the pre phase, and on all
1526     the nodes in the post phase.
1527
1528     """
1529     env = {
1530       "NEW_MASTER": self.new_master,
1531       "OLD_MASTER": self.old_master,
1532       }
1533     return env, [self.new_master], self.cfg.GetNodeList()
1534
1535   def CheckPrereq(self):
1536     """Check prerequisites.
1537
1538     This checks that we are not already the master.
1539
1540     """
1541     self.new_master = socket.gethostname()
1542
1543     self.old_master = self.sstore.GetMasterNode()
1544
1545     if self.old_master == self.new_master:
1546       raise errors.OpPrereqError("This commands must be run on the node"
1547                                  " where you want the new master to be.\n"
1548                                  "%s is already the master" %
1549                                  self.old_master)
1550
1551   def Exec(self, feedback_fn):
1552     """Failover the master node.
1553
1554     This command, when run on a non-master node, will cause the current
1555     master to cease being master, and the non-master to become new
1556     master.
1557
1558     """
1559     #TODO: do not rely on gethostname returning the FQDN
1560     logger.Info("setting master to %s, old master: %s" %
1561                 (self.new_master, self.old_master))
1562
1563     if not rpc.call_node_stop_master(self.old_master):
1564       logger.Error("could disable the master role on the old master"
1565                    " %s, please disable manually" % self.old_master)
1566
1567     ss = self.sstore
1568     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1569     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1570                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1571       logger.Error("could not distribute the new simple store master file"
1572                    " to the other nodes, please check.")
1573
1574     if not rpc.call_node_start_master(self.new_master):
1575       logger.Error("could not start the master role on the new master"
1576                    " %s, please check" % self.new_master)
1577       feedback_fn("Error in activating the master IP on the new master,\n"
1578                   "please fix manually.")
1579
1580
1581
1582 class LUQueryClusterInfo(NoHooksLU):
1583   """Query cluster configuration.
1584
1585   """
1586   _OP_REQP = []
1587   REQ_MASTER = False
1588
1589   def CheckPrereq(self):
1590     """No prerequsites needed for this LU.
1591
1592     """
1593     pass
1594
1595   def Exec(self, feedback_fn):
1596     """Return cluster config.
1597
1598     """
1599     result = {
1600       "name": self.sstore.GetClusterName(),
1601       "software_version": constants.RELEASE_VERSION,
1602       "protocol_version": constants.PROTOCOL_VERSION,
1603       "config_version": constants.CONFIG_VERSION,
1604       "os_api_version": constants.OS_API_VERSION,
1605       "export_version": constants.EXPORT_VERSION,
1606       "master": self.sstore.GetMasterNode(),
1607       "architecture": (platform.architecture()[0], platform.machine()),
1608       }
1609
1610     return result
1611
1612
1613 class LUClusterCopyFile(NoHooksLU):
1614   """Copy file to cluster.
1615
1616   """
1617   _OP_REQP = ["nodes", "filename"]
1618
1619   def CheckPrereq(self):
1620     """Check prerequisites.
1621
1622     It should check that the named file exists and that the given list
1623     of nodes is valid.
1624
1625     """
1626     if not os.path.exists(self.op.filename):
1627       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1628
1629     self.nodes = _GetWantedNodes(self, self.op.nodes)
1630
1631   def Exec(self, feedback_fn):
1632     """Copy a file from master to some nodes.
1633
1634     Args:
1635       opts - class with options as members
1636       args - list containing a single element, the file name
1637     Opts used:
1638       nodes - list containing the name of target nodes; if empty, all nodes
1639
1640     """
1641     filename = self.op.filename
1642
1643     myname = socket.gethostname()
1644
1645     for node in self.nodes:
1646       if node == myname:
1647         continue
1648       if not ssh.CopyFileToNode(node, filename):
1649         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1650
1651
1652 class LUDumpClusterConfig(NoHooksLU):
1653   """Return a text-representation of the cluster-config.
1654
1655   """
1656   _OP_REQP = []
1657
1658   def CheckPrereq(self):
1659     """No prerequisites.
1660
1661     """
1662     pass
1663
1664   def Exec(self, feedback_fn):
1665     """Dump a representation of the cluster config to the standard output.
1666
1667     """
1668     return self.cfg.DumpConfig()
1669
1670
1671 class LURunClusterCommand(NoHooksLU):
1672   """Run a command on some nodes.
1673
1674   """
1675   _OP_REQP = ["command", "nodes"]
1676
1677   def CheckPrereq(self):
1678     """Check prerequisites.
1679
1680     It checks that the given list of nodes is valid.
1681
1682     """
1683     self.nodes = _GetWantedNodes(self, self.op.nodes)
1684
1685   def Exec(self, feedback_fn):
1686     """Run a command on some nodes.
1687
1688     """
1689     data = []
1690     for node in self.nodes:
1691       result = ssh.SSHCall(node, "root", self.op.command)
1692       data.append((node, result.output, result.exit_code))
1693
1694     return data
1695
1696
1697 class LUActivateInstanceDisks(NoHooksLU):
1698   """Bring up an instance's disks.
1699
1700   """
1701   _OP_REQP = ["instance_name"]
1702
1703   def CheckPrereq(self):
1704     """Check prerequisites.
1705
1706     This checks that the instance is in the cluster.
1707
1708     """
1709     instance = self.cfg.GetInstanceInfo(
1710       self.cfg.ExpandInstanceName(self.op.instance_name))
1711     if instance is None:
1712       raise errors.OpPrereqError("Instance '%s' not known" %
1713                                  self.op.instance_name)
1714     self.instance = instance
1715
1716
1717   def Exec(self, feedback_fn):
1718     """Activate the disks.
1719
1720     """
1721     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1722     if not disks_ok:
1723       raise errors.OpExecError("Cannot activate block devices")
1724
1725     return disks_info
1726
1727
1728 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1729   """Prepare the block devices for an instance.
1730
1731   This sets up the block devices on all nodes.
1732
1733   Args:
1734     instance: a ganeti.objects.Instance object
1735     ignore_secondaries: if true, errors on secondary nodes won't result
1736                         in an error return from the function
1737
1738   Returns:
1739     false if the operation failed
1740     list of (host, instance_visible_name, node_visible_name) if the operation
1741          suceeded with the mapping from node devices to instance devices
1742   """
1743   device_info = []
1744   disks_ok = True
1745   for inst_disk in instance.disks:
1746     master_result = None
1747     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1748       cfg.SetDiskID(node_disk, node)
1749       is_primary = node == instance.primary_node
1750       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1751       if not result:
1752         logger.Error("could not prepare block device %s on node %s (is_pri"
1753                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1754         if is_primary or not ignore_secondaries:
1755           disks_ok = False
1756       if is_primary:
1757         master_result = result
1758     device_info.append((instance.primary_node, inst_disk.iv_name,
1759                         master_result))
1760
1761   return disks_ok, device_info
1762
1763
1764 def _StartInstanceDisks(cfg, instance, force):
1765   """Start the disks of an instance.
1766
1767   """
1768   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1769                                            ignore_secondaries=force)
1770   if not disks_ok:
1771     _ShutdownInstanceDisks(instance, cfg)
1772     if force is not None and not force:
1773       logger.Error("If the message above refers to a secondary node,"
1774                    " you can retry the operation using '--force'.")
1775     raise errors.OpExecError("Disk consistency error")
1776
1777
1778 class LUDeactivateInstanceDisks(NoHooksLU):
1779   """Shutdown an instance's disks.
1780
1781   """
1782   _OP_REQP = ["instance_name"]
1783
1784   def CheckPrereq(self):
1785     """Check prerequisites.
1786
1787     This checks that the instance is in the cluster.
1788
1789     """
1790     instance = self.cfg.GetInstanceInfo(
1791       self.cfg.ExpandInstanceName(self.op.instance_name))
1792     if instance is None:
1793       raise errors.OpPrereqError("Instance '%s' not known" %
1794                                  self.op.instance_name)
1795     self.instance = instance
1796
1797   def Exec(self, feedback_fn):
1798     """Deactivate the disks
1799
1800     """
1801     instance = self.instance
1802     ins_l = rpc.call_instance_list([instance.primary_node])
1803     ins_l = ins_l[instance.primary_node]
1804     if not type(ins_l) is list:
1805       raise errors.OpExecError("Can't contact node '%s'" %
1806                                instance.primary_node)
1807
1808     if self.instance.name in ins_l:
1809       raise errors.OpExecError("Instance is running, can't shutdown"
1810                                " block devices.")
1811
1812     _ShutdownInstanceDisks(instance, self.cfg)
1813
1814
1815 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1816   """Shutdown block devices of an instance.
1817
1818   This does the shutdown on all nodes of the instance.
1819
1820   If the ignore_primary is false, errors on the primary node are
1821   ignored.
1822
1823   """
1824   result = True
1825   for disk in instance.disks:
1826     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1827       cfg.SetDiskID(top_disk, node)
1828       if not rpc.call_blockdev_shutdown(node, top_disk):
1829         logger.Error("could not shutdown block device %s on node %s" %
1830                      (disk.iv_name, node))
1831         if not ignore_primary or node != instance.primary_node:
1832           result = False
1833   return result
1834
1835
1836 class LUStartupInstance(LogicalUnit):
1837   """Starts an instance.
1838
1839   """
1840   HPATH = "instance-start"
1841   HTYPE = constants.HTYPE_INSTANCE
1842   _OP_REQP = ["instance_name", "force"]
1843
1844   def BuildHooksEnv(self):
1845     """Build hooks env.
1846
1847     This runs on master, primary and secondary nodes of the instance.
1848
1849     """
1850     env = {
1851       "FORCE": self.op.force,
1852       }
1853     env.update(_BuildInstanceHookEnvByObject(self.instance))
1854     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1855           list(self.instance.secondary_nodes))
1856     return env, nl, nl
1857
1858   def CheckPrereq(self):
1859     """Check prerequisites.
1860
1861     This checks that the instance is in the cluster.
1862
1863     """
1864     instance = self.cfg.GetInstanceInfo(
1865       self.cfg.ExpandInstanceName(self.op.instance_name))
1866     if instance is None:
1867       raise errors.OpPrereqError("Instance '%s' not known" %
1868                                  self.op.instance_name)
1869
1870     # check bridges existance
1871     brlist = [nic.bridge for nic in instance.nics]
1872     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1873       raise errors.OpPrereqError("one or more target bridges %s does not"
1874                                  " exist on destination node '%s'" %
1875                                  (brlist, instance.primary_node))
1876
1877     self.instance = instance
1878     self.op.instance_name = instance.name
1879
1880   def Exec(self, feedback_fn):
1881     """Start the instance.
1882
1883     """
1884     instance = self.instance
1885     force = self.op.force
1886     extra_args = getattr(self.op, "extra_args", "")
1887
1888     node_current = instance.primary_node
1889
1890     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1891     if not nodeinfo:
1892       raise errors.OpExecError("Could not contact node %s for infos" %
1893                                (node_current))
1894
1895     freememory = nodeinfo[node_current]['memory_free']
1896     memory = instance.memory
1897     if memory > freememory:
1898       raise errors.OpExecError("Not enough memory to start instance"
1899                                " %s on node %s"
1900                                " needed %s MiB, available %s MiB" %
1901                                (instance.name, node_current, memory,
1902                                 freememory))
1903
1904     _StartInstanceDisks(self.cfg, instance, force)
1905
1906     if not rpc.call_instance_start(node_current, instance, extra_args):
1907       _ShutdownInstanceDisks(instance, self.cfg)
1908       raise errors.OpExecError("Could not start instance")
1909
1910     self.cfg.MarkInstanceUp(instance.name)
1911
1912
1913 class LUShutdownInstance(LogicalUnit):
1914   """Shutdown an instance.
1915
1916   """
1917   HPATH = "instance-stop"
1918   HTYPE = constants.HTYPE_INSTANCE
1919   _OP_REQP = ["instance_name"]
1920
1921   def BuildHooksEnv(self):
1922     """Build hooks env.
1923
1924     This runs on master, primary and secondary nodes of the instance.
1925
1926     """
1927     env = _BuildInstanceHookEnvByObject(self.instance)
1928     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1929           list(self.instance.secondary_nodes))
1930     return env, nl, nl
1931
1932   def CheckPrereq(self):
1933     """Check prerequisites.
1934
1935     This checks that the instance is in the cluster.
1936
1937     """
1938     instance = self.cfg.GetInstanceInfo(
1939       self.cfg.ExpandInstanceName(self.op.instance_name))
1940     if instance is None:
1941       raise errors.OpPrereqError("Instance '%s' not known" %
1942                                  self.op.instance_name)
1943     self.instance = instance
1944
1945   def Exec(self, feedback_fn):
1946     """Shutdown the instance.
1947
1948     """
1949     instance = self.instance
1950     node_current = instance.primary_node
1951     if not rpc.call_instance_shutdown(node_current, instance):
1952       logger.Error("could not shutdown instance")
1953
1954     self.cfg.MarkInstanceDown(instance.name)
1955     _ShutdownInstanceDisks(instance, self.cfg)
1956
1957
1958 class LUReinstallInstance(LogicalUnit):
1959   """Reinstall an instance.
1960
1961   """
1962   HPATH = "instance-reinstall"
1963   HTYPE = constants.HTYPE_INSTANCE
1964   _OP_REQP = ["instance_name"]
1965
1966   def BuildHooksEnv(self):
1967     """Build hooks env.
1968
1969     This runs on master, primary and secondary nodes of the instance.
1970
1971     """
1972     env = _BuildInstanceHookEnvByObject(self.instance)
1973     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1974           list(self.instance.secondary_nodes))
1975     return env, nl, nl
1976
1977   def CheckPrereq(self):
1978     """Check prerequisites.
1979
1980     This checks that the instance is in the cluster and is not running.
1981
1982     """
1983     instance = self.cfg.GetInstanceInfo(
1984       self.cfg.ExpandInstanceName(self.op.instance_name))
1985     if instance is None:
1986       raise errors.OpPrereqError("Instance '%s' not known" %
1987                                  self.op.instance_name)
1988     if instance.disk_template == constants.DT_DISKLESS:
1989       raise errors.OpPrereqError("Instance '%s' has no disks" %
1990                                  self.op.instance_name)
1991     if instance.status != "down":
1992       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1993                                  self.op.instance_name)
1994     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1995     if remote_info:
1996       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1997                                  (self.op.instance_name,
1998                                   instance.primary_node))
1999
2000     self.op.os_type = getattr(self.op, "os_type", None)
2001     if self.op.os_type is not None:
2002       # OS verification
2003       pnode = self.cfg.GetNodeInfo(
2004         self.cfg.ExpandNodeName(instance.primary_node))
2005       if pnode is None:
2006         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2007                                    self.op.pnode)
2008       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2009       if not isinstance(os_obj, objects.OS):
2010         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2011                                    " primary node"  % self.op.os_type)
2012
2013     self.instance = instance
2014
2015   def Exec(self, feedback_fn):
2016     """Reinstall the instance.
2017
2018     """
2019     inst = self.instance
2020
2021     if self.op.os_type is not None:
2022       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2023       inst.os = self.op.os_type
2024       self.cfg.AddInstance(inst)
2025
2026     _StartInstanceDisks(self.cfg, inst, None)
2027     try:
2028       feedback_fn("Running the instance OS create scripts...")
2029       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2030         raise errors.OpExecError("Could not install OS for instance %s "
2031                                  "on node %s" %
2032                                  (inst.name, inst.primary_node))
2033     finally:
2034       _ShutdownInstanceDisks(inst, self.cfg)
2035
2036
2037 class LURemoveInstance(LogicalUnit):
2038   """Remove an instance.
2039
2040   """
2041   HPATH = "instance-remove"
2042   HTYPE = constants.HTYPE_INSTANCE
2043   _OP_REQP = ["instance_name"]
2044
2045   def BuildHooksEnv(self):
2046     """Build hooks env.
2047
2048     This runs on master, primary and secondary nodes of the instance.
2049
2050     """
2051     env = _BuildInstanceHookEnvByObject(self.instance)
2052     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2053           list(self.instance.secondary_nodes))
2054     return env, nl, nl
2055
2056   def CheckPrereq(self):
2057     """Check prerequisites.
2058
2059     This checks that the instance is in the cluster.
2060
2061     """
2062     instance = self.cfg.GetInstanceInfo(
2063       self.cfg.ExpandInstanceName(self.op.instance_name))
2064     if instance is None:
2065       raise errors.OpPrereqError("Instance '%s' not known" %
2066                                  self.op.instance_name)
2067     self.instance = instance
2068
2069   def Exec(self, feedback_fn):
2070     """Remove the instance.
2071
2072     """
2073     instance = self.instance
2074     logger.Info("shutting down instance %s on node %s" %
2075                 (instance.name, instance.primary_node))
2076
2077     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2078       raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2079                                (instance.name, instance.primary_node))
2080
2081     logger.Info("removing block devices for instance %s" % instance.name)
2082
2083     _RemoveDisks(instance, self.cfg)
2084
2085     logger.Info("removing instance %s out of cluster config" % instance.name)
2086
2087     self.cfg.RemoveInstance(instance.name)
2088
2089
2090 class LUQueryInstances(NoHooksLU):
2091   """Logical unit for querying instances.
2092
2093   """
2094   _OP_REQP = ["output_fields", "names"]
2095
2096   def CheckPrereq(self):
2097     """Check prerequisites.
2098
2099     This checks that the fields required are valid output fields.
2100
2101     """
2102     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2103     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2104                                "admin_state", "admin_ram",
2105                                "disk_template", "ip", "mac", "bridge",
2106                                "sda_size", "sdb_size"],
2107                        dynamic=self.dynamic_fields,
2108                        selected=self.op.output_fields)
2109
2110     self.wanted = _GetWantedInstances(self, self.op.names)
2111
2112   def Exec(self, feedback_fn):
2113     """Computes the list of nodes and their attributes.
2114
2115     """
2116     instance_names = self.wanted
2117     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2118                      in instance_names]
2119
2120     # begin data gathering
2121
2122     nodes = frozenset([inst.primary_node for inst in instance_list])
2123
2124     bad_nodes = []
2125     if self.dynamic_fields.intersection(self.op.output_fields):
2126       live_data = {}
2127       node_data = rpc.call_all_instances_info(nodes)
2128       for name in nodes:
2129         result = node_data[name]
2130         if result:
2131           live_data.update(result)
2132         elif result == False:
2133           bad_nodes.append(name)
2134         # else no instance is alive
2135     else:
2136       live_data = dict([(name, {}) for name in instance_names])
2137
2138     # end data gathering
2139
2140     output = []
2141     for instance in instance_list:
2142       iout = []
2143       for field in self.op.output_fields:
2144         if field == "name":
2145           val = instance.name
2146         elif field == "os":
2147           val = instance.os
2148         elif field == "pnode":
2149           val = instance.primary_node
2150         elif field == "snodes":
2151           val = list(instance.secondary_nodes)
2152         elif field == "admin_state":
2153           val = (instance.status != "down")
2154         elif field == "oper_state":
2155           if instance.primary_node in bad_nodes:
2156             val = None
2157           else:
2158             val = bool(live_data.get(instance.name))
2159         elif field == "admin_ram":
2160           val = instance.memory
2161         elif field == "oper_ram":
2162           if instance.primary_node in bad_nodes:
2163             val = None
2164           elif instance.name in live_data:
2165             val = live_data[instance.name].get("memory", "?")
2166           else:
2167             val = "-"
2168         elif field == "disk_template":
2169           val = instance.disk_template
2170         elif field == "ip":
2171           val = instance.nics[0].ip
2172         elif field == "bridge":
2173           val = instance.nics[0].bridge
2174         elif field == "mac":
2175           val = instance.nics[0].mac
2176         elif field == "sda_size" or field == "sdb_size":
2177           disk = instance.FindDisk(field[:3])
2178           if disk is None:
2179             val = None
2180           else:
2181             val = disk.size
2182         else:
2183           raise errors.ParameterError(field)
2184         iout.append(val)
2185       output.append(iout)
2186
2187     return output
2188
2189
2190 class LUFailoverInstance(LogicalUnit):
2191   """Failover an instance.
2192
2193   """
2194   HPATH = "instance-failover"
2195   HTYPE = constants.HTYPE_INSTANCE
2196   _OP_REQP = ["instance_name", "ignore_consistency"]
2197
2198   def BuildHooksEnv(self):
2199     """Build hooks env.
2200
2201     This runs on master, primary and secondary nodes of the instance.
2202
2203     """
2204     env = {
2205       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2206       }
2207     env.update(_BuildInstanceHookEnvByObject(self.instance))
2208     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2209     return env, nl, nl
2210
2211   def CheckPrereq(self):
2212     """Check prerequisites.
2213
2214     This checks that the instance is in the cluster.
2215
2216     """
2217     instance = self.cfg.GetInstanceInfo(
2218       self.cfg.ExpandInstanceName(self.op.instance_name))
2219     if instance is None:
2220       raise errors.OpPrereqError("Instance '%s' not known" %
2221                                  self.op.instance_name)
2222
2223     if instance.disk_template != constants.DT_REMOTE_RAID1:
2224       raise errors.OpPrereqError("Instance's disk layout is not"
2225                                  " remote_raid1.")
2226
2227     secondary_nodes = instance.secondary_nodes
2228     if not secondary_nodes:
2229       raise errors.ProgrammerError("no secondary node but using "
2230                                    "DT_REMOTE_RAID1 template")
2231
2232     # check memory requirements on the secondary node
2233     target_node = secondary_nodes[0]
2234     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2235     info = nodeinfo.get(target_node, None)
2236     if not info:
2237       raise errors.OpPrereqError("Cannot get current information"
2238                                  " from node '%s'" % nodeinfo)
2239     if instance.memory > info['memory_free']:
2240       raise errors.OpPrereqError("Not enough memory on target node %s."
2241                                  " %d MB available, %d MB required" %
2242                                  (target_node, info['memory_free'],
2243                                   instance.memory))
2244
2245     # check bridge existance
2246     brlist = [nic.bridge for nic in instance.nics]
2247     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2248       raise errors.OpPrereqError("One or more target bridges %s does not"
2249                                  " exist on destination node '%s'" %
2250                                  (brlist, instance.primary_node))
2251
2252     self.instance = instance
2253
2254   def Exec(self, feedback_fn):
2255     """Failover an instance.
2256
2257     The failover is done by shutting it down on its present node and
2258     starting it on the secondary.
2259
2260     """
2261     instance = self.instance
2262
2263     source_node = instance.primary_node
2264     target_node = instance.secondary_nodes[0]
2265
2266     feedback_fn("* checking disk consistency between source and target")
2267     for dev in instance.disks:
2268       # for remote_raid1, these are md over drbd
2269       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2270         if not self.op.ignore_consistency:
2271           raise errors.OpExecError("Disk %s is degraded on target node,"
2272                                    " aborting failover." % dev.iv_name)
2273
2274     feedback_fn("* checking target node resource availability")
2275     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2276
2277     if not nodeinfo:
2278       raise errors.OpExecError("Could not contact target node %s." %
2279                                target_node)
2280
2281     free_memory = int(nodeinfo[target_node]['memory_free'])
2282     memory = instance.memory
2283     if memory > free_memory:
2284       raise errors.OpExecError("Not enough memory to create instance %s on"
2285                                " node %s. needed %s MiB, available %s MiB" %
2286                                (instance.name, target_node, memory,
2287                                 free_memory))
2288
2289     feedback_fn("* shutting down instance on source node")
2290     logger.Info("Shutting down instance %s on node %s" %
2291                 (instance.name, source_node))
2292
2293     if not rpc.call_instance_shutdown(source_node, instance):
2294       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2295                    " anyway. Please make sure node %s is down"  %
2296                    (instance.name, source_node, source_node))
2297
2298     feedback_fn("* deactivating the instance's disks on source node")
2299     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2300       raise errors.OpExecError("Can't shut down the instance's disks.")
2301
2302     instance.primary_node = target_node
2303     # distribute new instance config to the other nodes
2304     self.cfg.AddInstance(instance)
2305
2306     feedback_fn("* activating the instance's disks on target node")
2307     logger.Info("Starting instance %s on node %s" %
2308                 (instance.name, target_node))
2309
2310     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2311                                              ignore_secondaries=True)
2312     if not disks_ok:
2313       _ShutdownInstanceDisks(instance, self.cfg)
2314       raise errors.OpExecError("Can't activate the instance's disks")
2315
2316     feedback_fn("* starting the instance on the target node")
2317     if not rpc.call_instance_start(target_node, instance, None):
2318       _ShutdownInstanceDisks(instance, self.cfg)
2319       raise errors.OpExecError("Could not start instance %s on node %s." %
2320                                (instance.name, target_node))
2321
2322
2323 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2324   """Create a tree of block devices on the primary node.
2325
2326   This always creates all devices.
2327
2328   """
2329   if device.children:
2330     for child in device.children:
2331       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2332         return False
2333
2334   cfg.SetDiskID(device, node)
2335   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2336   if not new_id:
2337     return False
2338   if device.physical_id is None:
2339     device.physical_id = new_id
2340   return True
2341
2342
2343 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2344   """Create a tree of block devices on a secondary node.
2345
2346   If this device type has to be created on secondaries, create it and
2347   all its children.
2348
2349   If not, just recurse to children keeping the same 'force' value.
2350
2351   """
2352   if device.CreateOnSecondary():
2353     force = True
2354   if device.children:
2355     for child in device.children:
2356       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2357         return False
2358
2359   if not force:
2360     return True
2361   cfg.SetDiskID(device, node)
2362   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2363   if not new_id:
2364     return False
2365   if device.physical_id is None:
2366     device.physical_id = new_id
2367   return True
2368
2369
2370 def _GenerateUniqueNames(cfg, exts):
2371   """Generate a suitable LV name.
2372
2373   This will generate a logical volume name for the given instance.
2374
2375   """
2376   results = []
2377   for val in exts:
2378     new_id = cfg.GenerateUniqueID()
2379     results.append("%s%s" % (new_id, val))
2380   return results
2381
2382
2383 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2384   """Generate a drbd device complete with its children.
2385
2386   """
2387   port = cfg.AllocatePort()
2388   vgname = cfg.GetVGName()
2389   dev_data = objects.Disk(dev_type="lvm", size=size,
2390                           logical_id=(vgname, names[0]))
2391   dev_meta = objects.Disk(dev_type="lvm", size=128,
2392                           logical_id=(vgname, names[1]))
2393   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2394                           logical_id = (primary, secondary, port),
2395                           children = [dev_data, dev_meta])
2396   return drbd_dev
2397
2398
2399 def _GenerateDiskTemplate(cfg, template_name,
2400                           instance_name, primary_node,
2401                           secondary_nodes, disk_sz, swap_sz):
2402   """Generate the entire disk layout for a given template type.
2403
2404   """
2405   #TODO: compute space requirements
2406
2407   vgname = cfg.GetVGName()
2408   if template_name == "diskless":
2409     disks = []
2410   elif template_name == "plain":
2411     if len(secondary_nodes) != 0:
2412       raise errors.ProgrammerError("Wrong template configuration")
2413
2414     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2415     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2416                            logical_id=(vgname, names[0]),
2417                            iv_name = "sda")
2418     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2419                            logical_id=(vgname, names[1]),
2420                            iv_name = "sdb")
2421     disks = [sda_dev, sdb_dev]
2422   elif template_name == "local_raid1":
2423     if len(secondary_nodes) != 0:
2424       raise errors.ProgrammerError("Wrong template configuration")
2425
2426
2427     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2428                                        ".sdb_m1", ".sdb_m2"])
2429     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2430                               logical_id=(vgname, names[0]))
2431     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2432                               logical_id=(vgname, names[1]))
2433     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2434                               size=disk_sz,
2435                               children = [sda_dev_m1, sda_dev_m2])
2436     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2437                               logical_id=(vgname, names[2]))
2438     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2439                               logical_id=(vgname, names[3]))
2440     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2441                               size=swap_sz,
2442                               children = [sdb_dev_m1, sdb_dev_m2])
2443     disks = [md_sda_dev, md_sdb_dev]
2444   elif template_name == constants.DT_REMOTE_RAID1:
2445     if len(secondary_nodes) != 1:
2446       raise errors.ProgrammerError("Wrong template configuration")
2447     remote_node = secondary_nodes[0]
2448     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2449                                        ".sdb_data", ".sdb_meta"])
2450     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2451                                          disk_sz, names[0:2])
2452     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2453                               children = [drbd_sda_dev], size=disk_sz)
2454     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2455                                          swap_sz, names[2:4])
2456     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2457                               children = [drbd_sdb_dev], size=swap_sz)
2458     disks = [md_sda_dev, md_sdb_dev]
2459   else:
2460     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2461   return disks
2462
2463
2464 def _GetInstanceInfoText(instance):
2465   """Compute that text that should be added to the disk's metadata.
2466
2467   """
2468   return "originstname+%s" % instance.name
2469
2470
2471 def _CreateDisks(cfg, instance):
2472   """Create all disks for an instance.
2473
2474   This abstracts away some work from AddInstance.
2475
2476   Args:
2477     instance: the instance object
2478
2479   Returns:
2480     True or False showing the success of the creation process
2481
2482   """
2483   info = _GetInstanceInfoText(instance)
2484
2485   for device in instance.disks:
2486     logger.Info("creating volume %s for instance %s" %
2487               (device.iv_name, instance.name))
2488     #HARDCODE
2489     for secondary_node in instance.secondary_nodes:
2490       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2491                                         info):
2492         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2493                      (device.iv_name, device, secondary_node))
2494         return False
2495     #HARDCODE
2496     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2497       logger.Error("failed to create volume %s on primary!" %
2498                    device.iv_name)
2499       return False
2500   return True
2501
2502
2503 def _RemoveDisks(instance, cfg):
2504   """Remove all disks for an instance.
2505
2506   This abstracts away some work from `AddInstance()` and
2507   `RemoveInstance()`. Note that in case some of the devices couldn't
2508   be remove, the removal will continue with the other ones (compare
2509   with `_CreateDisks()`).
2510
2511   Args:
2512     instance: the instance object
2513
2514   Returns:
2515     True or False showing the success of the removal proces
2516
2517   """
2518   logger.Info("removing block devices for instance %s" % instance.name)
2519
2520   result = True
2521   for device in instance.disks:
2522     for node, disk in device.ComputeNodeTree(instance.primary_node):
2523       cfg.SetDiskID(disk, node)
2524       if not rpc.call_blockdev_remove(node, disk):
2525         logger.Error("could not remove block device %s on node %s,"
2526                      " continuing anyway" %
2527                      (device.iv_name, node))
2528         result = False
2529   return result
2530
2531
2532 class LUCreateInstance(LogicalUnit):
2533   """Create an instance.
2534
2535   """
2536   HPATH = "instance-add"
2537   HTYPE = constants.HTYPE_INSTANCE
2538   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2539               "disk_template", "swap_size", "mode", "start", "vcpus",
2540               "wait_for_sync"]
2541
2542   def BuildHooksEnv(self):
2543     """Build hooks env.
2544
2545     This runs on master, primary and secondary nodes of the instance.
2546
2547     """
2548     env = {
2549       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2550       "INSTANCE_DISK_SIZE": self.op.disk_size,
2551       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2552       "INSTANCE_ADD_MODE": self.op.mode,
2553       }
2554     if self.op.mode == constants.INSTANCE_IMPORT:
2555       env["INSTANCE_SRC_NODE"] = self.op.src_node
2556       env["INSTANCE_SRC_PATH"] = self.op.src_path
2557       env["INSTANCE_SRC_IMAGE"] = self.src_image
2558
2559     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2560       primary_node=self.op.pnode,
2561       secondary_nodes=self.secondaries,
2562       status=self.instance_status,
2563       os_type=self.op.os_type,
2564       memory=self.op.mem_size,
2565       vcpus=self.op.vcpus,
2566       nics=[(self.inst_ip, self.op.bridge)],
2567     ))
2568
2569     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2570           self.secondaries)
2571     return env, nl, nl
2572
2573
2574   def CheckPrereq(self):
2575     """Check prerequisites.
2576
2577     """
2578     if self.op.mode not in (constants.INSTANCE_CREATE,
2579                             constants.INSTANCE_IMPORT):
2580       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2581                                  self.op.mode)
2582
2583     if self.op.mode == constants.INSTANCE_IMPORT:
2584       src_node = getattr(self.op, "src_node", None)
2585       src_path = getattr(self.op, "src_path", None)
2586       if src_node is None or src_path is None:
2587         raise errors.OpPrereqError("Importing an instance requires source"
2588                                    " node and path options")
2589       src_node_full = self.cfg.ExpandNodeName(src_node)
2590       if src_node_full is None:
2591         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2592       self.op.src_node = src_node = src_node_full
2593
2594       if not os.path.isabs(src_path):
2595         raise errors.OpPrereqError("The source path must be absolute")
2596
2597       export_info = rpc.call_export_info(src_node, src_path)
2598
2599       if not export_info:
2600         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2601
2602       if not export_info.has_section(constants.INISECT_EXP):
2603         raise errors.ProgrammerError("Corrupted export config")
2604
2605       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2606       if (int(ei_version) != constants.EXPORT_VERSION):
2607         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2608                                    (ei_version, constants.EXPORT_VERSION))
2609
2610       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2611         raise errors.OpPrereqError("Can't import instance with more than"
2612                                    " one data disk")
2613
2614       # FIXME: are the old os-es, disk sizes, etc. useful?
2615       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2616       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2617                                                          'disk0_dump'))
2618       self.src_image = diskimage
2619     else: # INSTANCE_CREATE
2620       if getattr(self.op, "os_type", None) is None:
2621         raise errors.OpPrereqError("No guest OS specified")
2622
2623     # check primary node
2624     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2625     if pnode is None:
2626       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2627                                  self.op.pnode)
2628     self.op.pnode = pnode.name
2629     self.pnode = pnode
2630     self.secondaries = []
2631     # disk template and mirror node verification
2632     if self.op.disk_template not in constants.DISK_TEMPLATES:
2633       raise errors.OpPrereqError("Invalid disk template name")
2634
2635     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2636       if getattr(self.op, "snode", None) is None:
2637         raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2638                                    " a mirror node")
2639
2640       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2641       if snode_name is None:
2642         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2643                                    self.op.snode)
2644       elif snode_name == pnode.name:
2645         raise errors.OpPrereqError("The secondary node cannot be"
2646                                    " the primary node.")
2647       self.secondaries.append(snode_name)
2648
2649     # Check lv size requirements
2650     nodenames = [pnode.name] + self.secondaries
2651     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2652
2653     # Required free disk space as a function of disk and swap space
2654     req_size_dict = {
2655       constants.DT_DISKLESS: 0,
2656       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2657       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2658       # 256 MB are added for drbd metadata, 128MB for each drbd device
2659       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2660     }
2661
2662     if self.op.disk_template not in req_size_dict:
2663       raise errors.ProgrammerError("Disk template '%s' size requirement"
2664                                    " is unknown" %  self.op.disk_template)
2665
2666     req_size = req_size_dict[self.op.disk_template]
2667
2668     for node in nodenames:
2669       info = nodeinfo.get(node, None)
2670       if not info:
2671         raise errors.OpPrereqError("Cannot get current information"
2672                                    " from node '%s'" % nodeinfo)
2673       if req_size > info['vg_free']:
2674         raise errors.OpPrereqError("Not enough disk space on target node %s."
2675                                    " %d MB available, %d MB required" %
2676                                    (node, info['vg_free'], req_size))
2677
2678     # os verification
2679     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2680     if not isinstance(os_obj, objects.OS):
2681       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2682                                  " primary node"  % self.op.os_type)
2683
2684     # instance verification
2685     hostname1 = utils.LookupHostname(self.op.instance_name)
2686     if not hostname1:
2687       raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2688                                  self.op.instance_name)
2689
2690     self.op.instance_name = instance_name = hostname1['hostname']
2691     instance_list = self.cfg.GetInstanceList()
2692     if instance_name in instance_list:
2693       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2694                                  instance_name)
2695
2696     ip = getattr(self.op, "ip", None)
2697     if ip is None or ip.lower() == "none":
2698       inst_ip = None
2699     elif ip.lower() == "auto":
2700       inst_ip = hostname1['ip']
2701     else:
2702       if not utils.IsValidIP(ip):
2703         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2704                                    " like a valid IP" % ip)
2705       inst_ip = ip
2706     self.inst_ip = inst_ip
2707
2708     command = ["fping", "-q", hostname1['ip']]
2709     result = utils.RunCmd(command)
2710     if not result.failed:
2711       raise errors.OpPrereqError("IP %s of instance %s already in use" %
2712                                  (hostname1['ip'], instance_name))
2713
2714     # bridge verification
2715     bridge = getattr(self.op, "bridge", None)
2716     if bridge is None:
2717       self.op.bridge = self.cfg.GetDefBridge()
2718     else:
2719       self.op.bridge = bridge
2720
2721     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2722       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2723                                  " destination node '%s'" %
2724                                  (self.op.bridge, pnode.name))
2725
2726     if self.op.start:
2727       self.instance_status = 'up'
2728     else:
2729       self.instance_status = 'down'
2730
2731   def Exec(self, feedback_fn):
2732     """Create and add the instance to the cluster.
2733
2734     """
2735     instance = self.op.instance_name
2736     pnode_name = self.pnode.name
2737
2738     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2739     if self.inst_ip is not None:
2740       nic.ip = self.inst_ip
2741
2742     disks = _GenerateDiskTemplate(self.cfg,
2743                                   self.op.disk_template,
2744                                   instance, pnode_name,
2745                                   self.secondaries, self.op.disk_size,
2746                                   self.op.swap_size)
2747
2748     iobj = objects.Instance(name=instance, os=self.op.os_type,
2749                             primary_node=pnode_name,
2750                             memory=self.op.mem_size,
2751                             vcpus=self.op.vcpus,
2752                             nics=[nic], disks=disks,
2753                             disk_template=self.op.disk_template,
2754                             status=self.instance_status,
2755                             )
2756
2757     feedback_fn("* creating instance disks...")
2758     if not _CreateDisks(self.cfg, iobj):
2759       _RemoveDisks(iobj, self.cfg)
2760       raise errors.OpExecError("Device creation failed, reverting...")
2761
2762     feedback_fn("adding instance %s to cluster config" % instance)
2763
2764     self.cfg.AddInstance(iobj)
2765
2766     if self.op.wait_for_sync:
2767       disk_abort = not _WaitForSync(self.cfg, iobj)
2768     elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2769       # make sure the disks are not degraded (still sync-ing is ok)
2770       time.sleep(15)
2771       feedback_fn("* checking mirrors status")
2772       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2773     else:
2774       disk_abort = False
2775
2776     if disk_abort:
2777       _RemoveDisks(iobj, self.cfg)
2778       self.cfg.RemoveInstance(iobj.name)
2779       raise errors.OpExecError("There are some degraded disks for"
2780                                " this instance")
2781
2782     feedback_fn("creating os for instance %s on node %s" %
2783                 (instance, pnode_name))
2784
2785     if iobj.disk_template != constants.DT_DISKLESS:
2786       if self.op.mode == constants.INSTANCE_CREATE:
2787         feedback_fn("* running the instance OS create scripts...")
2788         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2789           raise errors.OpExecError("could not add os for instance %s"
2790                                    " on node %s" %
2791                                    (instance, pnode_name))
2792
2793       elif self.op.mode == constants.INSTANCE_IMPORT:
2794         feedback_fn("* running the instance OS import scripts...")
2795         src_node = self.op.src_node
2796         src_image = self.src_image
2797         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2798                                                 src_node, src_image):
2799           raise errors.OpExecError("Could not import os for instance"
2800                                    " %s on node %s" %
2801                                    (instance, pnode_name))
2802       else:
2803         # also checked in the prereq part
2804         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2805                                      % self.op.mode)
2806
2807     if self.op.start:
2808       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2809       feedback_fn("* starting instance...")
2810       if not rpc.call_instance_start(pnode_name, iobj, None):
2811         raise errors.OpExecError("Could not start instance")
2812
2813
2814 class LUConnectConsole(NoHooksLU):
2815   """Connect to an instance's console.
2816
2817   This is somewhat special in that it returns the command line that
2818   you need to run on the master node in order to connect to the
2819   console.
2820
2821   """
2822   _OP_REQP = ["instance_name"]
2823
2824   def CheckPrereq(self):
2825     """Check prerequisites.
2826
2827     This checks that the instance is in the cluster.
2828
2829     """
2830     instance = self.cfg.GetInstanceInfo(
2831       self.cfg.ExpandInstanceName(self.op.instance_name))
2832     if instance is None:
2833       raise errors.OpPrereqError("Instance '%s' not known" %
2834                                  self.op.instance_name)
2835     self.instance = instance
2836
2837   def Exec(self, feedback_fn):
2838     """Connect to the console of an instance
2839
2840     """
2841     instance = self.instance
2842     node = instance.primary_node
2843
2844     node_insts = rpc.call_instance_list([node])[node]
2845     if node_insts is False:
2846       raise errors.OpExecError("Can't connect to node %s." % node)
2847
2848     if instance.name not in node_insts:
2849       raise errors.OpExecError("Instance %s is not running." % instance.name)
2850
2851     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2852
2853     hyper = hypervisor.GetHypervisor()
2854     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2855     # build ssh cmdline
2856     argv = ["ssh", "-q", "-t"]
2857     argv.extend(ssh.KNOWN_HOSTS_OPTS)
2858     argv.extend(ssh.BATCH_MODE_OPTS)
2859     argv.append(node)
2860     argv.append(console_cmd)
2861     return "ssh", argv
2862
2863
2864 class LUAddMDDRBDComponent(LogicalUnit):
2865   """Adda new mirror member to an instance's disk.
2866
2867   """
2868   HPATH = "mirror-add"
2869   HTYPE = constants.HTYPE_INSTANCE
2870   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2871
2872   def BuildHooksEnv(self):
2873     """Build hooks env.
2874
2875     This runs on the master, the primary and all the secondaries.
2876
2877     """
2878     env = {
2879       "NEW_SECONDARY": self.op.remote_node,
2880       "DISK_NAME": self.op.disk_name,
2881       }
2882     env.update(_BuildInstanceHookEnvByObject(self.instance))
2883     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2884           self.op.remote_node,] + list(self.instance.secondary_nodes)
2885     return env, nl, nl
2886
2887   def CheckPrereq(self):
2888     """Check prerequisites.
2889
2890     This checks that the instance is in the cluster.
2891
2892     """
2893     instance = self.cfg.GetInstanceInfo(
2894       self.cfg.ExpandInstanceName(self.op.instance_name))
2895     if instance is None:
2896       raise errors.OpPrereqError("Instance '%s' not known" %
2897                                  self.op.instance_name)
2898     self.instance = instance
2899
2900     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2901     if remote_node is None:
2902       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2903     self.remote_node = remote_node
2904
2905     if remote_node == instance.primary_node:
2906       raise errors.OpPrereqError("The specified node is the primary node of"
2907                                  " the instance.")
2908
2909     if instance.disk_template != constants.DT_REMOTE_RAID1:
2910       raise errors.OpPrereqError("Instance's disk layout is not"
2911                                  " remote_raid1.")
2912     for disk in instance.disks:
2913       if disk.iv_name == self.op.disk_name:
2914         break
2915     else:
2916       raise errors.OpPrereqError("Can't find this device ('%s') in the"
2917                                  " instance." % self.op.disk_name)
2918     if len(disk.children) > 1:
2919       raise errors.OpPrereqError("The device already has two slave"
2920                                  " devices.\n"
2921                                  "This would create a 3-disk raid1"
2922                                  " which we don't allow.")
2923     self.disk = disk
2924
2925   def Exec(self, feedback_fn):
2926     """Add the mirror component
2927
2928     """
2929     disk = self.disk
2930     instance = self.instance
2931
2932     remote_node = self.remote_node
2933     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2934     names = _GenerateUniqueNames(self.cfg, lv_names)
2935     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2936                                      remote_node, disk.size, names)
2937
2938     logger.Info("adding new mirror component on secondary")
2939     #HARDCODE
2940     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2941                                       _GetInstanceInfoText(instance)):
2942       raise errors.OpExecError("Failed to create new component on secondary"
2943                                " node %s" % remote_node)
2944
2945     logger.Info("adding new mirror component on primary")
2946     #HARDCODE
2947     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2948                                     _GetInstanceInfoText(instance)):
2949       # remove secondary dev
2950       self.cfg.SetDiskID(new_drbd, remote_node)
2951       rpc.call_blockdev_remove(remote_node, new_drbd)
2952       raise errors.OpExecError("Failed to create volume on primary")
2953
2954     # the device exists now
2955     # call the primary node to add the mirror to md
2956     logger.Info("adding new mirror component to md")
2957     if not rpc.call_blockdev_addchild(instance.primary_node,
2958                                            disk, new_drbd):
2959       logger.Error("Can't add mirror compoment to md!")
2960       self.cfg.SetDiskID(new_drbd, remote_node)
2961       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2962         logger.Error("Can't rollback on secondary")
2963       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2964       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2965         logger.Error("Can't rollback on primary")
2966       raise errors.OpExecError("Can't add mirror component to md array")
2967
2968     disk.children.append(new_drbd)
2969
2970     self.cfg.AddInstance(instance)
2971
2972     _WaitForSync(self.cfg, instance)
2973
2974     return 0
2975
2976
2977 class LURemoveMDDRBDComponent(LogicalUnit):
2978   """Remove a component from a remote_raid1 disk.
2979
2980   """
2981   HPATH = "mirror-remove"
2982   HTYPE = constants.HTYPE_INSTANCE
2983   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2984
2985   def BuildHooksEnv(self):
2986     """Build hooks env.
2987
2988     This runs on the master, the primary and all the secondaries.
2989
2990     """
2991     env = {
2992       "DISK_NAME": self.op.disk_name,
2993       "DISK_ID": self.op.disk_id,
2994       "OLD_SECONDARY": self.old_secondary,
2995       }
2996     env.update(_BuildInstanceHookEnvByObject(self.instance))
2997     nl = [self.sstore.GetMasterNode(),
2998           self.instance.primary_node] + list(self.instance.secondary_nodes)
2999     return env, nl, nl
3000
3001   def CheckPrereq(self):
3002     """Check prerequisites.
3003
3004     This checks that the instance is in the cluster.
3005
3006     """
3007     instance = self.cfg.GetInstanceInfo(
3008       self.cfg.ExpandInstanceName(self.op.instance_name))
3009     if instance is None:
3010       raise errors.OpPrereqError("Instance '%s' not known" %
3011                                  self.op.instance_name)
3012     self.instance = instance
3013
3014     if instance.disk_template != constants.DT_REMOTE_RAID1:
3015       raise errors.OpPrereqError("Instance's disk layout is not"
3016                                  " remote_raid1.")
3017     for disk in instance.disks:
3018       if disk.iv_name == self.op.disk_name:
3019         break
3020     else:
3021       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3022                                  " instance." % self.op.disk_name)
3023     for child in disk.children:
3024       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
3025         break
3026     else:
3027       raise errors.OpPrereqError("Can't find the device with this port.")
3028
3029     if len(disk.children) < 2:
3030       raise errors.OpPrereqError("Cannot remove the last component from"
3031                                  " a mirror.")
3032     self.disk = disk
3033     self.child = child
3034     if self.child.logical_id[0] == instance.primary_node:
3035       oid = 1
3036     else:
3037       oid = 0
3038     self.old_secondary = self.child.logical_id[oid]
3039
3040   def Exec(self, feedback_fn):
3041     """Remove the mirror component
3042
3043     """
3044     instance = self.instance
3045     disk = self.disk
3046     child = self.child
3047     logger.Info("remove mirror component")
3048     self.cfg.SetDiskID(disk, instance.primary_node)
3049     if not rpc.call_blockdev_removechild(instance.primary_node,
3050                                               disk, child):
3051       raise errors.OpExecError("Can't remove child from mirror.")
3052
3053     for node in child.logical_id[:2]:
3054       self.cfg.SetDiskID(child, node)
3055       if not rpc.call_blockdev_remove(node, child):
3056         logger.Error("Warning: failed to remove device from node %s,"
3057                      " continuing operation." % node)
3058
3059     disk.children.remove(child)
3060     self.cfg.AddInstance(instance)
3061
3062
3063 class LUReplaceDisks(LogicalUnit):
3064   """Replace the disks of an instance.
3065
3066   """
3067   HPATH = "mirrors-replace"
3068   HTYPE = constants.HTYPE_INSTANCE
3069   _OP_REQP = ["instance_name"]
3070
3071   def BuildHooksEnv(self):
3072     """Build hooks env.
3073
3074     This runs on the master, the primary and all the secondaries.
3075
3076     """
3077     env = {
3078       "NEW_SECONDARY": self.op.remote_node,
3079       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3080       }
3081     env.update(_BuildInstanceHookEnvByObject(self.instance))
3082     nl = [self.sstore.GetMasterNode(),
3083           self.instance.primary_node] + list(self.instance.secondary_nodes)
3084     return env, nl, nl
3085
3086   def CheckPrereq(self):
3087     """Check prerequisites.
3088
3089     This checks that the instance is in the cluster.
3090
3091     """
3092     instance = self.cfg.GetInstanceInfo(
3093       self.cfg.ExpandInstanceName(self.op.instance_name))
3094     if instance is None:
3095       raise errors.OpPrereqError("Instance '%s' not known" %
3096                                  self.op.instance_name)
3097     self.instance = instance
3098
3099     if instance.disk_template != constants.DT_REMOTE_RAID1:
3100       raise errors.OpPrereqError("Instance's disk layout is not"
3101                                  " remote_raid1.")
3102
3103     if len(instance.secondary_nodes) != 1:
3104       raise errors.OpPrereqError("The instance has a strange layout,"
3105                                  " expected one secondary but found %d" %
3106                                  len(instance.secondary_nodes))
3107
3108     remote_node = getattr(self.op, "remote_node", None)
3109     if remote_node is None:
3110       remote_node = instance.secondary_nodes[0]
3111     else:
3112       remote_node = self.cfg.ExpandNodeName(remote_node)
3113       if remote_node is None:
3114         raise errors.OpPrereqError("Node '%s' not known" %
3115                                    self.op.remote_node)
3116     if remote_node == instance.primary_node:
3117       raise errors.OpPrereqError("The specified node is the primary node of"
3118                                  " the instance.")
3119     self.op.remote_node = remote_node
3120
3121   def Exec(self, feedback_fn):
3122     """Replace the disks of an instance.
3123
3124     """
3125     instance = self.instance
3126     iv_names = {}
3127     # start of work
3128     remote_node = self.op.remote_node
3129     cfg = self.cfg
3130     for dev in instance.disks:
3131       size = dev.size
3132       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3133       names = _GenerateUniqueNames(cfg, lv_names)
3134       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3135                                        remote_node, size, names)
3136       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3137       logger.Info("adding new mirror component on secondary for %s" %
3138                   dev.iv_name)
3139       #HARDCODE
3140       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3141                                         _GetInstanceInfoText(instance)):
3142         raise errors.OpExecError("Failed to create new component on"
3143                                  " secondary node %s\n"
3144                                  "Full abort, cleanup manually!" %
3145                                  remote_node)
3146
3147       logger.Info("adding new mirror component on primary")
3148       #HARDCODE
3149       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3150                                       _GetInstanceInfoText(instance)):
3151         # remove secondary dev
3152         cfg.SetDiskID(new_drbd, remote_node)
3153         rpc.call_blockdev_remove(remote_node, new_drbd)
3154         raise errors.OpExecError("Failed to create volume on primary!\n"
3155                                  "Full abort, cleanup manually!!")
3156
3157       # the device exists now
3158       # call the primary node to add the mirror to md
3159       logger.Info("adding new mirror component to md")
3160       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3161                                         new_drbd):
3162         logger.Error("Can't add mirror compoment to md!")
3163         cfg.SetDiskID(new_drbd, remote_node)
3164         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3165           logger.Error("Can't rollback on secondary")
3166         cfg.SetDiskID(new_drbd, instance.primary_node)
3167         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3168           logger.Error("Can't rollback on primary")
3169         raise errors.OpExecError("Full abort, cleanup manually!!")
3170
3171       dev.children.append(new_drbd)
3172       cfg.AddInstance(instance)
3173
3174     # this can fail as the old devices are degraded and _WaitForSync
3175     # does a combined result over all disks, so we don't check its
3176     # return value
3177     _WaitForSync(cfg, instance, unlock=True)
3178
3179     # so check manually all the devices
3180     for name in iv_names:
3181       dev, child, new_drbd = iv_names[name]
3182       cfg.SetDiskID(dev, instance.primary_node)
3183       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3184       if is_degr:
3185         raise errors.OpExecError("MD device %s is degraded!" % name)
3186       cfg.SetDiskID(new_drbd, instance.primary_node)
3187       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3188       if is_degr:
3189         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3190
3191     for name in iv_names:
3192       dev, child, new_drbd = iv_names[name]
3193       logger.Info("remove mirror %s component" % name)
3194       cfg.SetDiskID(dev, instance.primary_node)
3195       if not rpc.call_blockdev_removechild(instance.primary_node,
3196                                                 dev, child):
3197         logger.Error("Can't remove child from mirror, aborting"
3198                      " *this device cleanup*.\nYou need to cleanup manually!!")
3199         continue
3200
3201       for node in child.logical_id[:2]:
3202         logger.Info("remove child device on %s" % node)
3203         cfg.SetDiskID(child, node)
3204         if not rpc.call_blockdev_remove(node, child):
3205           logger.Error("Warning: failed to remove device from node %s,"
3206                        " continuing operation." % node)
3207
3208       dev.children.remove(child)
3209
3210       cfg.AddInstance(instance)
3211
3212
3213 class LUQueryInstanceData(NoHooksLU):
3214   """Query runtime instance data.
3215
3216   """
3217   _OP_REQP = ["instances"]
3218
3219   def CheckPrereq(self):
3220     """Check prerequisites.
3221
3222     This only checks the optional instance list against the existing names.
3223
3224     """
3225     if not isinstance(self.op.instances, list):
3226       raise errors.OpPrereqError("Invalid argument type 'instances'")
3227     if self.op.instances:
3228       self.wanted_instances = []
3229       names = self.op.instances
3230       for name in names:
3231         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3232         if instance is None:
3233           raise errors.OpPrereqError("No such instance name '%s'" % name)
3234       self.wanted_instances.append(instance)
3235     else:
3236       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3237                                in self.cfg.GetInstanceList()]
3238     return
3239
3240
3241   def _ComputeDiskStatus(self, instance, snode, dev):
3242     """Compute block device status.
3243
3244     """
3245     self.cfg.SetDiskID(dev, instance.primary_node)
3246     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3247     if dev.dev_type == "drbd":
3248       # we change the snode then (otherwise we use the one passed in)
3249       if dev.logical_id[0] == instance.primary_node:
3250         snode = dev.logical_id[1]
3251       else:
3252         snode = dev.logical_id[0]
3253
3254     if snode:
3255       self.cfg.SetDiskID(dev, snode)
3256       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3257     else:
3258       dev_sstatus = None
3259
3260     if dev.children:
3261       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3262                       for child in dev.children]
3263     else:
3264       dev_children = []
3265
3266     data = {
3267       "iv_name": dev.iv_name,
3268       "dev_type": dev.dev_type,
3269       "logical_id": dev.logical_id,
3270       "physical_id": dev.physical_id,
3271       "pstatus": dev_pstatus,
3272       "sstatus": dev_sstatus,
3273       "children": dev_children,
3274       }
3275
3276     return data
3277
3278   def Exec(self, feedback_fn):
3279     """Gather and return data"""
3280     result = {}
3281     for instance in self.wanted_instances:
3282       remote_info = rpc.call_instance_info(instance.primary_node,
3283                                                 instance.name)
3284       if remote_info and "state" in remote_info:
3285         remote_state = "up"
3286       else:
3287         remote_state = "down"
3288       if instance.status == "down":
3289         config_state = "down"
3290       else:
3291         config_state = "up"
3292
3293       disks = [self._ComputeDiskStatus(instance, None, device)
3294                for device in instance.disks]
3295
3296       idict = {
3297         "name": instance.name,
3298         "config_state": config_state,
3299         "run_state": remote_state,
3300         "pnode": instance.primary_node,
3301         "snodes": instance.secondary_nodes,
3302         "os": instance.os,
3303         "memory": instance.memory,
3304         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3305         "disks": disks,
3306         }
3307
3308       result[instance.name] = idict
3309
3310     return result
3311
3312
3313 class LUSetInstanceParms(LogicalUnit):
3314   """Modifies an instances's parameters.
3315
3316   """
3317   HPATH = "instance-modify"
3318   HTYPE = constants.HTYPE_INSTANCE
3319   _OP_REQP = ["instance_name"]
3320
3321   def BuildHooksEnv(self):
3322     """Build hooks env.
3323
3324     This runs on the master, primary and secondaries.
3325
3326     """
3327     args = dict()
3328     if self.mem:
3329       args['memory'] = self.mem
3330     if self.vcpus:
3331       args['vcpus'] = self.vcpus
3332     if self.do_ip or self.do_bridge:
3333       if self.do_ip:
3334         ip = self.ip
3335       else:
3336         ip = self.instance.nics[0].ip
3337       if self.bridge:
3338         bridge = self.bridge
3339       else:
3340         bridge = self.instance.nics[0].bridge
3341       args['nics'] = [(ip, bridge)]
3342     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3343     nl = [self.sstore.GetMasterNode(),
3344           self.instance.primary_node] + list(self.instance.secondary_nodes)
3345     return env, nl, nl
3346
3347   def CheckPrereq(self):
3348     """Check prerequisites.
3349
3350     This only checks the instance list against the existing names.
3351
3352     """
3353     self.mem = getattr(self.op, "mem", None)
3354     self.vcpus = getattr(self.op, "vcpus", None)
3355     self.ip = getattr(self.op, "ip", None)
3356     self.bridge = getattr(self.op, "bridge", None)
3357     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3358       raise errors.OpPrereqError("No changes submitted")
3359     if self.mem is not None:
3360       try:
3361         self.mem = int(self.mem)
3362       except ValueError, err:
3363         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3364     if self.vcpus is not None:
3365       try:
3366         self.vcpus = int(self.vcpus)
3367       except ValueError, err:
3368         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3369     if self.ip is not None:
3370       self.do_ip = True
3371       if self.ip.lower() == "none":
3372         self.ip = None
3373       else:
3374         if not utils.IsValidIP(self.ip):
3375           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3376     else:
3377       self.do_ip = False
3378     self.do_bridge = (self.bridge is not None)
3379
3380     instance = self.cfg.GetInstanceInfo(
3381       self.cfg.ExpandInstanceName(self.op.instance_name))
3382     if instance is None:
3383       raise errors.OpPrereqError("No such instance name '%s'" %
3384                                  self.op.instance_name)
3385     self.op.instance_name = instance.name
3386     self.instance = instance
3387     return
3388
3389   def Exec(self, feedback_fn):
3390     """Modifies an instance.
3391
3392     All parameters take effect only at the next restart of the instance.
3393     """
3394     result = []
3395     instance = self.instance
3396     if self.mem:
3397       instance.memory = self.mem
3398       result.append(("mem", self.mem))
3399     if self.vcpus:
3400       instance.vcpus = self.vcpus
3401       result.append(("vcpus",  self.vcpus))
3402     if self.do_ip:
3403       instance.nics[0].ip = self.ip
3404       result.append(("ip", self.ip))
3405     if self.bridge:
3406       instance.nics[0].bridge = self.bridge
3407       result.append(("bridge", self.bridge))
3408
3409     self.cfg.AddInstance(instance)
3410
3411     return result
3412
3413
3414 class LUQueryExports(NoHooksLU):
3415   """Query the exports list
3416
3417   """
3418   _OP_REQP = []
3419
3420   def CheckPrereq(self):
3421     """Check that the nodelist contains only existing nodes.
3422
3423     """
3424     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3425
3426   def Exec(self, feedback_fn):
3427     """Compute the list of all the exported system images.
3428
3429     Returns:
3430       a dictionary with the structure node->(export-list)
3431       where export-list is a list of the instances exported on
3432       that node.
3433
3434     """
3435     return rpc.call_export_list(self.nodes)
3436
3437
3438 class LUExportInstance(LogicalUnit):
3439   """Export an instance to an image in the cluster.
3440
3441   """
3442   HPATH = "instance-export"
3443   HTYPE = constants.HTYPE_INSTANCE
3444   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3445
3446   def BuildHooksEnv(self):
3447     """Build hooks env.
3448
3449     This will run on the master, primary node and target node.
3450
3451     """
3452     env = {
3453       "EXPORT_NODE": self.op.target_node,
3454       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3455       }
3456     env.update(_BuildInstanceHookEnvByObject(self.instance))
3457     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3458           self.op.target_node]
3459     return env, nl, nl
3460
3461   def CheckPrereq(self):
3462     """Check prerequisites.
3463
3464     This checks that the instance name is a valid one.
3465
3466     """
3467     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3468     self.instance = self.cfg.GetInstanceInfo(instance_name)
3469     if self.instance is None:
3470       raise errors.OpPrereqError("Instance '%s' not found" %
3471                                  self.op.instance_name)
3472
3473     # node verification
3474     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3475     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3476
3477     if self.dst_node is None:
3478       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3479                                  self.op.target_node)
3480     self.op.target_node = self.dst_node.name
3481
3482   def Exec(self, feedback_fn):
3483     """Export an instance to an image in the cluster.
3484
3485     """
3486     instance = self.instance
3487     dst_node = self.dst_node
3488     src_node = instance.primary_node
3489     # shutdown the instance, unless requested not to do so
3490     if self.op.shutdown:
3491       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3492       self.processor.ChainOpCode(op, feedback_fn)
3493
3494     vgname = self.cfg.GetVGName()
3495
3496     snap_disks = []
3497
3498     try:
3499       for disk in instance.disks:
3500         if disk.iv_name == "sda":
3501           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3502           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3503
3504           if not new_dev_name:
3505             logger.Error("could not snapshot block device %s on node %s" %
3506                          (disk.logical_id[1], src_node))
3507           else:
3508             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3509                                       logical_id=(vgname, new_dev_name),
3510                                       physical_id=(vgname, new_dev_name),
3511                                       iv_name=disk.iv_name)
3512             snap_disks.append(new_dev)
3513
3514     finally:
3515       if self.op.shutdown:
3516         op = opcodes.OpStartupInstance(instance_name=instance.name,
3517                                        force=False)
3518         self.processor.ChainOpCode(op, feedback_fn)
3519
3520     # TODO: check for size
3521
3522     for dev in snap_disks:
3523       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3524                                            instance):
3525         logger.Error("could not export block device %s from node"
3526                      " %s to node %s" %
3527                      (dev.logical_id[1], src_node, dst_node.name))
3528       if not rpc.call_blockdev_remove(src_node, dev):
3529         logger.Error("could not remove snapshot block device %s from"
3530                      " node %s" % (dev.logical_id[1], src_node))
3531
3532     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3533       logger.Error("could not finalize export for instance %s on node %s" %
3534                    (instance.name, dst_node.name))
3535
3536     nodelist = self.cfg.GetNodeList()
3537     nodelist.remove(dst_node.name)
3538
3539     # on one-node clusters nodelist will be empty after the removal
3540     # if we proceed the backup would be removed because OpQueryExports
3541     # substitutes an empty list with the full cluster node list.
3542     if nodelist:
3543       op = opcodes.OpQueryExports(nodes=nodelist)
3544       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3545       for node in exportlist:
3546         if instance.name in exportlist[node]:
3547           if not rpc.call_export_remove(node, instance.name):
3548             logger.Error("could not remove older export for instance %s"
3549                          " on node %s" % (instance.name, node))
3550
3551
3552 class TagsLU(NoHooksLU):
3553   """Generic tags LU.
3554
3555   This is an abstract class which is the parent of all the other tags LUs.
3556
3557   """
3558   def CheckPrereq(self):
3559     """Check prerequisites.
3560
3561     """
3562     if self.op.kind == constants.TAG_CLUSTER:
3563       self.target = self.cfg.GetClusterInfo()
3564     elif self.op.kind == constants.TAG_NODE:
3565       name = self.cfg.ExpandNodeName(self.op.name)
3566       if name is None:
3567         raise errors.OpPrereqError("Invalid node name (%s)" %
3568                                    (self.op.name,))
3569       self.op.name = name
3570       self.target = self.cfg.GetNodeInfo(name)
3571     elif self.op.kind == constants.TAG_INSTANCE:
3572       name = self.cfg.ExpandInstanceName(name)
3573       if name is None:
3574         raise errors.OpPrereqError("Invalid instance name (%s)" %
3575                                    (self.op.name,))
3576       self.op.name = name
3577       self.target = self.cfg.GetInstanceInfo(name)
3578     else:
3579       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3580                                  str(self.op.kind))
3581
3582
3583 class LUGetTags(TagsLU):
3584   """Returns the tags of a given object.
3585
3586   """
3587   _OP_REQP = ["kind", "name"]
3588
3589   def Exec(self, feedback_fn):
3590     """Returns the tag list.
3591
3592     """
3593     return self.target.GetTags()
3594
3595
3596 class LUAddTag(TagsLU):
3597   """Sets a tag on a given object.
3598
3599   """
3600   _OP_REQP = ["kind", "name", "tag"]
3601
3602   def CheckPrereq(self):
3603     """Check prerequisites.
3604
3605     This checks the type and length of the tag name and value.
3606
3607     """
3608     TagsLU.CheckPrereq(self)
3609     objects.TaggableObject.ValidateTag(self.op.tag)
3610
3611   def Exec(self, feedback_fn):
3612     """Sets the tag.
3613
3614     """
3615     try:
3616       self.target.AddTag(self.op.tag)
3617     except errors.TagError, err:
3618       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3619     try:
3620       self.cfg.Update(self.target)
3621     except errors.ConfigurationError:
3622       raise errors.OpRetryError("There has been a modification to the"
3623                                 " config file and the operation has been"
3624                                 " aborted. Please retry.")
3625
3626
3627 class LUDelTag(TagsLU):
3628   """Delete a tag from a given object.
3629
3630   """
3631   _OP_REQP = ["kind", "name", "tag"]
3632
3633   def CheckPrereq(self):
3634     """Check prerequisites.
3635
3636     This checks that we have the given tag.
3637
3638     """
3639     TagsLU.CheckPrereq(self)
3640     objects.TaggableObject.ValidateTag(self.op.tag)
3641     if self.op.tag not in self.target.GetTags():
3642       raise errors.OpPrereqError("Tag not found")
3643
3644   def Exec(self, feedback_fn):
3645     """Remove the tag from the object.
3646
3647     """
3648     self.target.RemoveTag(self.op.tag)
3649     try:
3650       self.cfg.Update(self.target)
3651     except errors.ConfigurationError:
3652       raise errors.OpRetryError("There has been a modification to the"
3653                                 " config file and the operation has been"
3654                                 " aborted. Please retry.")