Implement instance rename operation
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded node names.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted = []
179
180     for name in nodes:
181       node = lu.cfg.ExpandNodeName(name)
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184       wanted.append(node)
185
186   else:
187     wanted = lu.cfg.GetNodeList()
188   return utils.NiceSort(wanted)
189
190
191 def _GetWantedInstances(lu, instances):
192   """Returns list of checked and expanded instance names.
193
194   Args:
195     instances: List of instances (strings) or None for all
196
197   """
198   if not isinstance(instances, list):
199     raise errors.OpPrereqError("Invalid argument type 'instances'")
200
201   if instances:
202     wanted = []
203
204     for name in instances:
205       instance = lu.cfg.ExpandInstanceName(name)
206       if instance is None:
207         raise errors.OpPrereqError("No such instance name '%s'" % name)
208       wanted.append(instance)
209
210   else:
211     wanted = lu.cfg.GetInstanceList()
212   return utils.NiceSort(wanted)
213
214
215 def _CheckOutputFields(static, dynamic, selected):
216   """Checks whether all selected fields are valid.
217
218   Args:
219     static: Static fields
220     dynamic: Dynamic fields
221
222   """
223   static_fields = frozenset(static)
224   dynamic_fields = frozenset(dynamic)
225
226   all_fields = static_fields | dynamic_fields
227
228   if not all_fields.issuperset(selected):
229     raise errors.OpPrereqError("Unknown output fields selected: %s"
230                                % ",".join(frozenset(selected).
231                                           difference(all_fields)))
232
233
234 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235                           memory, vcpus, nics):
236   """Builds instance related env variables for hooks from single variables.
237
238   Args:
239     secondary_nodes: List of secondary nodes as strings
240   """
241   env = {
242     "INSTANCE_NAME": name,
243     "INSTANCE_PRIMARY": primary_node,
244     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245     "INSTANCE_OS_TYPE": os_type,
246     "INSTANCE_STATUS": status,
247     "INSTANCE_MEMORY": memory,
248     "INSTANCE_VCPUS": vcpus,
249   }
250
251   if nics:
252     nic_count = len(nics)
253     for idx, (ip, bridge) in enumerate(nics):
254       if ip is None:
255         ip = ""
256       env["INSTANCE_NIC%d_IP" % idx] = ip
257       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258   else:
259     nic_count = 0
260
261   env["INSTANCE_NIC_COUNT"] = nic_count
262
263   return env
264
265
266 def _BuildInstanceHookEnvByObject(instance, override=None):
267   """Builds instance related env variables for hooks from an object.
268
269   Args:
270     instance: objects.Instance object of instance
271     override: dict of values to override
272   """
273   args = {
274     'name': instance.name,
275     'primary_node': instance.primary_node,
276     'secondary_nodes': instance.secondary_nodes,
277     'os_type': instance.os,
278     'status': instance.os,
279     'memory': instance.memory,
280     'vcpus': instance.vcpus,
281     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282   }
283   if override:
284     args.update(override)
285   return _BuildInstanceHookEnv(**args)
286
287
288 def _UpdateEtcHosts(fullnode, ip):
289   """Ensure a node has a correct entry in /etc/hosts.
290
291   Args:
292     fullnode - Fully qualified domain name of host. (str)
293     ip       - IPv4 address of host (str)
294
295   """
296   node = fullnode.split(".", 1)[0]
297
298   f = open('/etc/hosts', 'r+')
299
300   inthere = False
301
302   save_lines = []
303   add_lines = []
304   removed = False
305
306   while True:
307     rawline = f.readline()
308
309     if not rawline:
310       # End of file
311       break
312
313     line = rawline.split('\n')[0]
314
315     # Strip off comments
316     line = line.split('#')[0]
317
318     if not line:
319       # Entire line was comment, skip
320       save_lines.append(rawline)
321       continue
322
323     fields = line.split()
324
325     haveall = True
326     havesome = False
327     for spec in [ ip, fullnode, node ]:
328       if spec not in fields:
329         haveall = False
330       if spec in fields:
331         havesome = True
332
333     if haveall:
334       inthere = True
335       save_lines.append(rawline)
336       continue
337
338     if havesome and not haveall:
339       # Line (old, or manual?) which is missing some.  Remove.
340       removed = True
341       continue
342
343     save_lines.append(rawline)
344
345   if not inthere:
346     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347
348   if removed:
349     if add_lines:
350       save_lines = save_lines + add_lines
351
352     # We removed a line, write a new file and replace old.
353     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354     newfile = os.fdopen(fd, 'w')
355     newfile.write(''.join(save_lines))
356     newfile.close()
357     os.rename(tmpname, '/etc/hosts')
358
359   elif add_lines:
360     # Simply appending a new line will do the trick.
361     f.seek(0, 2)
362     for add in add_lines:
363       f.write(add)
364
365   f.close()
366
367
368 def _UpdateKnownHosts(fullnode, ip, pubkey):
369   """Ensure a node has a correct known_hosts entry.
370
371   Args:
372     fullnode - Fully qualified domain name of host. (str)
373     ip       - IPv4 address of host (str)
374     pubkey   - the public key of the cluster
375
376   """
377   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379   else:
380     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381
382   inthere = False
383
384   save_lines = []
385   add_lines = []
386   removed = False
387
388   while True:
389     rawline = f.readline()
390     logger.Debug('read %s' % (repr(rawline),))
391
392     if not rawline:
393       # End of file
394       break
395
396     line = rawline.split('\n')[0]
397
398     parts = line.split(' ')
399     fields = parts[0].split(',')
400     key = parts[2]
401
402     haveall = True
403     havesome = False
404     for spec in [ ip, fullnode ]:
405       if spec not in fields:
406         haveall = False
407       if spec in fields:
408         havesome = True
409
410     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411     if haveall and key == pubkey:
412       inthere = True
413       save_lines.append(rawline)
414       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415       continue
416
417     if havesome and (not haveall or key != pubkey):
418       removed = True
419       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420       continue
421
422     save_lines.append(rawline)
423
424   if not inthere:
425     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427
428   if removed:
429     save_lines = save_lines + add_lines
430
431     # Write a new file and replace old.
432     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433                                    constants.DATA_DIR)
434     newfile = os.fdopen(fd, 'w')
435     try:
436       newfile.write(''.join(save_lines))
437     finally:
438       newfile.close()
439     logger.Debug("Wrote new known_hosts.")
440     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441
442   elif add_lines:
443     # Simply appending a new line will do the trick.
444     f.seek(0, 2)
445     for add in add_lines:
446       f.write(add)
447
448   f.close()
449
450
451 def _HasValidVG(vglist, vgname):
452   """Checks if the volume group list is valid.
453
454   A non-None return value means there's an error, and the return value
455   is the error message.
456
457   """
458   vgsize = vglist.get(vgname, None)
459   if vgsize is None:
460     return "volume group '%s' missing" % vgname
461   elif vgsize < 20480:
462     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
463             (vgname, vgsize))
464   return None
465
466
467 def _InitSSHSetup(node):
468   """Setup the SSH configuration for the cluster.
469
470
471   This generates a dsa keypair for root, adds the pub key to the
472   permitted hosts and adds the hostkey to its own known hosts.
473
474   Args:
475     node: the name of this host as a fqdn
476
477   """
478   if os.path.exists('/root/.ssh/id_dsa'):
479     utils.CreateBackup('/root/.ssh/id_dsa')
480   if os.path.exists('/root/.ssh/id_dsa.pub'):
481     utils.CreateBackup('/root/.ssh/id_dsa.pub')
482
483   utils.RemoveFile('/root/.ssh/id_dsa')
484   utils.RemoveFile('/root/.ssh/id_dsa.pub')
485
486   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487                          "-f", "/root/.ssh/id_dsa",
488                          "-q", "-N", ""])
489   if result.failed:
490     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491                              result.output)
492
493   f = open('/root/.ssh/id_dsa.pub', 'r')
494   try:
495     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496   finally:
497     f.close()
498
499
500 def _InitGanetiServerSetup(ss):
501   """Setup the necessary configuration for the initial node daemon.
502
503   This creates the nodepass file containing the shared password for
504   the cluster and also generates the SSL certificate.
505
506   """
507   # Create pseudo random password
508   randpass = sha.new(os.urandom(64)).hexdigest()
509   # and write it into sstore
510   ss.SetKey(ss.SS_NODED_PASS, randpass)
511
512   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513                          "-days", str(365*5), "-nodes", "-x509",
514                          "-keyout", constants.SSL_CERT_FILE,
515                          "-out", constants.SSL_CERT_FILE, "-batch"])
516   if result.failed:
517     raise errors.OpExecError("could not generate server ssl cert, command"
518                              " %s had exitcode %s and error message %s" %
519                              (result.cmd, result.exit_code, result.output))
520
521   os.chmod(constants.SSL_CERT_FILE, 0400)
522
523   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524
525   if result.failed:
526     raise errors.OpExecError("Could not start the node daemon, command %s"
527                              " had exitcode %s and error %s" %
528                              (result.cmd, result.exit_code, result.output))
529
530
531 class LUInitCluster(LogicalUnit):
532   """Initialise the cluster.
533
534   """
535   HPATH = "cluster-init"
536   HTYPE = constants.HTYPE_CLUSTER
537   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538               "def_bridge", "master_netdev"]
539   REQ_CLUSTER = False
540
541   def BuildHooksEnv(self):
542     """Build hooks env.
543
544     Notes: Since we don't require a cluster, we must manually add
545     ourselves in the post-run node list.
546
547     """
548     env = {
549       "CLUSTER": self.op.cluster_name,
550       "MASTER": self.hostname['hostname_full'],
551       }
552     return env, [], [self.hostname['hostname_full']]
553
554   def CheckPrereq(self):
555     """Verify that the passed name is a valid one.
556
557     """
558     if config.ConfigWriter.IsCluster():
559       raise errors.OpPrereqError("Cluster is already initialised")
560
561     hostname_local = socket.gethostname()
562     self.hostname = hostname = utils.LookupHostname(hostname_local)
563     if not hostname:
564       raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
565                                  hostname_local)
566
567     if hostname["hostname_full"] != hostname_local:
568       raise errors.OpPrereqError("My own hostname (%s) does not match the"
569                                  " resolver (%s): probably not using FQDN"
570                                  " for hostname." %
571                                  (hostname_local, hostname["hostname_full"]))
572
573     if hostname["ip"].startswith("127."):
574       raise errors.OpPrereqError("This host's IP resolves to the private"
575                                  " range (%s). Please fix DNS or /etc/hosts." %
576                                  (hostname["ip"],))
577
578     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
579     if not clustername:
580       raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
581                                  % self.op.cluster_name)
582
583     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
584     if result.failed:
585       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
586                                  " to %s,\nbut this ip address does not"
587                                  " belong to this host."
588                                  " Aborting." % hostname['ip'])
589
590     secondary_ip = getattr(self.op, "secondary_ip", None)
591     if secondary_ip and not utils.IsValidIP(secondary_ip):
592       raise errors.OpPrereqError("Invalid secondary ip given")
593     if secondary_ip and secondary_ip != hostname['ip']:
594       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
595       if result.failed:
596         raise errors.OpPrereqError("You gave %s as secondary IP,\n"
597                                    "but it does not belong to this host." %
598                                    secondary_ip)
599     self.secondary_ip = secondary_ip
600
601     # checks presence of the volume group given
602     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
603
604     if vgstatus:
605       raise errors.OpPrereqError("Error: %s" % vgstatus)
606
607     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
608                     self.op.mac_prefix):
609       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
610                                  self.op.mac_prefix)
611
612     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
613       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
614                                  self.op.hypervisor_type)
615
616     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
617     if result.failed:
618       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
619                                  (self.op.master_netdev,
620                                   result.output.strip()))
621
622   def Exec(self, feedback_fn):
623     """Initialize the cluster.
624
625     """
626     clustername = self.clustername
627     hostname = self.hostname
628
629     # set up the simple store
630     ss = ssconf.SimpleStore()
631     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
632     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
633     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
634     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
635     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
636
637     # set up the inter-node password and certificate
638     _InitGanetiServerSetup(ss)
639
640     # start the master ip
641     rpc.call_node_start_master(hostname['hostname_full'])
642
643     # set up ssh config and /etc/hosts
644     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
645     try:
646       sshline = f.read()
647     finally:
648       f.close()
649     sshkey = sshline.split(" ")[1]
650
651     _UpdateEtcHosts(hostname['hostname_full'],
652                     hostname['ip'],
653                     )
654
655     _UpdateKnownHosts(hostname['hostname_full'],
656                       hostname['ip'],
657                       sshkey,
658                       )
659
660     _InitSSHSetup(hostname['hostname'])
661
662     # init of cluster config file
663     cfgw = config.ConfigWriter()
664     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
665                     sshkey, self.op.mac_prefix,
666                     self.op.vg_name, self.op.def_bridge)
667
668
669 class LUDestroyCluster(NoHooksLU):
670   """Logical unit for destroying the cluster.
671
672   """
673   _OP_REQP = []
674
675   def CheckPrereq(self):
676     """Check prerequisites.
677
678     This checks whether the cluster is empty.
679
680     Any errors are signalled by raising errors.OpPrereqError.
681
682     """
683     master = self.sstore.GetMasterNode()
684
685     nodelist = self.cfg.GetNodeList()
686     if len(nodelist) != 1 or nodelist[0] != master:
687       raise errors.OpPrereqError("There are still %d node(s) in"
688                                  " this cluster." % (len(nodelist) - 1))
689     instancelist = self.cfg.GetInstanceList()
690     if instancelist:
691       raise errors.OpPrereqError("There are still %d instance(s) in"
692                                  " this cluster." % len(instancelist))
693
694   def Exec(self, feedback_fn):
695     """Destroys the cluster.
696
697     """
698     utils.CreateBackup('/root/.ssh/id_dsa')
699     utils.CreateBackup('/root/.ssh/id_dsa.pub')
700     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
701
702
703 class LUVerifyCluster(NoHooksLU):
704   """Verifies the cluster status.
705
706   """
707   _OP_REQP = []
708
709   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
710                   remote_version, feedback_fn):
711     """Run multiple tests against a node.
712
713     Test list:
714       - compares ganeti version
715       - checks vg existance and size > 20G
716       - checks config file checksum
717       - checks ssh to other nodes
718
719     Args:
720       node: name of the node to check
721       file_list: required list of files
722       local_cksum: dictionary of local files and their checksums
723
724     """
725     # compares ganeti version
726     local_version = constants.PROTOCOL_VERSION
727     if not remote_version:
728       feedback_fn(" - ERROR: connection to %s failed" % (node))
729       return True
730
731     if local_version != remote_version:
732       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
733                       (local_version, node, remote_version))
734       return True
735
736     # checks vg existance and size > 20G
737
738     bad = False
739     if not vglist:
740       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
741                       (node,))
742       bad = True
743     else:
744       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
745       if vgstatus:
746         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
747         bad = True
748
749     # checks config file checksum
750     # checks ssh to any
751
752     if 'filelist' not in node_result:
753       bad = True
754       feedback_fn("  - ERROR: node hasn't returned file checksum data")
755     else:
756       remote_cksum = node_result['filelist']
757       for file_name in file_list:
758         if file_name not in remote_cksum:
759           bad = True
760           feedback_fn("  - ERROR: file '%s' missing" % file_name)
761         elif remote_cksum[file_name] != local_cksum[file_name]:
762           bad = True
763           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
764
765     if 'nodelist' not in node_result:
766       bad = True
767       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
768     else:
769       if node_result['nodelist']:
770         bad = True
771         for node in node_result['nodelist']:
772           feedback_fn("  - ERROR: communication with node '%s': %s" %
773                           (node, node_result['nodelist'][node]))
774     hyp_result = node_result.get('hypervisor', None)
775     if hyp_result is not None:
776       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
777     return bad
778
779   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
780     """Verify an instance.
781
782     This function checks to see if the required block devices are
783     available on the instance's node.
784
785     """
786     bad = False
787
788     instancelist = self.cfg.GetInstanceList()
789     if not instance in instancelist:
790       feedback_fn("  - ERROR: instance %s not in instance list %s" %
791                       (instance, instancelist))
792       bad = True
793
794     instanceconfig = self.cfg.GetInstanceInfo(instance)
795     node_current = instanceconfig.primary_node
796
797     node_vol_should = {}
798     instanceconfig.MapLVsByNode(node_vol_should)
799
800     for node in node_vol_should:
801       for volume in node_vol_should[node]:
802         if node not in node_vol_is or volume not in node_vol_is[node]:
803           feedback_fn("  - ERROR: volume %s missing on node %s" %
804                           (volume, node))
805           bad = True
806
807     if not instanceconfig.status == 'down':
808       if not instance in node_instance[node_current]:
809         feedback_fn("  - ERROR: instance %s not running on node %s" %
810                         (instance, node_current))
811         bad = True
812
813     for node in node_instance:
814       if (not node == node_current):
815         if instance in node_instance[node]:
816           feedback_fn("  - ERROR: instance %s should not run on node %s" %
817                           (instance, node))
818           bad = True
819
820     return not bad
821
822   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
823     """Verify if there are any unknown volumes in the cluster.
824
825     The .os, .swap and backup volumes are ignored. All other volumes are
826     reported as unknown.
827
828     """
829     bad = False
830
831     for node in node_vol_is:
832       for volume in node_vol_is[node]:
833         if node not in node_vol_should or volume not in node_vol_should[node]:
834           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
835                       (volume, node))
836           bad = True
837     return bad
838
839   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
840     """Verify the list of running instances.
841
842     This checks what instances are running but unknown to the cluster.
843
844     """
845     bad = False
846     for node in node_instance:
847       for runninginstance in node_instance[node]:
848         if runninginstance not in instancelist:
849           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
850                           (runninginstance, node))
851           bad = True
852     return bad
853
854   def CheckPrereq(self):
855     """Check prerequisites.
856
857     This has no prerequisites.
858
859     """
860     pass
861
862   def Exec(self, feedback_fn):
863     """Verify integrity of cluster, performing various test on nodes.
864
865     """
866     bad = False
867     feedback_fn("* Verifying global settings")
868     self.cfg.VerifyConfig()
869
870     master = self.sstore.GetMasterNode()
871     vg_name = self.cfg.GetVGName()
872     nodelist = utils.NiceSort(self.cfg.GetNodeList())
873     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
874     node_volume = {}
875     node_instance = {}
876
877     # FIXME: verify OS list
878     # do local checksums
879     file_names = list(self.sstore.GetFileList())
880     file_names.append(constants.SSL_CERT_FILE)
881     file_names.append(constants.CLUSTER_CONF_FILE)
882     local_checksums = utils.FingerprintFiles(file_names)
883
884     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
885     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
886     all_instanceinfo = rpc.call_instance_list(nodelist)
887     all_vglist = rpc.call_vg_list(nodelist)
888     node_verify_param = {
889       'filelist': file_names,
890       'nodelist': nodelist,
891       'hypervisor': None,
892       }
893     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
894     all_rversion = rpc.call_version(nodelist)
895
896     for node in nodelist:
897       feedback_fn("* Verifying node %s" % node)
898       result = self._VerifyNode(node, file_names, local_checksums,
899                                 all_vglist[node], all_nvinfo[node],
900                                 all_rversion[node], feedback_fn)
901       bad = bad or result
902
903       # node_volume
904       volumeinfo = all_volumeinfo[node]
905
906       if type(volumeinfo) != dict:
907         feedback_fn("  - ERROR: connection to %s failed" % (node,))
908         bad = True
909         continue
910
911       node_volume[node] = volumeinfo
912
913       # node_instance
914       nodeinstance = all_instanceinfo[node]
915       if type(nodeinstance) != list:
916         feedback_fn("  - ERROR: connection to %s failed" % (node,))
917         bad = True
918         continue
919
920       node_instance[node] = nodeinstance
921
922     node_vol_should = {}
923
924     for instance in instancelist:
925       feedback_fn("* Verifying instance %s" % instance)
926       result =  self._VerifyInstance(instance, node_volume, node_instance,
927                                      feedback_fn)
928       bad = bad or result
929
930       inst_config = self.cfg.GetInstanceInfo(instance)
931
932       inst_config.MapLVsByNode(node_vol_should)
933
934     feedback_fn("* Verifying orphan volumes")
935     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
936                                        feedback_fn)
937     bad = bad or result
938
939     feedback_fn("* Verifying remaining instances")
940     result = self._VerifyOrphanInstances(instancelist, node_instance,
941                                          feedback_fn)
942     bad = bad or result
943
944     return int(bad)
945
946
947 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
948   """Sleep and poll for an instance's disk to sync.
949
950   """
951   if not instance.disks:
952     return True
953
954   if not oneshot:
955     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
956
957   node = instance.primary_node
958
959   for dev in instance.disks:
960     cfgw.SetDiskID(dev, node)
961
962   retries = 0
963   while True:
964     max_time = 0
965     done = True
966     cumul_degraded = False
967     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
968     if not rstats:
969       logger.ToStderr("Can't get any data from node %s" % node)
970       retries += 1
971       if retries >= 10:
972         raise errors.RemoteError("Can't contact node %s for mirror data,"
973                                  " aborting." % node)
974       time.sleep(6)
975       continue
976     retries = 0
977     for i in range(len(rstats)):
978       mstat = rstats[i]
979       if mstat is None:
980         logger.ToStderr("Can't compute data for node %s/%s" %
981                         (node, instance.disks[i].iv_name))
982         continue
983       perc_done, est_time, is_degraded = mstat
984       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
985       if perc_done is not None:
986         done = False
987         if est_time is not None:
988           rem_time = "%d estimated seconds remaining" % est_time
989           max_time = est_time
990         else:
991           rem_time = "no time estimate"
992         logger.ToStdout("- device %s: %5.2f%% done, %s" %
993                         (instance.disks[i].iv_name, perc_done, rem_time))
994     if done or oneshot:
995       break
996
997     if unlock:
998       utils.Unlock('cmd')
999     try:
1000       time.sleep(min(60, max_time))
1001     finally:
1002       if unlock:
1003         utils.Lock('cmd')
1004
1005   if done:
1006     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1007   return not cumul_degraded
1008
1009
1010 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1011   """Check that mirrors are not degraded.
1012
1013   """
1014   cfgw.SetDiskID(dev, node)
1015
1016   result = True
1017   if on_primary or dev.AssembleOnSecondary():
1018     rstats = rpc.call_blockdev_find(node, dev)
1019     if not rstats:
1020       logger.ToStderr("Can't get any data from node %s" % node)
1021       result = False
1022     else:
1023       result = result and (not rstats[5])
1024   if dev.children:
1025     for child in dev.children:
1026       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1027
1028   return result
1029
1030
1031 class LUDiagnoseOS(NoHooksLU):
1032   """Logical unit for OS diagnose/query.
1033
1034   """
1035   _OP_REQP = []
1036
1037   def CheckPrereq(self):
1038     """Check prerequisites.
1039
1040     This always succeeds, since this is a pure query LU.
1041
1042     """
1043     return
1044
1045   def Exec(self, feedback_fn):
1046     """Compute the list of OSes.
1047
1048     """
1049     node_list = self.cfg.GetNodeList()
1050     node_data = rpc.call_os_diagnose(node_list)
1051     if node_data == False:
1052       raise errors.OpExecError("Can't gather the list of OSes")
1053     return node_data
1054
1055
1056 class LURemoveNode(LogicalUnit):
1057   """Logical unit for removing a node.
1058
1059   """
1060   HPATH = "node-remove"
1061   HTYPE = constants.HTYPE_NODE
1062   _OP_REQP = ["node_name"]
1063
1064   def BuildHooksEnv(self):
1065     """Build hooks env.
1066
1067     This doesn't run on the target node in the pre phase as a failed
1068     node would not allows itself to run.
1069
1070     """
1071     env = {
1072       "NODE_NAME": self.op.node_name,
1073       }
1074     all_nodes = self.cfg.GetNodeList()
1075     all_nodes.remove(self.op.node_name)
1076     return env, all_nodes, all_nodes
1077
1078   def CheckPrereq(self):
1079     """Check prerequisites.
1080
1081     This checks:
1082      - the node exists in the configuration
1083      - it does not have primary or secondary instances
1084      - it's not the master
1085
1086     Any errors are signalled by raising errors.OpPrereqError.
1087
1088     """
1089     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1090     if node is None:
1091       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1092
1093     instance_list = self.cfg.GetInstanceList()
1094
1095     masternode = self.sstore.GetMasterNode()
1096     if node.name == masternode:
1097       raise errors.OpPrereqError("Node is the master node,"
1098                                  " you need to failover first.")
1099
1100     for instance_name in instance_list:
1101       instance = self.cfg.GetInstanceInfo(instance_name)
1102       if node.name == instance.primary_node:
1103         raise errors.OpPrereqError("Instance %s still running on the node,"
1104                                    " please remove first." % instance_name)
1105       if node.name in instance.secondary_nodes:
1106         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1107                                    " please remove first." % instance_name)
1108     self.op.node_name = node.name
1109     self.node = node
1110
1111   def Exec(self, feedback_fn):
1112     """Removes the node from the cluster.
1113
1114     """
1115     node = self.node
1116     logger.Info("stopping the node daemon and removing configs from node %s" %
1117                 node.name)
1118
1119     rpc.call_node_leave_cluster(node.name)
1120
1121     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1122
1123     logger.Info("Removing node %s from config" % node.name)
1124
1125     self.cfg.RemoveNode(node.name)
1126
1127
1128 class LUQueryNodes(NoHooksLU):
1129   """Logical unit for querying nodes.
1130
1131   """
1132   _OP_REQP = ["output_fields", "names"]
1133
1134   def CheckPrereq(self):
1135     """Check prerequisites.
1136
1137     This checks that the fields required are valid output fields.
1138
1139     """
1140     self.dynamic_fields = frozenset(["dtotal", "dfree",
1141                                      "mtotal", "mnode", "mfree"])
1142
1143     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1144                                "pinst_list", "sinst_list",
1145                                "pip", "sip"],
1146                        dynamic=self.dynamic_fields,
1147                        selected=self.op.output_fields)
1148
1149     self.wanted = _GetWantedNodes(self, self.op.names)
1150
1151   def Exec(self, feedback_fn):
1152     """Computes the list of nodes and their attributes.
1153
1154     """
1155     nodenames = self.wanted
1156     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1157
1158     # begin data gathering
1159
1160     if self.dynamic_fields.intersection(self.op.output_fields):
1161       live_data = {}
1162       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1163       for name in nodenames:
1164         nodeinfo = node_data.get(name, None)
1165         if nodeinfo:
1166           live_data[name] = {
1167             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1168             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1169             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1170             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1171             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1172             }
1173         else:
1174           live_data[name] = {}
1175     else:
1176       live_data = dict.fromkeys(nodenames, {})
1177
1178     node_to_primary = dict([(name, set()) for name in nodenames])
1179     node_to_secondary = dict([(name, set()) for name in nodenames])
1180
1181     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1182                              "sinst_cnt", "sinst_list"))
1183     if inst_fields & frozenset(self.op.output_fields):
1184       instancelist = self.cfg.GetInstanceList()
1185
1186       for instance_name in instancelist:
1187         inst = self.cfg.GetInstanceInfo(instance_name)
1188         if inst.primary_node in node_to_primary:
1189           node_to_primary[inst.primary_node].add(inst.name)
1190         for secnode in inst.secondary_nodes:
1191           if secnode in node_to_secondary:
1192             node_to_secondary[secnode].add(inst.name)
1193
1194     # end data gathering
1195
1196     output = []
1197     for node in nodelist:
1198       node_output = []
1199       for field in self.op.output_fields:
1200         if field == "name":
1201           val = node.name
1202         elif field == "pinst_list":
1203           val = list(node_to_primary[node.name])
1204         elif field == "sinst_list":
1205           val = list(node_to_secondary[node.name])
1206         elif field == "pinst_cnt":
1207           val = len(node_to_primary[node.name])
1208         elif field == "sinst_cnt":
1209           val = len(node_to_secondary[node.name])
1210         elif field == "pip":
1211           val = node.primary_ip
1212         elif field == "sip":
1213           val = node.secondary_ip
1214         elif field in self.dynamic_fields:
1215           val = live_data[node.name].get(field, None)
1216         else:
1217           raise errors.ParameterError(field)
1218         node_output.append(val)
1219       output.append(node_output)
1220
1221     return output
1222
1223
1224 class LUQueryNodeVolumes(NoHooksLU):
1225   """Logical unit for getting volumes on node(s).
1226
1227   """
1228   _OP_REQP = ["nodes", "output_fields"]
1229
1230   def CheckPrereq(self):
1231     """Check prerequisites.
1232
1233     This checks that the fields required are valid output fields.
1234
1235     """
1236     self.nodes = _GetWantedNodes(self, self.op.nodes)
1237
1238     _CheckOutputFields(static=["node"],
1239                        dynamic=["phys", "vg", "name", "size", "instance"],
1240                        selected=self.op.output_fields)
1241
1242
1243   def Exec(self, feedback_fn):
1244     """Computes the list of nodes and their attributes.
1245
1246     """
1247     nodenames = self.nodes
1248     volumes = rpc.call_node_volumes(nodenames)
1249
1250     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1251              in self.cfg.GetInstanceList()]
1252
1253     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1254
1255     output = []
1256     for node in nodenames:
1257       if node not in volumes or not volumes[node]:
1258         continue
1259
1260       node_vols = volumes[node][:]
1261       node_vols.sort(key=lambda vol: vol['dev'])
1262
1263       for vol in node_vols:
1264         node_output = []
1265         for field in self.op.output_fields:
1266           if field == "node":
1267             val = node
1268           elif field == "phys":
1269             val = vol['dev']
1270           elif field == "vg":
1271             val = vol['vg']
1272           elif field == "name":
1273             val = vol['name']
1274           elif field == "size":
1275             val = int(float(vol['size']))
1276           elif field == "instance":
1277             for inst in ilist:
1278               if node not in lv_by_node[inst]:
1279                 continue
1280               if vol['name'] in lv_by_node[inst][node]:
1281                 val = inst.name
1282                 break
1283             else:
1284               val = '-'
1285           else:
1286             raise errors.ParameterError(field)
1287           node_output.append(str(val))
1288
1289         output.append(node_output)
1290
1291     return output
1292
1293
1294 class LUAddNode(LogicalUnit):
1295   """Logical unit for adding node to the cluster.
1296
1297   """
1298   HPATH = "node-add"
1299   HTYPE = constants.HTYPE_NODE
1300   _OP_REQP = ["node_name"]
1301
1302   def BuildHooksEnv(self):
1303     """Build hooks env.
1304
1305     This will run on all nodes before, and on all nodes + the new node after.
1306
1307     """
1308     env = {
1309       "NODE_NAME": self.op.node_name,
1310       "NODE_PIP": self.op.primary_ip,
1311       "NODE_SIP": self.op.secondary_ip,
1312       }
1313     nodes_0 = self.cfg.GetNodeList()
1314     nodes_1 = nodes_0 + [self.op.node_name, ]
1315     return env, nodes_0, nodes_1
1316
1317   def CheckPrereq(self):
1318     """Check prerequisites.
1319
1320     This checks:
1321      - the new node is not already in the config
1322      - it is resolvable
1323      - its parameters (single/dual homed) matches the cluster
1324
1325     Any errors are signalled by raising errors.OpPrereqError.
1326
1327     """
1328     node_name = self.op.node_name
1329     cfg = self.cfg
1330
1331     dns_data = utils.LookupHostname(node_name)
1332     if not dns_data:
1333       raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1334
1335     node = dns_data['hostname']
1336     primary_ip = self.op.primary_ip = dns_data['ip']
1337     secondary_ip = getattr(self.op, "secondary_ip", None)
1338     if secondary_ip is None:
1339       secondary_ip = primary_ip
1340     if not utils.IsValidIP(secondary_ip):
1341       raise errors.OpPrereqError("Invalid secondary IP given")
1342     self.op.secondary_ip = secondary_ip
1343     node_list = cfg.GetNodeList()
1344     if node in node_list:
1345       raise errors.OpPrereqError("Node %s is already in the configuration"
1346                                  % node)
1347
1348     for existing_node_name in node_list:
1349       existing_node = cfg.GetNodeInfo(existing_node_name)
1350       if (existing_node.primary_ip == primary_ip or
1351           existing_node.secondary_ip == primary_ip or
1352           existing_node.primary_ip == secondary_ip or
1353           existing_node.secondary_ip == secondary_ip):
1354         raise errors.OpPrereqError("New node ip address(es) conflict with"
1355                                    " existing node %s" % existing_node.name)
1356
1357     # check that the type of the node (single versus dual homed) is the
1358     # same as for the master
1359     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1360     master_singlehomed = myself.secondary_ip == myself.primary_ip
1361     newbie_singlehomed = secondary_ip == primary_ip
1362     if master_singlehomed != newbie_singlehomed:
1363       if master_singlehomed:
1364         raise errors.OpPrereqError("The master has no private ip but the"
1365                                    " new node has one")
1366       else:
1367         raise errors.OpPrereqError("The master has a private ip but the"
1368                                    " new node doesn't have one")
1369
1370     # checks reachablity
1371     command = ["fping", "-q", primary_ip]
1372     result = utils.RunCmd(command)
1373     if result.failed:
1374       raise errors.OpPrereqError("Node not reachable by ping")
1375
1376     if not newbie_singlehomed:
1377       # check reachability from my secondary ip to newbie's secondary ip
1378       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1379       result = utils.RunCmd(command)
1380       if result.failed:
1381         raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1382
1383     self.new_node = objects.Node(name=node,
1384                                  primary_ip=primary_ip,
1385                                  secondary_ip=secondary_ip)
1386
1387   def Exec(self, feedback_fn):
1388     """Adds the new node to the cluster.
1389
1390     """
1391     new_node = self.new_node
1392     node = new_node.name
1393
1394     # set up inter-node password and certificate and restarts the node daemon
1395     gntpass = self.sstore.GetNodeDaemonPassword()
1396     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1397       raise errors.OpExecError("ganeti password corruption detected")
1398     f = open(constants.SSL_CERT_FILE)
1399     try:
1400       gntpem = f.read(8192)
1401     finally:
1402       f.close()
1403     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1404     # so we use this to detect an invalid certificate; as long as the
1405     # cert doesn't contain this, the here-document will be correctly
1406     # parsed by the shell sequence below
1407     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1408       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1409     if not gntpem.endswith("\n"):
1410       raise errors.OpExecError("PEM must end with newline")
1411     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1412
1413     # and then connect with ssh to set password and start ganeti-noded
1414     # note that all the below variables are sanitized at this point,
1415     # either by being constants or by the checks above
1416     ss = self.sstore
1417     mycommand = ("umask 077 && "
1418                  "echo '%s' > '%s' && "
1419                  "cat > '%s' << '!EOF.' && \n"
1420                  "%s!EOF.\n%s restart" %
1421                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1422                   constants.SSL_CERT_FILE, gntpem,
1423                   constants.NODE_INITD_SCRIPT))
1424
1425     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1426     if result.failed:
1427       raise errors.OpExecError("Remote command on node %s, error: %s,"
1428                                " output: %s" %
1429                                (node, result.fail_reason, result.output))
1430
1431     # check connectivity
1432     time.sleep(4)
1433
1434     result = rpc.call_version([node])[node]
1435     if result:
1436       if constants.PROTOCOL_VERSION == result:
1437         logger.Info("communication to node %s fine, sw version %s match" %
1438                     (node, result))
1439       else:
1440         raise errors.OpExecError("Version mismatch master version %s,"
1441                                  " node version %s" %
1442                                  (constants.PROTOCOL_VERSION, result))
1443     else:
1444       raise errors.OpExecError("Cannot get version from the new node")
1445
1446     # setup ssh on node
1447     logger.Info("copy ssh key to node %s" % node)
1448     keyarray = []
1449     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1450                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1451                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1452
1453     for i in keyfiles:
1454       f = open(i, 'r')
1455       try:
1456         keyarray.append(f.read())
1457       finally:
1458         f.close()
1459
1460     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1461                                keyarray[3], keyarray[4], keyarray[5])
1462
1463     if not result:
1464       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1465
1466     # Add node to our /etc/hosts, and add key to known_hosts
1467     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1468     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1469                       self.cfg.GetHostKey())
1470
1471     if new_node.secondary_ip != new_node.primary_ip:
1472       result = ssh.SSHCall(node, "root",
1473                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1474       if result.failed:
1475         raise errors.OpExecError("Node claims it doesn't have the"
1476                                  " secondary ip you gave (%s).\n"
1477                                  "Please fix and re-run this command." %
1478                                  new_node.secondary_ip)
1479
1480     success, msg = ssh.VerifyNodeHostname(node)
1481     if not success:
1482       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1483                                " than the one the resolver gives: %s.\n"
1484                                "Please fix and re-run this command." %
1485                                (node, msg))
1486
1487     # Distribute updated /etc/hosts and known_hosts to all nodes,
1488     # including the node just added
1489     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1490     dist_nodes = self.cfg.GetNodeList() + [node]
1491     if myself.name in dist_nodes:
1492       dist_nodes.remove(myself.name)
1493
1494     logger.Debug("Copying hosts and known_hosts to all nodes")
1495     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1496       result = rpc.call_upload_file(dist_nodes, fname)
1497       for to_node in dist_nodes:
1498         if not result[to_node]:
1499           logger.Error("copy of file %s to node %s failed" %
1500                        (fname, to_node))
1501
1502     to_copy = ss.GetFileList()
1503     for fname in to_copy:
1504       if not ssh.CopyFileToNode(node, fname):
1505         logger.Error("could not copy file %s to node %s" % (fname, node))
1506
1507     logger.Info("adding node %s to cluster.conf" % node)
1508     self.cfg.AddNode(new_node)
1509
1510
1511 class LUMasterFailover(LogicalUnit):
1512   """Failover the master node to the current node.
1513
1514   This is a special LU in that it must run on a non-master node.
1515
1516   """
1517   HPATH = "master-failover"
1518   HTYPE = constants.HTYPE_CLUSTER
1519   REQ_MASTER = False
1520   _OP_REQP = []
1521
1522   def BuildHooksEnv(self):
1523     """Build hooks env.
1524
1525     This will run on the new master only in the pre phase, and on all
1526     the nodes in the post phase.
1527
1528     """
1529     env = {
1530       "NEW_MASTER": self.new_master,
1531       "OLD_MASTER": self.old_master,
1532       }
1533     return env, [self.new_master], self.cfg.GetNodeList()
1534
1535   def CheckPrereq(self):
1536     """Check prerequisites.
1537
1538     This checks that we are not already the master.
1539
1540     """
1541     self.new_master = socket.gethostname()
1542
1543     self.old_master = self.sstore.GetMasterNode()
1544
1545     if self.old_master == self.new_master:
1546       raise errors.OpPrereqError("This commands must be run on the node"
1547                                  " where you want the new master to be.\n"
1548                                  "%s is already the master" %
1549                                  self.old_master)
1550
1551   def Exec(self, feedback_fn):
1552     """Failover the master node.
1553
1554     This command, when run on a non-master node, will cause the current
1555     master to cease being master, and the non-master to become new
1556     master.
1557
1558     """
1559     #TODO: do not rely on gethostname returning the FQDN
1560     logger.Info("setting master to %s, old master: %s" %
1561                 (self.new_master, self.old_master))
1562
1563     if not rpc.call_node_stop_master(self.old_master):
1564       logger.Error("could disable the master role on the old master"
1565                    " %s, please disable manually" % self.old_master)
1566
1567     ss = self.sstore
1568     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1569     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1570                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1571       logger.Error("could not distribute the new simple store master file"
1572                    " to the other nodes, please check.")
1573
1574     if not rpc.call_node_start_master(self.new_master):
1575       logger.Error("could not start the master role on the new master"
1576                    " %s, please check" % self.new_master)
1577       feedback_fn("Error in activating the master IP on the new master,\n"
1578                   "please fix manually.")
1579
1580
1581
1582 class LUQueryClusterInfo(NoHooksLU):
1583   """Query cluster configuration.
1584
1585   """
1586   _OP_REQP = []
1587   REQ_MASTER = False
1588
1589   def CheckPrereq(self):
1590     """No prerequsites needed for this LU.
1591
1592     """
1593     pass
1594
1595   def Exec(self, feedback_fn):
1596     """Return cluster config.
1597
1598     """
1599     result = {
1600       "name": self.sstore.GetClusterName(),
1601       "software_version": constants.RELEASE_VERSION,
1602       "protocol_version": constants.PROTOCOL_VERSION,
1603       "config_version": constants.CONFIG_VERSION,
1604       "os_api_version": constants.OS_API_VERSION,
1605       "export_version": constants.EXPORT_VERSION,
1606       "master": self.sstore.GetMasterNode(),
1607       "architecture": (platform.architecture()[0], platform.machine()),
1608       }
1609
1610     return result
1611
1612
1613 class LUClusterCopyFile(NoHooksLU):
1614   """Copy file to cluster.
1615
1616   """
1617   _OP_REQP = ["nodes", "filename"]
1618
1619   def CheckPrereq(self):
1620     """Check prerequisites.
1621
1622     It should check that the named file exists and that the given list
1623     of nodes is valid.
1624
1625     """
1626     if not os.path.exists(self.op.filename):
1627       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1628
1629     self.nodes = _GetWantedNodes(self, self.op.nodes)
1630
1631   def Exec(self, feedback_fn):
1632     """Copy a file from master to some nodes.
1633
1634     Args:
1635       opts - class with options as members
1636       args - list containing a single element, the file name
1637     Opts used:
1638       nodes - list containing the name of target nodes; if empty, all nodes
1639
1640     """
1641     filename = self.op.filename
1642
1643     myname = socket.gethostname()
1644
1645     for node in self.nodes:
1646       if node == myname:
1647         continue
1648       if not ssh.CopyFileToNode(node, filename):
1649         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1650
1651
1652 class LUDumpClusterConfig(NoHooksLU):
1653   """Return a text-representation of the cluster-config.
1654
1655   """
1656   _OP_REQP = []
1657
1658   def CheckPrereq(self):
1659     """No prerequisites.
1660
1661     """
1662     pass
1663
1664   def Exec(self, feedback_fn):
1665     """Dump a representation of the cluster config to the standard output.
1666
1667     """
1668     return self.cfg.DumpConfig()
1669
1670
1671 class LURunClusterCommand(NoHooksLU):
1672   """Run a command on some nodes.
1673
1674   """
1675   _OP_REQP = ["command", "nodes"]
1676
1677   def CheckPrereq(self):
1678     """Check prerequisites.
1679
1680     It checks that the given list of nodes is valid.
1681
1682     """
1683     self.nodes = _GetWantedNodes(self, self.op.nodes)
1684
1685   def Exec(self, feedback_fn):
1686     """Run a command on some nodes.
1687
1688     """
1689     data = []
1690     for node in self.nodes:
1691       result = ssh.SSHCall(node, "root", self.op.command)
1692       data.append((node, result.output, result.exit_code))
1693
1694     return data
1695
1696
1697 class LUActivateInstanceDisks(NoHooksLU):
1698   """Bring up an instance's disks.
1699
1700   """
1701   _OP_REQP = ["instance_name"]
1702
1703   def CheckPrereq(self):
1704     """Check prerequisites.
1705
1706     This checks that the instance is in the cluster.
1707
1708     """
1709     instance = self.cfg.GetInstanceInfo(
1710       self.cfg.ExpandInstanceName(self.op.instance_name))
1711     if instance is None:
1712       raise errors.OpPrereqError("Instance '%s' not known" %
1713                                  self.op.instance_name)
1714     self.instance = instance
1715
1716
1717   def Exec(self, feedback_fn):
1718     """Activate the disks.
1719
1720     """
1721     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1722     if not disks_ok:
1723       raise errors.OpExecError("Cannot activate block devices")
1724
1725     return disks_info
1726
1727
1728 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1729   """Prepare the block devices for an instance.
1730
1731   This sets up the block devices on all nodes.
1732
1733   Args:
1734     instance: a ganeti.objects.Instance object
1735     ignore_secondaries: if true, errors on secondary nodes won't result
1736                         in an error return from the function
1737
1738   Returns:
1739     false if the operation failed
1740     list of (host, instance_visible_name, node_visible_name) if the operation
1741          suceeded with the mapping from node devices to instance devices
1742   """
1743   device_info = []
1744   disks_ok = True
1745   for inst_disk in instance.disks:
1746     master_result = None
1747     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1748       cfg.SetDiskID(node_disk, node)
1749       is_primary = node == instance.primary_node
1750       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1751       if not result:
1752         logger.Error("could not prepare block device %s on node %s (is_pri"
1753                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1754         if is_primary or not ignore_secondaries:
1755           disks_ok = False
1756       if is_primary:
1757         master_result = result
1758     device_info.append((instance.primary_node, inst_disk.iv_name,
1759                         master_result))
1760
1761   return disks_ok, device_info
1762
1763
1764 def _StartInstanceDisks(cfg, instance, force):
1765   """Start the disks of an instance.
1766
1767   """
1768   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1769                                            ignore_secondaries=force)
1770   if not disks_ok:
1771     _ShutdownInstanceDisks(instance, cfg)
1772     if force is not None and not force:
1773       logger.Error("If the message above refers to a secondary node,"
1774                    " you can retry the operation using '--force'.")
1775     raise errors.OpExecError("Disk consistency error")
1776
1777
1778 class LUDeactivateInstanceDisks(NoHooksLU):
1779   """Shutdown an instance's disks.
1780
1781   """
1782   _OP_REQP = ["instance_name"]
1783
1784   def CheckPrereq(self):
1785     """Check prerequisites.
1786
1787     This checks that the instance is in the cluster.
1788
1789     """
1790     instance = self.cfg.GetInstanceInfo(
1791       self.cfg.ExpandInstanceName(self.op.instance_name))
1792     if instance is None:
1793       raise errors.OpPrereqError("Instance '%s' not known" %
1794                                  self.op.instance_name)
1795     self.instance = instance
1796
1797   def Exec(self, feedback_fn):
1798     """Deactivate the disks
1799
1800     """
1801     instance = self.instance
1802     ins_l = rpc.call_instance_list([instance.primary_node])
1803     ins_l = ins_l[instance.primary_node]
1804     if not type(ins_l) is list:
1805       raise errors.OpExecError("Can't contact node '%s'" %
1806                                instance.primary_node)
1807
1808     if self.instance.name in ins_l:
1809       raise errors.OpExecError("Instance is running, can't shutdown"
1810                                " block devices.")
1811
1812     _ShutdownInstanceDisks(instance, self.cfg)
1813
1814
1815 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1816   """Shutdown block devices of an instance.
1817
1818   This does the shutdown on all nodes of the instance.
1819
1820   If the ignore_primary is false, errors on the primary node are
1821   ignored.
1822
1823   """
1824   result = True
1825   for disk in instance.disks:
1826     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1827       cfg.SetDiskID(top_disk, node)
1828       if not rpc.call_blockdev_shutdown(node, top_disk):
1829         logger.Error("could not shutdown block device %s on node %s" %
1830                      (disk.iv_name, node))
1831         if not ignore_primary or node != instance.primary_node:
1832           result = False
1833   return result
1834
1835
1836 class LUStartupInstance(LogicalUnit):
1837   """Starts an instance.
1838
1839   """
1840   HPATH = "instance-start"
1841   HTYPE = constants.HTYPE_INSTANCE
1842   _OP_REQP = ["instance_name", "force"]
1843
1844   def BuildHooksEnv(self):
1845     """Build hooks env.
1846
1847     This runs on master, primary and secondary nodes of the instance.
1848
1849     """
1850     env = {
1851       "FORCE": self.op.force,
1852       }
1853     env.update(_BuildInstanceHookEnvByObject(self.instance))
1854     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1855           list(self.instance.secondary_nodes))
1856     return env, nl, nl
1857
1858   def CheckPrereq(self):
1859     """Check prerequisites.
1860
1861     This checks that the instance is in the cluster.
1862
1863     """
1864     instance = self.cfg.GetInstanceInfo(
1865       self.cfg.ExpandInstanceName(self.op.instance_name))
1866     if instance is None:
1867       raise errors.OpPrereqError("Instance '%s' not known" %
1868                                  self.op.instance_name)
1869
1870     # check bridges existance
1871     brlist = [nic.bridge for nic in instance.nics]
1872     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1873       raise errors.OpPrereqError("one or more target bridges %s does not"
1874                                  " exist on destination node '%s'" %
1875                                  (brlist, instance.primary_node))
1876
1877     self.instance = instance
1878     self.op.instance_name = instance.name
1879
1880   def Exec(self, feedback_fn):
1881     """Start the instance.
1882
1883     """
1884     instance = self.instance
1885     force = self.op.force
1886     extra_args = getattr(self.op, "extra_args", "")
1887
1888     node_current = instance.primary_node
1889
1890     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1891     if not nodeinfo:
1892       raise errors.OpExecError("Could not contact node %s for infos" %
1893                                (node_current))
1894
1895     freememory = nodeinfo[node_current]['memory_free']
1896     memory = instance.memory
1897     if memory > freememory:
1898       raise errors.OpExecError("Not enough memory to start instance"
1899                                " %s on node %s"
1900                                " needed %s MiB, available %s MiB" %
1901                                (instance.name, node_current, memory,
1902                                 freememory))
1903
1904     _StartInstanceDisks(self.cfg, instance, force)
1905
1906     if not rpc.call_instance_start(node_current, instance, extra_args):
1907       _ShutdownInstanceDisks(instance, self.cfg)
1908       raise errors.OpExecError("Could not start instance")
1909
1910     self.cfg.MarkInstanceUp(instance.name)
1911
1912
1913 class LUShutdownInstance(LogicalUnit):
1914   """Shutdown an instance.
1915
1916   """
1917   HPATH = "instance-stop"
1918   HTYPE = constants.HTYPE_INSTANCE
1919   _OP_REQP = ["instance_name"]
1920
1921   def BuildHooksEnv(self):
1922     """Build hooks env.
1923
1924     This runs on master, primary and secondary nodes of the instance.
1925
1926     """
1927     env = _BuildInstanceHookEnvByObject(self.instance)
1928     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1929           list(self.instance.secondary_nodes))
1930     return env, nl, nl
1931
1932   def CheckPrereq(self):
1933     """Check prerequisites.
1934
1935     This checks that the instance is in the cluster.
1936
1937     """
1938     instance = self.cfg.GetInstanceInfo(
1939       self.cfg.ExpandInstanceName(self.op.instance_name))
1940     if instance is None:
1941       raise errors.OpPrereqError("Instance '%s' not known" %
1942                                  self.op.instance_name)
1943     self.instance = instance
1944
1945   def Exec(self, feedback_fn):
1946     """Shutdown the instance.
1947
1948     """
1949     instance = self.instance
1950     node_current = instance.primary_node
1951     if not rpc.call_instance_shutdown(node_current, instance):
1952       logger.Error("could not shutdown instance")
1953
1954     self.cfg.MarkInstanceDown(instance.name)
1955     _ShutdownInstanceDisks(instance, self.cfg)
1956
1957
1958 class LUReinstallInstance(LogicalUnit):
1959   """Reinstall an instance.
1960
1961   """
1962   HPATH = "instance-reinstall"
1963   HTYPE = constants.HTYPE_INSTANCE
1964   _OP_REQP = ["instance_name"]
1965
1966   def BuildHooksEnv(self):
1967     """Build hooks env.
1968
1969     This runs on master, primary and secondary nodes of the instance.
1970
1971     """
1972     env = _BuildInstanceHookEnvByObject(self.instance)
1973     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1974           list(self.instance.secondary_nodes))
1975     return env, nl, nl
1976
1977   def CheckPrereq(self):
1978     """Check prerequisites.
1979
1980     This checks that the instance is in the cluster and is not running.
1981
1982     """
1983     instance = self.cfg.GetInstanceInfo(
1984       self.cfg.ExpandInstanceName(self.op.instance_name))
1985     if instance is None:
1986       raise errors.OpPrereqError("Instance '%s' not known" %
1987                                  self.op.instance_name)
1988     if instance.disk_template == constants.DT_DISKLESS:
1989       raise errors.OpPrereqError("Instance '%s' has no disks" %
1990                                  self.op.instance_name)
1991     if instance.status != "down":
1992       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1993                                  self.op.instance_name)
1994     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1995     if remote_info:
1996       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1997                                  (self.op.instance_name,
1998                                   instance.primary_node))
1999
2000     self.op.os_type = getattr(self.op, "os_type", None)
2001     if self.op.os_type is not None:
2002       # OS verification
2003       pnode = self.cfg.GetNodeInfo(
2004         self.cfg.ExpandNodeName(instance.primary_node))
2005       if pnode is None:
2006         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2007                                    self.op.pnode)
2008       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2009       if not isinstance(os_obj, objects.OS):
2010         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2011                                    " primary node"  % self.op.os_type)
2012
2013     self.instance = instance
2014
2015   def Exec(self, feedback_fn):
2016     """Reinstall the instance.
2017
2018     """
2019     inst = self.instance
2020
2021     if self.op.os_type is not None:
2022       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2023       inst.os = self.op.os_type
2024       self.cfg.AddInstance(inst)
2025
2026     _StartInstanceDisks(self.cfg, inst, None)
2027     try:
2028       feedback_fn("Running the instance OS create scripts...")
2029       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2030         raise errors.OpExecError("Could not install OS for instance %s "
2031                                  "on node %s" %
2032                                  (inst.name, inst.primary_node))
2033     finally:
2034       _ShutdownInstanceDisks(inst, self.cfg)
2035
2036
2037 class LURenameInstance(LogicalUnit):
2038   """Rename an instance.
2039
2040   """
2041   HPATH = "instance-rename"
2042   HTYPE = constants.HTYPE_INSTANCE
2043   _OP_REQP = ["instance_name", "new_name"]
2044
2045   def BuildHooksEnv(self):
2046     """Build hooks env.
2047
2048     This runs on master, primary and secondary nodes of the instance.
2049
2050     """
2051     env = _BuildInstanceHookEnvByObject(self.instance)
2052     env["INSTANCE_NEW_NAME"] = self.op.new_name
2053     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2054           list(self.instance.secondary_nodes))
2055     return env, nl, nl
2056
2057   def CheckPrereq(self):
2058     """Check prerequisites.
2059
2060     This checks that the instance is in the cluster and is not running.
2061
2062     """
2063     instance = self.cfg.GetInstanceInfo(
2064       self.cfg.ExpandInstanceName(self.op.instance_name))
2065     if instance is None:
2066       raise errors.OpPrereqError("Instance '%s' not known" %
2067                                  self.op.instance_name)
2068     if instance.status != "down":
2069       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2070                                  self.op.instance_name)
2071     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2072     if remote_info:
2073       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2074                                  (self.op.instance_name,
2075                                   instance.primary_node))
2076     self.instance = instance
2077
2078     # new name verification
2079     hostname1 = utils.LookupHostname(self.op.new_name)
2080     if not hostname1:
2081       raise errors.OpPrereqError("New instance name '%s' not found in dns" %
2082                                  self.op.new_name)
2083
2084     self.op.new_name = new_name = hostname1['hostname']
2085     if not getattr(self.op, "ignore_ip", False):
2086       command = ["fping", "-q", hostname1['ip']]
2087       result = utils.RunCmd(command)
2088       if not result.failed:
2089         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2090                                    (hostname1['ip'], new_name))
2091
2092
2093   def Exec(self, feedback_fn):
2094     """Reinstall the instance.
2095
2096     """
2097     inst = self.instance
2098     old_name = inst.name
2099
2100     self.cfg.RenameInstance(inst.name, self.op.new_name)
2101
2102     # re-read the instance from the configuration after rename
2103     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2104
2105     _StartInstanceDisks(self.cfg, inst, None)
2106     try:
2107       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2108                                           "sda", "sdb"):
2109         msg = ("Could run OS rename script for instance %s\n"
2110                "on node %s\n"
2111                "(but the instance has been renamed in Ganeti)" %
2112                (inst.name, inst.primary_node))
2113         logger.Error(msg)
2114     finally:
2115       _ShutdownInstanceDisks(inst, self.cfg)
2116
2117
2118 class LURemoveInstance(LogicalUnit):
2119   """Remove an instance.
2120
2121   """
2122   HPATH = "instance-remove"
2123   HTYPE = constants.HTYPE_INSTANCE
2124   _OP_REQP = ["instance_name"]
2125
2126   def BuildHooksEnv(self):
2127     """Build hooks env.
2128
2129     This runs on master, primary and secondary nodes of the instance.
2130
2131     """
2132     env = _BuildInstanceHookEnvByObject(self.instance)
2133     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2134           list(self.instance.secondary_nodes))
2135     return env, nl, nl
2136
2137   def CheckPrereq(self):
2138     """Check prerequisites.
2139
2140     This checks that the instance is in the cluster.
2141
2142     """
2143     instance = self.cfg.GetInstanceInfo(
2144       self.cfg.ExpandInstanceName(self.op.instance_name))
2145     if instance is None:
2146       raise errors.OpPrereqError("Instance '%s' not known" %
2147                                  self.op.instance_name)
2148     self.instance = instance
2149
2150   def Exec(self, feedback_fn):
2151     """Remove the instance.
2152
2153     """
2154     instance = self.instance
2155     logger.Info("shutting down instance %s on node %s" %
2156                 (instance.name, instance.primary_node))
2157
2158     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2159       raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2160                                (instance.name, instance.primary_node))
2161
2162     logger.Info("removing block devices for instance %s" % instance.name)
2163
2164     _RemoveDisks(instance, self.cfg)
2165
2166     logger.Info("removing instance %s out of cluster config" % instance.name)
2167
2168     self.cfg.RemoveInstance(instance.name)
2169
2170
2171 class LUQueryInstances(NoHooksLU):
2172   """Logical unit for querying instances.
2173
2174   """
2175   _OP_REQP = ["output_fields", "names"]
2176
2177   def CheckPrereq(self):
2178     """Check prerequisites.
2179
2180     This checks that the fields required are valid output fields.
2181
2182     """
2183     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2184     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2185                                "admin_state", "admin_ram",
2186                                "disk_template", "ip", "mac", "bridge",
2187                                "sda_size", "sdb_size"],
2188                        dynamic=self.dynamic_fields,
2189                        selected=self.op.output_fields)
2190
2191     self.wanted = _GetWantedInstances(self, self.op.names)
2192
2193   def Exec(self, feedback_fn):
2194     """Computes the list of nodes and their attributes.
2195
2196     """
2197     instance_names = self.wanted
2198     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2199                      in instance_names]
2200
2201     # begin data gathering
2202
2203     nodes = frozenset([inst.primary_node for inst in instance_list])
2204
2205     bad_nodes = []
2206     if self.dynamic_fields.intersection(self.op.output_fields):
2207       live_data = {}
2208       node_data = rpc.call_all_instances_info(nodes)
2209       for name in nodes:
2210         result = node_data[name]
2211         if result:
2212           live_data.update(result)
2213         elif result == False:
2214           bad_nodes.append(name)
2215         # else no instance is alive
2216     else:
2217       live_data = dict([(name, {}) for name in instance_names])
2218
2219     # end data gathering
2220
2221     output = []
2222     for instance in instance_list:
2223       iout = []
2224       for field in self.op.output_fields:
2225         if field == "name":
2226           val = instance.name
2227         elif field == "os":
2228           val = instance.os
2229         elif field == "pnode":
2230           val = instance.primary_node
2231         elif field == "snodes":
2232           val = list(instance.secondary_nodes)
2233         elif field == "admin_state":
2234           val = (instance.status != "down")
2235         elif field == "oper_state":
2236           if instance.primary_node in bad_nodes:
2237             val = None
2238           else:
2239             val = bool(live_data.get(instance.name))
2240         elif field == "admin_ram":
2241           val = instance.memory
2242         elif field == "oper_ram":
2243           if instance.primary_node in bad_nodes:
2244             val = None
2245           elif instance.name in live_data:
2246             val = live_data[instance.name].get("memory", "?")
2247           else:
2248             val = "-"
2249         elif field == "disk_template":
2250           val = instance.disk_template
2251         elif field == "ip":
2252           val = instance.nics[0].ip
2253         elif field == "bridge":
2254           val = instance.nics[0].bridge
2255         elif field == "mac":
2256           val = instance.nics[0].mac
2257         elif field == "sda_size" or field == "sdb_size":
2258           disk = instance.FindDisk(field[:3])
2259           if disk is None:
2260             val = None
2261           else:
2262             val = disk.size
2263         else:
2264           raise errors.ParameterError(field)
2265         iout.append(val)
2266       output.append(iout)
2267
2268     return output
2269
2270
2271 class LUFailoverInstance(LogicalUnit):
2272   """Failover an instance.
2273
2274   """
2275   HPATH = "instance-failover"
2276   HTYPE = constants.HTYPE_INSTANCE
2277   _OP_REQP = ["instance_name", "ignore_consistency"]
2278
2279   def BuildHooksEnv(self):
2280     """Build hooks env.
2281
2282     This runs on master, primary and secondary nodes of the instance.
2283
2284     """
2285     env = {
2286       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2287       }
2288     env.update(_BuildInstanceHookEnvByObject(self.instance))
2289     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2290     return env, nl, nl
2291
2292   def CheckPrereq(self):
2293     """Check prerequisites.
2294
2295     This checks that the instance is in the cluster.
2296
2297     """
2298     instance = self.cfg.GetInstanceInfo(
2299       self.cfg.ExpandInstanceName(self.op.instance_name))
2300     if instance is None:
2301       raise errors.OpPrereqError("Instance '%s' not known" %
2302                                  self.op.instance_name)
2303
2304     if instance.disk_template != constants.DT_REMOTE_RAID1:
2305       raise errors.OpPrereqError("Instance's disk layout is not"
2306                                  " remote_raid1.")
2307
2308     secondary_nodes = instance.secondary_nodes
2309     if not secondary_nodes:
2310       raise errors.ProgrammerError("no secondary node but using "
2311                                    "DT_REMOTE_RAID1 template")
2312
2313     # check memory requirements on the secondary node
2314     target_node = secondary_nodes[0]
2315     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2316     info = nodeinfo.get(target_node, None)
2317     if not info:
2318       raise errors.OpPrereqError("Cannot get current information"
2319                                  " from node '%s'" % nodeinfo)
2320     if instance.memory > info['memory_free']:
2321       raise errors.OpPrereqError("Not enough memory on target node %s."
2322                                  " %d MB available, %d MB required" %
2323                                  (target_node, info['memory_free'],
2324                                   instance.memory))
2325
2326     # check bridge existance
2327     brlist = [nic.bridge for nic in instance.nics]
2328     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2329       raise errors.OpPrereqError("One or more target bridges %s does not"
2330                                  " exist on destination node '%s'" %
2331                                  (brlist, instance.primary_node))
2332
2333     self.instance = instance
2334
2335   def Exec(self, feedback_fn):
2336     """Failover an instance.
2337
2338     The failover is done by shutting it down on its present node and
2339     starting it on the secondary.
2340
2341     """
2342     instance = self.instance
2343
2344     source_node = instance.primary_node
2345     target_node = instance.secondary_nodes[0]
2346
2347     feedback_fn("* checking disk consistency between source and target")
2348     for dev in instance.disks:
2349       # for remote_raid1, these are md over drbd
2350       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2351         if not self.op.ignore_consistency:
2352           raise errors.OpExecError("Disk %s is degraded on target node,"
2353                                    " aborting failover." % dev.iv_name)
2354
2355     feedback_fn("* checking target node resource availability")
2356     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2357
2358     if not nodeinfo:
2359       raise errors.OpExecError("Could not contact target node %s." %
2360                                target_node)
2361
2362     free_memory = int(nodeinfo[target_node]['memory_free'])
2363     memory = instance.memory
2364     if memory > free_memory:
2365       raise errors.OpExecError("Not enough memory to create instance %s on"
2366                                " node %s. needed %s MiB, available %s MiB" %
2367                                (instance.name, target_node, memory,
2368                                 free_memory))
2369
2370     feedback_fn("* shutting down instance on source node")
2371     logger.Info("Shutting down instance %s on node %s" %
2372                 (instance.name, source_node))
2373
2374     if not rpc.call_instance_shutdown(source_node, instance):
2375       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2376                    " anyway. Please make sure node %s is down"  %
2377                    (instance.name, source_node, source_node))
2378
2379     feedback_fn("* deactivating the instance's disks on source node")
2380     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2381       raise errors.OpExecError("Can't shut down the instance's disks.")
2382
2383     instance.primary_node = target_node
2384     # distribute new instance config to the other nodes
2385     self.cfg.AddInstance(instance)
2386
2387     feedback_fn("* activating the instance's disks on target node")
2388     logger.Info("Starting instance %s on node %s" %
2389                 (instance.name, target_node))
2390
2391     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2392                                              ignore_secondaries=True)
2393     if not disks_ok:
2394       _ShutdownInstanceDisks(instance, self.cfg)
2395       raise errors.OpExecError("Can't activate the instance's disks")
2396
2397     feedback_fn("* starting the instance on the target node")
2398     if not rpc.call_instance_start(target_node, instance, None):
2399       _ShutdownInstanceDisks(instance, self.cfg)
2400       raise errors.OpExecError("Could not start instance %s on node %s." %
2401                                (instance.name, target_node))
2402
2403
2404 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2405   """Create a tree of block devices on the primary node.
2406
2407   This always creates all devices.
2408
2409   """
2410   if device.children:
2411     for child in device.children:
2412       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2413         return False
2414
2415   cfg.SetDiskID(device, node)
2416   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2417   if not new_id:
2418     return False
2419   if device.physical_id is None:
2420     device.physical_id = new_id
2421   return True
2422
2423
2424 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2425   """Create a tree of block devices on a secondary node.
2426
2427   If this device type has to be created on secondaries, create it and
2428   all its children.
2429
2430   If not, just recurse to children keeping the same 'force' value.
2431
2432   """
2433   if device.CreateOnSecondary():
2434     force = True
2435   if device.children:
2436     for child in device.children:
2437       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2438         return False
2439
2440   if not force:
2441     return True
2442   cfg.SetDiskID(device, node)
2443   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2444   if not new_id:
2445     return False
2446   if device.physical_id is None:
2447     device.physical_id = new_id
2448   return True
2449
2450
2451 def _GenerateUniqueNames(cfg, exts):
2452   """Generate a suitable LV name.
2453
2454   This will generate a logical volume name for the given instance.
2455
2456   """
2457   results = []
2458   for val in exts:
2459     new_id = cfg.GenerateUniqueID()
2460     results.append("%s%s" % (new_id, val))
2461   return results
2462
2463
2464 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2465   """Generate a drbd device complete with its children.
2466
2467   """
2468   port = cfg.AllocatePort()
2469   vgname = cfg.GetVGName()
2470   dev_data = objects.Disk(dev_type="lvm", size=size,
2471                           logical_id=(vgname, names[0]))
2472   dev_meta = objects.Disk(dev_type="lvm", size=128,
2473                           logical_id=(vgname, names[1]))
2474   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2475                           logical_id = (primary, secondary, port),
2476                           children = [dev_data, dev_meta])
2477   return drbd_dev
2478
2479
2480 def _GenerateDiskTemplate(cfg, template_name,
2481                           instance_name, primary_node,
2482                           secondary_nodes, disk_sz, swap_sz):
2483   """Generate the entire disk layout for a given template type.
2484
2485   """
2486   #TODO: compute space requirements
2487
2488   vgname = cfg.GetVGName()
2489   if template_name == "diskless":
2490     disks = []
2491   elif template_name == "plain":
2492     if len(secondary_nodes) != 0:
2493       raise errors.ProgrammerError("Wrong template configuration")
2494
2495     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2496     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2497                            logical_id=(vgname, names[0]),
2498                            iv_name = "sda")
2499     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2500                            logical_id=(vgname, names[1]),
2501                            iv_name = "sdb")
2502     disks = [sda_dev, sdb_dev]
2503   elif template_name == "local_raid1":
2504     if len(secondary_nodes) != 0:
2505       raise errors.ProgrammerError("Wrong template configuration")
2506
2507
2508     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2509                                        ".sdb_m1", ".sdb_m2"])
2510     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2511                               logical_id=(vgname, names[0]))
2512     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2513                               logical_id=(vgname, names[1]))
2514     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2515                               size=disk_sz,
2516                               children = [sda_dev_m1, sda_dev_m2])
2517     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2518                               logical_id=(vgname, names[2]))
2519     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2520                               logical_id=(vgname, names[3]))
2521     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2522                               size=swap_sz,
2523                               children = [sdb_dev_m1, sdb_dev_m2])
2524     disks = [md_sda_dev, md_sdb_dev]
2525   elif template_name == constants.DT_REMOTE_RAID1:
2526     if len(secondary_nodes) != 1:
2527       raise errors.ProgrammerError("Wrong template configuration")
2528     remote_node = secondary_nodes[0]
2529     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2530                                        ".sdb_data", ".sdb_meta"])
2531     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2532                                          disk_sz, names[0:2])
2533     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2534                               children = [drbd_sda_dev], size=disk_sz)
2535     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2536                                          swap_sz, names[2:4])
2537     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2538                               children = [drbd_sdb_dev], size=swap_sz)
2539     disks = [md_sda_dev, md_sdb_dev]
2540   else:
2541     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2542   return disks
2543
2544
2545 def _GetInstanceInfoText(instance):
2546   """Compute that text that should be added to the disk's metadata.
2547
2548   """
2549   return "originstname+%s" % instance.name
2550
2551
2552 def _CreateDisks(cfg, instance):
2553   """Create all disks for an instance.
2554
2555   This abstracts away some work from AddInstance.
2556
2557   Args:
2558     instance: the instance object
2559
2560   Returns:
2561     True or False showing the success of the creation process
2562
2563   """
2564   info = _GetInstanceInfoText(instance)
2565
2566   for device in instance.disks:
2567     logger.Info("creating volume %s for instance %s" %
2568               (device.iv_name, instance.name))
2569     #HARDCODE
2570     for secondary_node in instance.secondary_nodes:
2571       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2572                                         info):
2573         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2574                      (device.iv_name, device, secondary_node))
2575         return False
2576     #HARDCODE
2577     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2578       logger.Error("failed to create volume %s on primary!" %
2579                    device.iv_name)
2580       return False
2581   return True
2582
2583
2584 def _RemoveDisks(instance, cfg):
2585   """Remove all disks for an instance.
2586
2587   This abstracts away some work from `AddInstance()` and
2588   `RemoveInstance()`. Note that in case some of the devices couldn't
2589   be remove, the removal will continue with the other ones (compare
2590   with `_CreateDisks()`).
2591
2592   Args:
2593     instance: the instance object
2594
2595   Returns:
2596     True or False showing the success of the removal proces
2597
2598   """
2599   logger.Info("removing block devices for instance %s" % instance.name)
2600
2601   result = True
2602   for device in instance.disks:
2603     for node, disk in device.ComputeNodeTree(instance.primary_node):
2604       cfg.SetDiskID(disk, node)
2605       if not rpc.call_blockdev_remove(node, disk):
2606         logger.Error("could not remove block device %s on node %s,"
2607                      " continuing anyway" %
2608                      (device.iv_name, node))
2609         result = False
2610   return result
2611
2612
2613 class LUCreateInstance(LogicalUnit):
2614   """Create an instance.
2615
2616   """
2617   HPATH = "instance-add"
2618   HTYPE = constants.HTYPE_INSTANCE
2619   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2620               "disk_template", "swap_size", "mode", "start", "vcpus",
2621               "wait_for_sync"]
2622
2623   def BuildHooksEnv(self):
2624     """Build hooks env.
2625
2626     This runs on master, primary and secondary nodes of the instance.
2627
2628     """
2629     env = {
2630       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2631       "INSTANCE_DISK_SIZE": self.op.disk_size,
2632       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2633       "INSTANCE_ADD_MODE": self.op.mode,
2634       }
2635     if self.op.mode == constants.INSTANCE_IMPORT:
2636       env["INSTANCE_SRC_NODE"] = self.op.src_node
2637       env["INSTANCE_SRC_PATH"] = self.op.src_path
2638       env["INSTANCE_SRC_IMAGE"] = self.src_image
2639
2640     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2641       primary_node=self.op.pnode,
2642       secondary_nodes=self.secondaries,
2643       status=self.instance_status,
2644       os_type=self.op.os_type,
2645       memory=self.op.mem_size,
2646       vcpus=self.op.vcpus,
2647       nics=[(self.inst_ip, self.op.bridge)],
2648     ))
2649
2650     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2651           self.secondaries)
2652     return env, nl, nl
2653
2654
2655   def CheckPrereq(self):
2656     """Check prerequisites.
2657
2658     """
2659     if self.op.mode not in (constants.INSTANCE_CREATE,
2660                             constants.INSTANCE_IMPORT):
2661       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2662                                  self.op.mode)
2663
2664     if self.op.mode == constants.INSTANCE_IMPORT:
2665       src_node = getattr(self.op, "src_node", None)
2666       src_path = getattr(self.op, "src_path", None)
2667       if src_node is None or src_path is None:
2668         raise errors.OpPrereqError("Importing an instance requires source"
2669                                    " node and path options")
2670       src_node_full = self.cfg.ExpandNodeName(src_node)
2671       if src_node_full is None:
2672         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2673       self.op.src_node = src_node = src_node_full
2674
2675       if not os.path.isabs(src_path):
2676         raise errors.OpPrereqError("The source path must be absolute")
2677
2678       export_info = rpc.call_export_info(src_node, src_path)
2679
2680       if not export_info:
2681         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2682
2683       if not export_info.has_section(constants.INISECT_EXP):
2684         raise errors.ProgrammerError("Corrupted export config")
2685
2686       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2687       if (int(ei_version) != constants.EXPORT_VERSION):
2688         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2689                                    (ei_version, constants.EXPORT_VERSION))
2690
2691       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2692         raise errors.OpPrereqError("Can't import instance with more than"
2693                                    " one data disk")
2694
2695       # FIXME: are the old os-es, disk sizes, etc. useful?
2696       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2697       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2698                                                          'disk0_dump'))
2699       self.src_image = diskimage
2700     else: # INSTANCE_CREATE
2701       if getattr(self.op, "os_type", None) is None:
2702         raise errors.OpPrereqError("No guest OS specified")
2703
2704     # check primary node
2705     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2706     if pnode is None:
2707       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2708                                  self.op.pnode)
2709     self.op.pnode = pnode.name
2710     self.pnode = pnode
2711     self.secondaries = []
2712     # disk template and mirror node verification
2713     if self.op.disk_template not in constants.DISK_TEMPLATES:
2714       raise errors.OpPrereqError("Invalid disk template name")
2715
2716     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2717       if getattr(self.op, "snode", None) is None:
2718         raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2719                                    " a mirror node")
2720
2721       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2722       if snode_name is None:
2723         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2724                                    self.op.snode)
2725       elif snode_name == pnode.name:
2726         raise errors.OpPrereqError("The secondary node cannot be"
2727                                    " the primary node.")
2728       self.secondaries.append(snode_name)
2729
2730     # Check lv size requirements
2731     nodenames = [pnode.name] + self.secondaries
2732     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2733
2734     # Required free disk space as a function of disk and swap space
2735     req_size_dict = {
2736       constants.DT_DISKLESS: 0,
2737       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2738       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2739       # 256 MB are added for drbd metadata, 128MB for each drbd device
2740       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2741     }
2742
2743     if self.op.disk_template not in req_size_dict:
2744       raise errors.ProgrammerError("Disk template '%s' size requirement"
2745                                    " is unknown" %  self.op.disk_template)
2746
2747     req_size = req_size_dict[self.op.disk_template]
2748
2749     for node in nodenames:
2750       info = nodeinfo.get(node, None)
2751       if not info:
2752         raise errors.OpPrereqError("Cannot get current information"
2753                                    " from node '%s'" % nodeinfo)
2754       if req_size > info['vg_free']:
2755         raise errors.OpPrereqError("Not enough disk space on target node %s."
2756                                    " %d MB available, %d MB required" %
2757                                    (node, info['vg_free'], req_size))
2758
2759     # os verification
2760     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2761     if not isinstance(os_obj, objects.OS):
2762       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2763                                  " primary node"  % self.op.os_type)
2764
2765     # instance verification
2766     hostname1 = utils.LookupHostname(self.op.instance_name)
2767     if not hostname1:
2768       raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2769                                  self.op.instance_name)
2770
2771     self.op.instance_name = instance_name = hostname1['hostname']
2772     instance_list = self.cfg.GetInstanceList()
2773     if instance_name in instance_list:
2774       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2775                                  instance_name)
2776
2777     ip = getattr(self.op, "ip", None)
2778     if ip is None or ip.lower() == "none":
2779       inst_ip = None
2780     elif ip.lower() == "auto":
2781       inst_ip = hostname1['ip']
2782     else:
2783       if not utils.IsValidIP(ip):
2784         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2785                                    " like a valid IP" % ip)
2786       inst_ip = ip
2787     self.inst_ip = inst_ip
2788
2789     command = ["fping", "-q", hostname1['ip']]
2790     result = utils.RunCmd(command)
2791     if not result.failed:
2792       raise errors.OpPrereqError("IP %s of instance %s already in use" %
2793                                  (hostname1['ip'], instance_name))
2794
2795     # bridge verification
2796     bridge = getattr(self.op, "bridge", None)
2797     if bridge is None:
2798       self.op.bridge = self.cfg.GetDefBridge()
2799     else:
2800       self.op.bridge = bridge
2801
2802     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2803       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2804                                  " destination node '%s'" %
2805                                  (self.op.bridge, pnode.name))
2806
2807     if self.op.start:
2808       self.instance_status = 'up'
2809     else:
2810       self.instance_status = 'down'
2811
2812   def Exec(self, feedback_fn):
2813     """Create and add the instance to the cluster.
2814
2815     """
2816     instance = self.op.instance_name
2817     pnode_name = self.pnode.name
2818
2819     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2820     if self.inst_ip is not None:
2821       nic.ip = self.inst_ip
2822
2823     disks = _GenerateDiskTemplate(self.cfg,
2824                                   self.op.disk_template,
2825                                   instance, pnode_name,
2826                                   self.secondaries, self.op.disk_size,
2827                                   self.op.swap_size)
2828
2829     iobj = objects.Instance(name=instance, os=self.op.os_type,
2830                             primary_node=pnode_name,
2831                             memory=self.op.mem_size,
2832                             vcpus=self.op.vcpus,
2833                             nics=[nic], disks=disks,
2834                             disk_template=self.op.disk_template,
2835                             status=self.instance_status,
2836                             )
2837
2838     feedback_fn("* creating instance disks...")
2839     if not _CreateDisks(self.cfg, iobj):
2840       _RemoveDisks(iobj, self.cfg)
2841       raise errors.OpExecError("Device creation failed, reverting...")
2842
2843     feedback_fn("adding instance %s to cluster config" % instance)
2844
2845     self.cfg.AddInstance(iobj)
2846
2847     if self.op.wait_for_sync:
2848       disk_abort = not _WaitForSync(self.cfg, iobj)
2849     elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2850       # make sure the disks are not degraded (still sync-ing is ok)
2851       time.sleep(15)
2852       feedback_fn("* checking mirrors status")
2853       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2854     else:
2855       disk_abort = False
2856
2857     if disk_abort:
2858       _RemoveDisks(iobj, self.cfg)
2859       self.cfg.RemoveInstance(iobj.name)
2860       raise errors.OpExecError("There are some degraded disks for"
2861                                " this instance")
2862
2863     feedback_fn("creating os for instance %s on node %s" %
2864                 (instance, pnode_name))
2865
2866     if iobj.disk_template != constants.DT_DISKLESS:
2867       if self.op.mode == constants.INSTANCE_CREATE:
2868         feedback_fn("* running the instance OS create scripts...")
2869         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2870           raise errors.OpExecError("could not add os for instance %s"
2871                                    " on node %s" %
2872                                    (instance, pnode_name))
2873
2874       elif self.op.mode == constants.INSTANCE_IMPORT:
2875         feedback_fn("* running the instance OS import scripts...")
2876         src_node = self.op.src_node
2877         src_image = self.src_image
2878         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2879                                                 src_node, src_image):
2880           raise errors.OpExecError("Could not import os for instance"
2881                                    " %s on node %s" %
2882                                    (instance, pnode_name))
2883       else:
2884         # also checked in the prereq part
2885         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2886                                      % self.op.mode)
2887
2888     if self.op.start:
2889       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2890       feedback_fn("* starting instance...")
2891       if not rpc.call_instance_start(pnode_name, iobj, None):
2892         raise errors.OpExecError("Could not start instance")
2893
2894
2895 class LUConnectConsole(NoHooksLU):
2896   """Connect to an instance's console.
2897
2898   This is somewhat special in that it returns the command line that
2899   you need to run on the master node in order to connect to the
2900   console.
2901
2902   """
2903   _OP_REQP = ["instance_name"]
2904
2905   def CheckPrereq(self):
2906     """Check prerequisites.
2907
2908     This checks that the instance is in the cluster.
2909
2910     """
2911     instance = self.cfg.GetInstanceInfo(
2912       self.cfg.ExpandInstanceName(self.op.instance_name))
2913     if instance is None:
2914       raise errors.OpPrereqError("Instance '%s' not known" %
2915                                  self.op.instance_name)
2916     self.instance = instance
2917
2918   def Exec(self, feedback_fn):
2919     """Connect to the console of an instance
2920
2921     """
2922     instance = self.instance
2923     node = instance.primary_node
2924
2925     node_insts = rpc.call_instance_list([node])[node]
2926     if node_insts is False:
2927       raise errors.OpExecError("Can't connect to node %s." % node)
2928
2929     if instance.name not in node_insts:
2930       raise errors.OpExecError("Instance %s is not running." % instance.name)
2931
2932     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2933
2934     hyper = hypervisor.GetHypervisor()
2935     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2936     # build ssh cmdline
2937     argv = ["ssh", "-q", "-t"]
2938     argv.extend(ssh.KNOWN_HOSTS_OPTS)
2939     argv.extend(ssh.BATCH_MODE_OPTS)
2940     argv.append(node)
2941     argv.append(console_cmd)
2942     return "ssh", argv
2943
2944
2945 class LUAddMDDRBDComponent(LogicalUnit):
2946   """Adda new mirror member to an instance's disk.
2947
2948   """
2949   HPATH = "mirror-add"
2950   HTYPE = constants.HTYPE_INSTANCE
2951   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2952
2953   def BuildHooksEnv(self):
2954     """Build hooks env.
2955
2956     This runs on the master, the primary and all the secondaries.
2957
2958     """
2959     env = {
2960       "NEW_SECONDARY": self.op.remote_node,
2961       "DISK_NAME": self.op.disk_name,
2962       }
2963     env.update(_BuildInstanceHookEnvByObject(self.instance))
2964     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2965           self.op.remote_node,] + list(self.instance.secondary_nodes)
2966     return env, nl, nl
2967
2968   def CheckPrereq(self):
2969     """Check prerequisites.
2970
2971     This checks that the instance is in the cluster.
2972
2973     """
2974     instance = self.cfg.GetInstanceInfo(
2975       self.cfg.ExpandInstanceName(self.op.instance_name))
2976     if instance is None:
2977       raise errors.OpPrereqError("Instance '%s' not known" %
2978                                  self.op.instance_name)
2979     self.instance = instance
2980
2981     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2982     if remote_node is None:
2983       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2984     self.remote_node = remote_node
2985
2986     if remote_node == instance.primary_node:
2987       raise errors.OpPrereqError("The specified node is the primary node of"
2988                                  " the instance.")
2989
2990     if instance.disk_template != constants.DT_REMOTE_RAID1:
2991       raise errors.OpPrereqError("Instance's disk layout is not"
2992                                  " remote_raid1.")
2993     for disk in instance.disks:
2994       if disk.iv_name == self.op.disk_name:
2995         break
2996     else:
2997       raise errors.OpPrereqError("Can't find this device ('%s') in the"
2998                                  " instance." % self.op.disk_name)
2999     if len(disk.children) > 1:
3000       raise errors.OpPrereqError("The device already has two slave"
3001                                  " devices.\n"
3002                                  "This would create a 3-disk raid1"
3003                                  " which we don't allow.")
3004     self.disk = disk
3005
3006   def Exec(self, feedback_fn):
3007     """Add the mirror component
3008
3009     """
3010     disk = self.disk
3011     instance = self.instance
3012
3013     remote_node = self.remote_node
3014     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3015     names = _GenerateUniqueNames(self.cfg, lv_names)
3016     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3017                                      remote_node, disk.size, names)
3018
3019     logger.Info("adding new mirror component on secondary")
3020     #HARDCODE
3021     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3022                                       _GetInstanceInfoText(instance)):
3023       raise errors.OpExecError("Failed to create new component on secondary"
3024                                " node %s" % remote_node)
3025
3026     logger.Info("adding new mirror component on primary")
3027     #HARDCODE
3028     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3029                                     _GetInstanceInfoText(instance)):
3030       # remove secondary dev
3031       self.cfg.SetDiskID(new_drbd, remote_node)
3032       rpc.call_blockdev_remove(remote_node, new_drbd)
3033       raise errors.OpExecError("Failed to create volume on primary")
3034
3035     # the device exists now
3036     # call the primary node to add the mirror to md
3037     logger.Info("adding new mirror component to md")
3038     if not rpc.call_blockdev_addchild(instance.primary_node,
3039                                            disk, new_drbd):
3040       logger.Error("Can't add mirror compoment to md!")
3041       self.cfg.SetDiskID(new_drbd, remote_node)
3042       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3043         logger.Error("Can't rollback on secondary")
3044       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3045       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3046         logger.Error("Can't rollback on primary")
3047       raise errors.OpExecError("Can't add mirror component to md array")
3048
3049     disk.children.append(new_drbd)
3050
3051     self.cfg.AddInstance(instance)
3052
3053     _WaitForSync(self.cfg, instance)
3054
3055     return 0
3056
3057
3058 class LURemoveMDDRBDComponent(LogicalUnit):
3059   """Remove a component from a remote_raid1 disk.
3060
3061   """
3062   HPATH = "mirror-remove"
3063   HTYPE = constants.HTYPE_INSTANCE
3064   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3065
3066   def BuildHooksEnv(self):
3067     """Build hooks env.
3068
3069     This runs on the master, the primary and all the secondaries.
3070
3071     """
3072     env = {
3073       "DISK_NAME": self.op.disk_name,
3074       "DISK_ID": self.op.disk_id,
3075       "OLD_SECONDARY": self.old_secondary,
3076       }
3077     env.update(_BuildInstanceHookEnvByObject(self.instance))
3078     nl = [self.sstore.GetMasterNode(),
3079           self.instance.primary_node] + list(self.instance.secondary_nodes)
3080     return env, nl, nl
3081
3082   def CheckPrereq(self):
3083     """Check prerequisites.
3084
3085     This checks that the instance is in the cluster.
3086
3087     """
3088     instance = self.cfg.GetInstanceInfo(
3089       self.cfg.ExpandInstanceName(self.op.instance_name))
3090     if instance is None:
3091       raise errors.OpPrereqError("Instance '%s' not known" %
3092                                  self.op.instance_name)
3093     self.instance = instance
3094
3095     if instance.disk_template != constants.DT_REMOTE_RAID1:
3096       raise errors.OpPrereqError("Instance's disk layout is not"
3097                                  " remote_raid1.")
3098     for disk in instance.disks:
3099       if disk.iv_name == self.op.disk_name:
3100         break
3101     else:
3102       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3103                                  " instance." % self.op.disk_name)
3104     for child in disk.children:
3105       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
3106         break
3107     else:
3108       raise errors.OpPrereqError("Can't find the device with this port.")
3109
3110     if len(disk.children) < 2:
3111       raise errors.OpPrereqError("Cannot remove the last component from"
3112                                  " a mirror.")
3113     self.disk = disk
3114     self.child = child
3115     if self.child.logical_id[0] == instance.primary_node:
3116       oid = 1
3117     else:
3118       oid = 0
3119     self.old_secondary = self.child.logical_id[oid]
3120
3121   def Exec(self, feedback_fn):
3122     """Remove the mirror component
3123
3124     """
3125     instance = self.instance
3126     disk = self.disk
3127     child = self.child
3128     logger.Info("remove mirror component")
3129     self.cfg.SetDiskID(disk, instance.primary_node)
3130     if not rpc.call_blockdev_removechild(instance.primary_node,
3131                                               disk, child):
3132       raise errors.OpExecError("Can't remove child from mirror.")
3133
3134     for node in child.logical_id[:2]:
3135       self.cfg.SetDiskID(child, node)
3136       if not rpc.call_blockdev_remove(node, child):
3137         logger.Error("Warning: failed to remove device from node %s,"
3138                      " continuing operation." % node)
3139
3140     disk.children.remove(child)
3141     self.cfg.AddInstance(instance)
3142
3143
3144 class LUReplaceDisks(LogicalUnit):
3145   """Replace the disks of an instance.
3146
3147   """
3148   HPATH = "mirrors-replace"
3149   HTYPE = constants.HTYPE_INSTANCE
3150   _OP_REQP = ["instance_name"]
3151
3152   def BuildHooksEnv(self):
3153     """Build hooks env.
3154
3155     This runs on the master, the primary and all the secondaries.
3156
3157     """
3158     env = {
3159       "NEW_SECONDARY": self.op.remote_node,
3160       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3161       }
3162     env.update(_BuildInstanceHookEnvByObject(self.instance))
3163     nl = [self.sstore.GetMasterNode(),
3164           self.instance.primary_node] + list(self.instance.secondary_nodes)
3165     return env, nl, nl
3166
3167   def CheckPrereq(self):
3168     """Check prerequisites.
3169
3170     This checks that the instance is in the cluster.
3171
3172     """
3173     instance = self.cfg.GetInstanceInfo(
3174       self.cfg.ExpandInstanceName(self.op.instance_name))
3175     if instance is None:
3176       raise errors.OpPrereqError("Instance '%s' not known" %
3177                                  self.op.instance_name)
3178     self.instance = instance
3179
3180     if instance.disk_template != constants.DT_REMOTE_RAID1:
3181       raise errors.OpPrereqError("Instance's disk layout is not"
3182                                  " remote_raid1.")
3183
3184     if len(instance.secondary_nodes) != 1:
3185       raise errors.OpPrereqError("The instance has a strange layout,"
3186                                  " expected one secondary but found %d" %
3187                                  len(instance.secondary_nodes))
3188
3189     remote_node = getattr(self.op, "remote_node", None)
3190     if remote_node is None:
3191       remote_node = instance.secondary_nodes[0]
3192     else:
3193       remote_node = self.cfg.ExpandNodeName(remote_node)
3194       if remote_node is None:
3195         raise errors.OpPrereqError("Node '%s' not known" %
3196                                    self.op.remote_node)
3197     if remote_node == instance.primary_node:
3198       raise errors.OpPrereqError("The specified node is the primary node of"
3199                                  " the instance.")
3200     self.op.remote_node = remote_node
3201
3202   def Exec(self, feedback_fn):
3203     """Replace the disks of an instance.
3204
3205     """
3206     instance = self.instance
3207     iv_names = {}
3208     # start of work
3209     remote_node = self.op.remote_node
3210     cfg = self.cfg
3211     for dev in instance.disks:
3212       size = dev.size
3213       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3214       names = _GenerateUniqueNames(cfg, lv_names)
3215       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3216                                        remote_node, size, names)
3217       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3218       logger.Info("adding new mirror component on secondary for %s" %
3219                   dev.iv_name)
3220       #HARDCODE
3221       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3222                                         _GetInstanceInfoText(instance)):
3223         raise errors.OpExecError("Failed to create new component on"
3224                                  " secondary node %s\n"
3225                                  "Full abort, cleanup manually!" %
3226                                  remote_node)
3227
3228       logger.Info("adding new mirror component on primary")
3229       #HARDCODE
3230       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3231                                       _GetInstanceInfoText(instance)):
3232         # remove secondary dev
3233         cfg.SetDiskID(new_drbd, remote_node)
3234         rpc.call_blockdev_remove(remote_node, new_drbd)
3235         raise errors.OpExecError("Failed to create volume on primary!\n"
3236                                  "Full abort, cleanup manually!!")
3237
3238       # the device exists now
3239       # call the primary node to add the mirror to md
3240       logger.Info("adding new mirror component to md")
3241       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3242                                         new_drbd):
3243         logger.Error("Can't add mirror compoment to md!")
3244         cfg.SetDiskID(new_drbd, remote_node)
3245         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3246           logger.Error("Can't rollback on secondary")
3247         cfg.SetDiskID(new_drbd, instance.primary_node)
3248         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3249           logger.Error("Can't rollback on primary")
3250         raise errors.OpExecError("Full abort, cleanup manually!!")
3251
3252       dev.children.append(new_drbd)
3253       cfg.AddInstance(instance)
3254
3255     # this can fail as the old devices are degraded and _WaitForSync
3256     # does a combined result over all disks, so we don't check its
3257     # return value
3258     _WaitForSync(cfg, instance, unlock=True)
3259
3260     # so check manually all the devices
3261     for name in iv_names:
3262       dev, child, new_drbd = iv_names[name]
3263       cfg.SetDiskID(dev, instance.primary_node)
3264       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3265       if is_degr:
3266         raise errors.OpExecError("MD device %s is degraded!" % name)
3267       cfg.SetDiskID(new_drbd, instance.primary_node)
3268       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3269       if is_degr:
3270         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3271
3272     for name in iv_names:
3273       dev, child, new_drbd = iv_names[name]
3274       logger.Info("remove mirror %s component" % name)
3275       cfg.SetDiskID(dev, instance.primary_node)
3276       if not rpc.call_blockdev_removechild(instance.primary_node,
3277                                                 dev, child):
3278         logger.Error("Can't remove child from mirror, aborting"
3279                      " *this device cleanup*.\nYou need to cleanup manually!!")
3280         continue
3281
3282       for node in child.logical_id[:2]:
3283         logger.Info("remove child device on %s" % node)
3284         cfg.SetDiskID(child, node)
3285         if not rpc.call_blockdev_remove(node, child):
3286           logger.Error("Warning: failed to remove device from node %s,"
3287                        " continuing operation." % node)
3288
3289       dev.children.remove(child)
3290
3291       cfg.AddInstance(instance)
3292
3293
3294 class LUQueryInstanceData(NoHooksLU):
3295   """Query runtime instance data.
3296
3297   """
3298   _OP_REQP = ["instances"]
3299
3300   def CheckPrereq(self):
3301     """Check prerequisites.
3302
3303     This only checks the optional instance list against the existing names.
3304
3305     """
3306     if not isinstance(self.op.instances, list):
3307       raise errors.OpPrereqError("Invalid argument type 'instances'")
3308     if self.op.instances:
3309       self.wanted_instances = []
3310       names = self.op.instances
3311       for name in names:
3312         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3313         if instance is None:
3314           raise errors.OpPrereqError("No such instance name '%s'" % name)
3315       self.wanted_instances.append(instance)
3316     else:
3317       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3318                                in self.cfg.GetInstanceList()]
3319     return
3320
3321
3322   def _ComputeDiskStatus(self, instance, snode, dev):
3323     """Compute block device status.
3324
3325     """
3326     self.cfg.SetDiskID(dev, instance.primary_node)
3327     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3328     if dev.dev_type == "drbd":
3329       # we change the snode then (otherwise we use the one passed in)
3330       if dev.logical_id[0] == instance.primary_node:
3331         snode = dev.logical_id[1]
3332       else:
3333         snode = dev.logical_id[0]
3334
3335     if snode:
3336       self.cfg.SetDiskID(dev, snode)
3337       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3338     else:
3339       dev_sstatus = None
3340
3341     if dev.children:
3342       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3343                       for child in dev.children]
3344     else:
3345       dev_children = []
3346
3347     data = {
3348       "iv_name": dev.iv_name,
3349       "dev_type": dev.dev_type,
3350       "logical_id": dev.logical_id,
3351       "physical_id": dev.physical_id,
3352       "pstatus": dev_pstatus,
3353       "sstatus": dev_sstatus,
3354       "children": dev_children,
3355       }
3356
3357     return data
3358
3359   def Exec(self, feedback_fn):
3360     """Gather and return data"""
3361     result = {}
3362     for instance in self.wanted_instances:
3363       remote_info = rpc.call_instance_info(instance.primary_node,
3364                                                 instance.name)
3365       if remote_info and "state" in remote_info:
3366         remote_state = "up"
3367       else:
3368         remote_state = "down"
3369       if instance.status == "down":
3370         config_state = "down"
3371       else:
3372         config_state = "up"
3373
3374       disks = [self._ComputeDiskStatus(instance, None, device)
3375                for device in instance.disks]
3376
3377       idict = {
3378         "name": instance.name,
3379         "config_state": config_state,
3380         "run_state": remote_state,
3381         "pnode": instance.primary_node,
3382         "snodes": instance.secondary_nodes,
3383         "os": instance.os,
3384         "memory": instance.memory,
3385         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3386         "disks": disks,
3387         }
3388
3389       result[instance.name] = idict
3390
3391     return result
3392
3393
3394 class LUSetInstanceParms(LogicalUnit):
3395   """Modifies an instances's parameters.
3396
3397   """
3398   HPATH = "instance-modify"
3399   HTYPE = constants.HTYPE_INSTANCE
3400   _OP_REQP = ["instance_name"]
3401
3402   def BuildHooksEnv(self):
3403     """Build hooks env.
3404
3405     This runs on the master, primary and secondaries.
3406
3407     """
3408     args = dict()
3409     if self.mem:
3410       args['memory'] = self.mem
3411     if self.vcpus:
3412       args['vcpus'] = self.vcpus
3413     if self.do_ip or self.do_bridge:
3414       if self.do_ip:
3415         ip = self.ip
3416       else:
3417         ip = self.instance.nics[0].ip
3418       if self.bridge:
3419         bridge = self.bridge
3420       else:
3421         bridge = self.instance.nics[0].bridge
3422       args['nics'] = [(ip, bridge)]
3423     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3424     nl = [self.sstore.GetMasterNode(),
3425           self.instance.primary_node] + list(self.instance.secondary_nodes)
3426     return env, nl, nl
3427
3428   def CheckPrereq(self):
3429     """Check prerequisites.
3430
3431     This only checks the instance list against the existing names.
3432
3433     """
3434     self.mem = getattr(self.op, "mem", None)
3435     self.vcpus = getattr(self.op, "vcpus", None)
3436     self.ip = getattr(self.op, "ip", None)
3437     self.bridge = getattr(self.op, "bridge", None)
3438     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3439       raise errors.OpPrereqError("No changes submitted")
3440     if self.mem is not None:
3441       try:
3442         self.mem = int(self.mem)
3443       except ValueError, err:
3444         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3445     if self.vcpus is not None:
3446       try:
3447         self.vcpus = int(self.vcpus)
3448       except ValueError, err:
3449         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3450     if self.ip is not None:
3451       self.do_ip = True
3452       if self.ip.lower() == "none":
3453         self.ip = None
3454       else:
3455         if not utils.IsValidIP(self.ip):
3456           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3457     else:
3458       self.do_ip = False
3459     self.do_bridge = (self.bridge is not None)
3460
3461     instance = self.cfg.GetInstanceInfo(
3462       self.cfg.ExpandInstanceName(self.op.instance_name))
3463     if instance is None:
3464       raise errors.OpPrereqError("No such instance name '%s'" %
3465                                  self.op.instance_name)
3466     self.op.instance_name = instance.name
3467     self.instance = instance
3468     return
3469
3470   def Exec(self, feedback_fn):
3471     """Modifies an instance.
3472
3473     All parameters take effect only at the next restart of the instance.
3474     """
3475     result = []
3476     instance = self.instance
3477     if self.mem:
3478       instance.memory = self.mem
3479       result.append(("mem", self.mem))
3480     if self.vcpus:
3481       instance.vcpus = self.vcpus
3482       result.append(("vcpus",  self.vcpus))
3483     if self.do_ip:
3484       instance.nics[0].ip = self.ip
3485       result.append(("ip", self.ip))
3486     if self.bridge:
3487       instance.nics[0].bridge = self.bridge
3488       result.append(("bridge", self.bridge))
3489
3490     self.cfg.AddInstance(instance)
3491
3492     return result
3493
3494
3495 class LUQueryExports(NoHooksLU):
3496   """Query the exports list
3497
3498   """
3499   _OP_REQP = []
3500
3501   def CheckPrereq(self):
3502     """Check that the nodelist contains only existing nodes.
3503
3504     """
3505     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3506
3507   def Exec(self, feedback_fn):
3508     """Compute the list of all the exported system images.
3509
3510     Returns:
3511       a dictionary with the structure node->(export-list)
3512       where export-list is a list of the instances exported on
3513       that node.
3514
3515     """
3516     return rpc.call_export_list(self.nodes)
3517
3518
3519 class LUExportInstance(LogicalUnit):
3520   """Export an instance to an image in the cluster.
3521
3522   """
3523   HPATH = "instance-export"
3524   HTYPE = constants.HTYPE_INSTANCE
3525   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3526
3527   def BuildHooksEnv(self):
3528     """Build hooks env.
3529
3530     This will run on the master, primary node and target node.
3531
3532     """
3533     env = {
3534       "EXPORT_NODE": self.op.target_node,
3535       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3536       }
3537     env.update(_BuildInstanceHookEnvByObject(self.instance))
3538     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3539           self.op.target_node]
3540     return env, nl, nl
3541
3542   def CheckPrereq(self):
3543     """Check prerequisites.
3544
3545     This checks that the instance name is a valid one.
3546
3547     """
3548     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3549     self.instance = self.cfg.GetInstanceInfo(instance_name)
3550     if self.instance is None:
3551       raise errors.OpPrereqError("Instance '%s' not found" %
3552                                  self.op.instance_name)
3553
3554     # node verification
3555     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3556     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3557
3558     if self.dst_node is None:
3559       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3560                                  self.op.target_node)
3561     self.op.target_node = self.dst_node.name
3562
3563   def Exec(self, feedback_fn):
3564     """Export an instance to an image in the cluster.
3565
3566     """
3567     instance = self.instance
3568     dst_node = self.dst_node
3569     src_node = instance.primary_node
3570     # shutdown the instance, unless requested not to do so
3571     if self.op.shutdown:
3572       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3573       self.processor.ChainOpCode(op, feedback_fn)
3574
3575     vgname = self.cfg.GetVGName()
3576
3577     snap_disks = []
3578
3579     try:
3580       for disk in instance.disks:
3581         if disk.iv_name == "sda":
3582           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3583           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3584
3585           if not new_dev_name:
3586             logger.Error("could not snapshot block device %s on node %s" %
3587                          (disk.logical_id[1], src_node))
3588           else:
3589             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3590                                       logical_id=(vgname, new_dev_name),
3591                                       physical_id=(vgname, new_dev_name),
3592                                       iv_name=disk.iv_name)
3593             snap_disks.append(new_dev)
3594
3595     finally:
3596       if self.op.shutdown:
3597         op = opcodes.OpStartupInstance(instance_name=instance.name,
3598                                        force=False)
3599         self.processor.ChainOpCode(op, feedback_fn)
3600
3601     # TODO: check for size
3602
3603     for dev in snap_disks:
3604       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3605                                            instance):
3606         logger.Error("could not export block device %s from node"
3607                      " %s to node %s" %
3608                      (dev.logical_id[1], src_node, dst_node.name))
3609       if not rpc.call_blockdev_remove(src_node, dev):
3610         logger.Error("could not remove snapshot block device %s from"
3611                      " node %s" % (dev.logical_id[1], src_node))
3612
3613     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3614       logger.Error("could not finalize export for instance %s on node %s" %
3615                    (instance.name, dst_node.name))
3616
3617     nodelist = self.cfg.GetNodeList()
3618     nodelist.remove(dst_node.name)
3619
3620     # on one-node clusters nodelist will be empty after the removal
3621     # if we proceed the backup would be removed because OpQueryExports
3622     # substitutes an empty list with the full cluster node list.
3623     if nodelist:
3624       op = opcodes.OpQueryExports(nodes=nodelist)
3625       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3626       for node in exportlist:
3627         if instance.name in exportlist[node]:
3628           if not rpc.call_export_remove(node, instance.name):
3629             logger.Error("could not remove older export for instance %s"
3630                          " on node %s" % (instance.name, node))
3631
3632
3633 class TagsLU(NoHooksLU):
3634   """Generic tags LU.
3635
3636   This is an abstract class which is the parent of all the other tags LUs.
3637
3638   """
3639   def CheckPrereq(self):
3640     """Check prerequisites.
3641
3642     """
3643     if self.op.kind == constants.TAG_CLUSTER:
3644       self.target = self.cfg.GetClusterInfo()
3645     elif self.op.kind == constants.TAG_NODE:
3646       name = self.cfg.ExpandNodeName(self.op.name)
3647       if name is None:
3648         raise errors.OpPrereqError("Invalid node name (%s)" %
3649                                    (self.op.name,))
3650       self.op.name = name
3651       self.target = self.cfg.GetNodeInfo(name)
3652     elif self.op.kind == constants.TAG_INSTANCE:
3653       name = self.cfg.ExpandInstanceName(name)
3654       if name is None:
3655         raise errors.OpPrereqError("Invalid instance name (%s)" %
3656                                    (self.op.name,))
3657       self.op.name = name
3658       self.target = self.cfg.GetInstanceInfo(name)
3659     else:
3660       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3661                                  str(self.op.kind))
3662
3663
3664 class LUGetTags(TagsLU):
3665   """Returns the tags of a given object.
3666
3667   """
3668   _OP_REQP = ["kind", "name"]
3669
3670   def Exec(self, feedback_fn):
3671     """Returns the tag list.
3672
3673     """
3674     return self.target.GetTags()
3675
3676
3677 class LUAddTag(TagsLU):
3678   """Sets a tag on a given object.
3679
3680   """
3681   _OP_REQP = ["kind", "name", "tag"]
3682
3683   def CheckPrereq(self):
3684     """Check prerequisites.
3685
3686     This checks the type and length of the tag name and value.
3687
3688     """
3689     TagsLU.CheckPrereq(self)
3690     objects.TaggableObject.ValidateTag(self.op.tag)
3691
3692   def Exec(self, feedback_fn):
3693     """Sets the tag.
3694
3695     """
3696     try:
3697       self.target.AddTag(self.op.tag)
3698     except errors.TagError, err:
3699       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3700     try:
3701       self.cfg.Update(self.target)
3702     except errors.ConfigurationError:
3703       raise errors.OpRetryError("There has been a modification to the"
3704                                 " config file and the operation has been"
3705                                 " aborted. Please retry.")
3706
3707
3708 class LUDelTag(TagsLU):
3709   """Delete a tag from a given object.
3710
3711   """
3712   _OP_REQP = ["kind", "name", "tag"]
3713
3714   def CheckPrereq(self):
3715     """Check prerequisites.
3716
3717     This checks that we have the given tag.
3718
3719     """
3720     TagsLU.CheckPrereq(self)
3721     objects.TaggableObject.ValidateTag(self.op.tag)
3722     if self.op.tag not in self.target.GetTags():
3723       raise errors.OpPrereqError("Tag not found")
3724
3725   def Exec(self, feedback_fn):
3726     """Remove the tag from the object.
3727
3728     """
3729     self.target.RemoveTag(self.op.tag)
3730     try:
3731       self.cfg.Update(self.target)
3732     except errors.ConfigurationError:
3733       raise errors.OpRetryError("There has been a modification to the"
3734                                 " config file and the operation has been"
3735                                 " aborted. Please retry.")