Don't bail out if node isn't there on “gnt-node volumes”.
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError, ("Required parameter '%s' missing" %
81                                      attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError, ("Cluster not initialized yet,"
85                                      " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError, ("Commands must be run on the master"
90                                        " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded nodes.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if nodes is not None and not isinstance(nodes, list):
175     raise errors.OpPrereqError, "Invalid argument type 'nodes'"
176
177   if nodes:
178     wanted_nodes = []
179
180     for name in nodes:
181       node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182       if node is None:
183         raise errors.OpPrereqError, ("No such node name '%s'" % name)
184     wanted_nodes.append(node)
185
186     return wanted_nodes
187   else:
188     return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189
190
191 def _CheckOutputFields(static, dynamic, selected):
192   """Checks whether all selected fields are valid.
193
194   Args:
195     static: Static fields
196     dynamic: Dynamic fields
197
198   """
199   static_fields = frozenset(static)
200   dynamic_fields = frozenset(dynamic)
201
202   all_fields = static_fields | dynamic_fields
203
204   if not all_fields.issuperset(selected):
205     raise errors.OpPrereqError, ("Unknown output fields selected: %s"
206                                  % ",".join(frozenset(selected).
207                                             difference(all_fields)))
208
209
210 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
211                           memory, vcpus, nics):
212   """Builds instance related env variables for hooks from single variables.
213
214   Args:
215     secondary_nodes: List of secondary nodes as strings
216   """
217   env = {
218     "INSTANCE_NAME": name,
219     "INSTANCE_PRIMARY": primary_node,
220     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
221     "INSTANCE_OS_TYPE": os_type,
222     "INSTANCE_STATUS": status,
223     "INSTANCE_MEMORY": memory,
224     "INSTANCE_VCPUS": vcpus,
225   }
226
227   if nics:
228     nic_count = len(nics)
229     for idx, (ip, bridge) in enumerate(nics):
230       if ip is None:
231         ip = ""
232       env["INSTANCE_NIC%d_IP" % idx] = ip
233       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
234   else:
235     nic_count = 0
236
237   env["INSTANCE_NIC_COUNT"] = nic_count
238
239   return env
240
241
242 def _BuildInstanceHookEnvByObject(instance, override=None):
243   """Builds instance related env variables for hooks from an object.
244
245   Args:
246     instance: objects.Instance object of instance
247     override: dict of values to override
248   """
249   args = {
250     'name': instance.name,
251     'primary_node': instance.primary_node,
252     'secondary_nodes': instance.secondary_nodes,
253     'os_type': instance.os,
254     'status': instance.os,
255     'memory': instance.memory,
256     'vcpus': instance.vcpus,
257     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
258   }
259   if override:
260     args.update(override)
261   return _BuildInstanceHookEnv(**args)
262
263
264 def _UpdateEtcHosts(fullnode, ip):
265   """Ensure a node has a correct entry in /etc/hosts.
266
267   Args:
268     fullnode - Fully qualified domain name of host. (str)
269     ip       - IPv4 address of host (str)
270
271   """
272   node = fullnode.split(".", 1)[0]
273
274   f = open('/etc/hosts', 'r+')
275
276   inthere = False
277
278   save_lines = []
279   add_lines = []
280   removed = False
281
282   while True:
283     rawline = f.readline()
284
285     if not rawline:
286       # End of file
287       break
288
289     line = rawline.split('\n')[0]
290
291     # Strip off comments
292     line = line.split('#')[0]
293
294     if not line:
295       # Entire line was comment, skip
296       save_lines.append(rawline)
297       continue
298
299     fields = line.split()
300
301     haveall = True
302     havesome = False
303     for spec in [ ip, fullnode, node ]:
304       if spec not in fields:
305         haveall = False
306       if spec in fields:
307         havesome = True
308
309     if haveall:
310       inthere = True
311       save_lines.append(rawline)
312       continue
313
314     if havesome and not haveall:
315       # Line (old, or manual?) which is missing some.  Remove.
316       removed = True
317       continue
318
319     save_lines.append(rawline)
320
321   if not inthere:
322     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
323
324   if removed:
325     if add_lines:
326       save_lines = save_lines + add_lines
327
328     # We removed a line, write a new file and replace old.
329     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
330     newfile = os.fdopen(fd, 'w')
331     newfile.write(''.join(save_lines))
332     newfile.close()
333     os.rename(tmpname, '/etc/hosts')
334
335   elif add_lines:
336     # Simply appending a new line will do the trick.
337     f.seek(0, 2)
338     for add in add_lines:
339       f.write(add)
340
341   f.close()
342
343
344 def _UpdateKnownHosts(fullnode, ip, pubkey):
345   """Ensure a node has a correct known_hosts entry.
346
347   Args:
348     fullnode - Fully qualified domain name of host. (str)
349     ip       - IPv4 address of host (str)
350     pubkey   - the public key of the cluster
351
352   """
353   if os.path.exists('/etc/ssh/ssh_known_hosts'):
354     f = open('/etc/ssh/ssh_known_hosts', 'r+')
355   else:
356     f = open('/etc/ssh/ssh_known_hosts', 'w+')
357
358   inthere = False
359
360   save_lines = []
361   add_lines = []
362   removed = False
363
364   while True:
365     rawline = f.readline()
366     logger.Debug('read %s' % (repr(rawline),))
367
368     if not rawline:
369       # End of file
370       break
371
372     line = rawline.split('\n')[0]
373
374     parts = line.split(' ')
375     fields = parts[0].split(',')
376     key = parts[2]
377
378     haveall = True
379     havesome = False
380     for spec in [ ip, fullnode ]:
381       if spec not in fields:
382         haveall = False
383       if spec in fields:
384         havesome = True
385
386     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
387     if haveall and key == pubkey:
388       inthere = True
389       save_lines.append(rawline)
390       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
391       continue
392
393     if havesome and (not haveall or key != pubkey):
394       removed = True
395       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
396       continue
397
398     save_lines.append(rawline)
399
400   if not inthere:
401     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
402     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
403
404   if removed:
405     save_lines = save_lines + add_lines
406
407     # Write a new file and replace old.
408     fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
409     newfile = os.fdopen(fd, 'w')
410     newfile.write(''.join(save_lines))
411     newfile.close()
412     logger.Debug("Wrote new known_hosts.")
413     os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
414
415   elif add_lines:
416     # Simply appending a new line will do the trick.
417     f.seek(0, 2)
418     for add in add_lines:
419       f.write(add)
420
421   f.close()
422
423
424 def _HasValidVG(vglist, vgname):
425   """Checks if the volume group list is valid.
426
427   A non-None return value means there's an error, and the return value
428   is the error message.
429
430   """
431   vgsize = vglist.get(vgname, None)
432   if vgsize is None:
433     return "volume group '%s' missing" % vgname
434   elif vgsize < 20480:
435     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
436             (vgname, vgsize))
437   return None
438
439
440 def _InitSSHSetup(node):
441   """Setup the SSH configuration for the cluster.
442
443
444   This generates a dsa keypair for root, adds the pub key to the
445   permitted hosts and adds the hostkey to its own known hosts.
446
447   Args:
448     node: the name of this host as a fqdn
449
450   """
451   utils.RemoveFile('/root/.ssh/known_hosts')
452
453   if os.path.exists('/root/.ssh/id_dsa'):
454     utils.CreateBackup('/root/.ssh/id_dsa')
455   if os.path.exists('/root/.ssh/id_dsa.pub'):
456     utils.CreateBackup('/root/.ssh/id_dsa.pub')
457
458   utils.RemoveFile('/root/.ssh/id_dsa')
459   utils.RemoveFile('/root/.ssh/id_dsa.pub')
460
461   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
462                          "-f", "/root/.ssh/id_dsa",
463                          "-q", "-N", ""])
464   if result.failed:
465     raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
466                                result.output)
467
468   f = open('/root/.ssh/id_dsa.pub', 'r')
469   try:
470     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
471   finally:
472     f.close()
473
474
475 def _InitGanetiServerSetup(ss):
476   """Setup the necessary configuration for the initial node daemon.
477
478   This creates the nodepass file containing the shared password for
479   the cluster and also generates the SSL certificate.
480
481   """
482   # Create pseudo random password
483   randpass = sha.new(os.urandom(64)).hexdigest()
484   # and write it into sstore
485   ss.SetKey(ss.SS_NODED_PASS, randpass)
486
487   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
488                          "-days", str(365*5), "-nodes", "-x509",
489                          "-keyout", constants.SSL_CERT_FILE,
490                          "-out", constants.SSL_CERT_FILE, "-batch"])
491   if result.failed:
492     raise errors.OpExecError, ("could not generate server ssl cert, command"
493                                " %s had exitcode %s and error message %s" %
494                                (result.cmd, result.exit_code, result.output))
495
496   os.chmod(constants.SSL_CERT_FILE, 0400)
497
498   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
499
500   if result.failed:
501     raise errors.OpExecError, ("could not start the node daemon, command %s"
502                                " had exitcode %s and error %s" %
503                                (result.cmd, result.exit_code, result.output))
504
505
506 class LUInitCluster(LogicalUnit):
507   """Initialise the cluster.
508
509   """
510   HPATH = "cluster-init"
511   HTYPE = constants.HTYPE_CLUSTER
512   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
513               "def_bridge", "master_netdev"]
514   REQ_CLUSTER = False
515
516   def BuildHooksEnv(self):
517     """Build hooks env.
518
519     Notes: Since we don't require a cluster, we must manually add
520     ourselves in the post-run node list.
521
522     """
523     env = {
524       "CLUSTER": self.op.cluster_name,
525       "MASTER": self.hostname['hostname_full'],
526       }
527     return env, [], [self.hostname['hostname_full']]
528
529   def CheckPrereq(self):
530     """Verify that the passed name is a valid one.
531
532     """
533     if config.ConfigWriter.IsCluster():
534       raise errors.OpPrereqError, ("Cluster is already initialised")
535
536     hostname_local = socket.gethostname()
537     self.hostname = hostname = utils.LookupHostname(hostname_local)
538     if not hostname:
539       raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
540                                    hostname_local)
541
542     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
543     if not clustername:
544       raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
545                                    % self.op.cluster_name)
546
547     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
548     if result.failed:
549       raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
550                                    " to %s,\nbut this ip address does not"
551                                    " belong to this host."
552                                    " Aborting." % hostname['ip'])
553
554     secondary_ip = getattr(self.op, "secondary_ip", None)
555     if secondary_ip and not utils.IsValidIP(secondary_ip):
556       raise errors.OpPrereqError, ("Invalid secondary ip given")
557     if secondary_ip and secondary_ip != hostname['ip']:
558       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
559       if result.failed:
560         raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
561                                      "but it does not belong to this host." %
562                                      secondary_ip)
563     self.secondary_ip = secondary_ip
564
565     # checks presence of the volume group given
566     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
567
568     if vgstatus:
569       raise errors.OpPrereqError, ("Error: %s" % vgstatus)
570
571     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
572                     self.op.mac_prefix):
573       raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
574                                    self.op.mac_prefix)
575
576     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
577       raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
578                                    self.op.hypervisor_type)
579
580     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
581     if result.failed:
582       raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
583                                    (self.op.master_netdev, result.output))
584
585   def Exec(self, feedback_fn):
586     """Initialize the cluster.
587
588     """
589     clustername = self.clustername
590     hostname = self.hostname
591
592     # set up the simple store
593     ss = ssconf.SimpleStore()
594     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
595     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
596     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
597     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
598     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
599
600     # set up the inter-node password and certificate
601     _InitGanetiServerSetup(ss)
602
603     # start the master ip
604     rpc.call_node_start_master(hostname['hostname_full'])
605
606     # set up ssh config and /etc/hosts
607     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
608     try:
609       sshline = f.read()
610     finally:
611       f.close()
612     sshkey = sshline.split(" ")[1]
613
614     _UpdateEtcHosts(hostname['hostname_full'],
615                     hostname['ip'],
616                     )
617
618     _UpdateKnownHosts(hostname['hostname_full'],
619                       hostname['ip'],
620                       sshkey,
621                       )
622
623     _InitSSHSetup(hostname['hostname'])
624
625     # init of cluster config file
626     cfgw = config.ConfigWriter()
627     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
628                     sshkey, self.op.mac_prefix,
629                     self.op.vg_name, self.op.def_bridge)
630
631
632 class LUDestroyCluster(NoHooksLU):
633   """Logical unit for destroying the cluster.
634
635   """
636   _OP_REQP = []
637
638   def CheckPrereq(self):
639     """Check prerequisites.
640
641     This checks whether the cluster is empty.
642
643     Any errors are signalled by raising errors.OpPrereqError.
644
645     """
646     master = self.sstore.GetMasterNode()
647
648     nodelist = self.cfg.GetNodeList()
649     if len(nodelist) != 1 or nodelist[0] != master:
650       raise errors.OpPrereqError, ("There are still %d node(s) in "
651                                    "this cluster." % (len(nodelist) - 1))
652     instancelist = self.cfg.GetInstanceList()
653     if instancelist:
654       raise errors.OpPrereqError, ("There are still %d instance(s) in "
655                                    "this cluster." % len(instancelist))
656
657   def Exec(self, feedback_fn):
658     """Destroys the cluster.
659
660     """
661     utils.CreateBackup('/root/.ssh/id_dsa')
662     utils.CreateBackup('/root/.ssh/id_dsa.pub')
663     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
664
665
666 class LUVerifyCluster(NoHooksLU):
667   """Verifies the cluster status.
668
669   """
670   _OP_REQP = []
671
672   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
673                   remote_version, feedback_fn):
674     """Run multiple tests against a node.
675
676     Test list:
677       - compares ganeti version
678       - checks vg existance and size > 20G
679       - checks config file checksum
680       - checks ssh to other nodes
681
682     Args:
683       node: name of the node to check
684       file_list: required list of files
685       local_cksum: dictionary of local files and their checksums
686
687     """
688     # compares ganeti version
689     local_version = constants.PROTOCOL_VERSION
690     if not remote_version:
691       feedback_fn(" - ERROR: connection to %s failed" % (node))
692       return True
693
694     if local_version != remote_version:
695       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
696                       (local_version, node, remote_version))
697       return True
698
699     # checks vg existance and size > 20G
700
701     bad = False
702     if not vglist:
703       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
704                       (node,))
705       bad = True
706     else:
707       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
708       if vgstatus:
709         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
710         bad = True
711
712     # checks config file checksum
713     # checks ssh to any
714
715     if 'filelist' not in node_result:
716       bad = True
717       feedback_fn("  - ERROR: node hasn't returned file checksum data")
718     else:
719       remote_cksum = node_result['filelist']
720       for file_name in file_list:
721         if file_name not in remote_cksum:
722           bad = True
723           feedback_fn("  - ERROR: file '%s' missing" % file_name)
724         elif remote_cksum[file_name] != local_cksum[file_name]:
725           bad = True
726           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
727
728     if 'nodelist' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
731     else:
732       if node_result['nodelist']:
733         bad = True
734         for node in node_result['nodelist']:
735           feedback_fn("  - ERROR: communication with node '%s': %s" %
736                           (node, node_result['nodelist'][node]))
737     hyp_result = node_result.get('hypervisor', None)
738     if hyp_result is not None:
739       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
740     return bad
741
742   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
743     """Verify an instance.
744
745     This function checks to see if the required block devices are
746     available on the instance's node.
747
748     """
749     bad = False
750
751     instancelist = self.cfg.GetInstanceList()
752     if not instance in instancelist:
753       feedback_fn("  - ERROR: instance %s not in instance list %s" %
754                       (instance, instancelist))
755       bad = True
756
757     instanceconfig = self.cfg.GetInstanceInfo(instance)
758     node_current = instanceconfig.primary_node
759
760     node_vol_should = {}
761     instanceconfig.MapLVsByNode(node_vol_should)
762
763     for node in node_vol_should:
764       for volume in node_vol_should[node]:
765         if node not in node_vol_is or volume not in node_vol_is[node]:
766           feedback_fn("  - ERROR: volume %s missing on node %s" %
767                           (volume, node))
768           bad = True
769
770     if not instanceconfig.status == 'down':
771       if not instance in node_instance[node_current]:
772         feedback_fn("  - ERROR: instance %s not running on node %s" %
773                         (instance, node_current))
774         bad = True
775
776     for node in node_instance:
777       if (not node == node_current):
778         if instance in node_instance[node]:
779           feedback_fn("  - ERROR: instance %s should not run on node %s" %
780                           (instance, node))
781           bad = True
782
783     return not bad
784
785   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
786     """Verify if there are any unknown volumes in the cluster.
787
788     The .os, .swap and backup volumes are ignored. All other volumes are
789     reported as unknown.
790
791     """
792     bad = False
793
794     for node in node_vol_is:
795       for volume in node_vol_is[node]:
796         if node not in node_vol_should or volume not in node_vol_should[node]:
797           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
798                       (volume, node))
799           bad = True
800     return bad
801
802   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
803     """Verify the list of running instances.
804
805     This checks what instances are running but unknown to the cluster.
806
807     """
808     bad = False
809     for node in node_instance:
810       for runninginstance in node_instance[node]:
811         if runninginstance not in instancelist:
812           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
813                           (runninginstance, node))
814           bad = True
815     return bad
816
817   def CheckPrereq(self):
818     """Check prerequisites.
819
820     This has no prerequisites.
821
822     """
823     pass
824
825   def Exec(self, feedback_fn):
826     """Verify integrity of cluster, performing various test on nodes.
827
828     """
829     bad = False
830     feedback_fn("* Verifying global settings")
831     self.cfg.VerifyConfig()
832
833     master = self.sstore.GetMasterNode()
834     vg_name = self.cfg.GetVGName()
835     nodelist = utils.NiceSort(self.cfg.GetNodeList())
836     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
837     node_volume = {}
838     node_instance = {}
839
840     # FIXME: verify OS list
841     # do local checksums
842     file_names = list(self.sstore.GetFileList())
843     file_names.append(constants.SSL_CERT_FILE)
844     file_names.append(constants.CLUSTER_CONF_FILE)
845     local_checksums = utils.FingerprintFiles(file_names)
846
847     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
848     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
849     all_instanceinfo = rpc.call_instance_list(nodelist)
850     all_vglist = rpc.call_vg_list(nodelist)
851     node_verify_param = {
852       'filelist': file_names,
853       'nodelist': nodelist,
854       'hypervisor': None,
855       }
856     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
857     all_rversion = rpc.call_version(nodelist)
858
859     for node in nodelist:
860       feedback_fn("* Verifying node %s" % node)
861       result = self._VerifyNode(node, file_names, local_checksums,
862                                 all_vglist[node], all_nvinfo[node],
863                                 all_rversion[node], feedback_fn)
864       bad = bad or result
865
866       # node_volume
867       volumeinfo = all_volumeinfo[node]
868
869       if type(volumeinfo) != dict:
870         feedback_fn("  - ERROR: connection to %s failed" % (node,))
871         bad = True
872         continue
873
874       node_volume[node] = volumeinfo
875
876       # node_instance
877       nodeinstance = all_instanceinfo[node]
878       if type(nodeinstance) != list:
879         feedback_fn("  - ERROR: connection to %s failed" % (node,))
880         bad = True
881         continue
882
883       node_instance[node] = nodeinstance
884
885     node_vol_should = {}
886
887     for instance in instancelist:
888       feedback_fn("* Verifying instance %s" % instance)
889       result =  self._VerifyInstance(instance, node_volume, node_instance,
890                                      feedback_fn)
891       bad = bad or result
892
893       inst_config = self.cfg.GetInstanceInfo(instance)
894
895       inst_config.MapLVsByNode(node_vol_should)
896
897     feedback_fn("* Verifying orphan volumes")
898     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
899                                        feedback_fn)
900     bad = bad or result
901
902     feedback_fn("* Verifying remaining instances")
903     result = self._VerifyOrphanInstances(instancelist, node_instance,
904                                          feedback_fn)
905     bad = bad or result
906
907     return int(bad)
908
909
910 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
911   """Sleep and poll for an instance's disk to sync.
912
913   """
914   if not instance.disks:
915     return True
916
917   if not oneshot:
918     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
919
920   node = instance.primary_node
921
922   for dev in instance.disks:
923     cfgw.SetDiskID(dev, node)
924
925   retries = 0
926   while True:
927     max_time = 0
928     done = True
929     cumul_degraded = False
930     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
931     if not rstats:
932       logger.ToStderr("Can't get any data from node %s" % node)
933       retries += 1
934       if retries >= 10:
935         raise errors.RemoteError, ("Can't contact node %s for mirror data,"
936                                    " aborting." % node)
937       time.sleep(6)
938       continue
939     retries = 0
940     for i in range(len(rstats)):
941       mstat = rstats[i]
942       if mstat is None:
943         logger.ToStderr("Can't compute data for node %s/%s" %
944                         (node, instance.disks[i].iv_name))
945         continue
946       perc_done, est_time, is_degraded = mstat
947       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
948       if perc_done is not None:
949         done = False
950         if est_time is not None:
951           rem_time = "%d estimated seconds remaining" % est_time
952           max_time = est_time
953         else:
954           rem_time = "no time estimate"
955         logger.ToStdout("- device %s: %5.2f%% done, %s" %
956                         (instance.disks[i].iv_name, perc_done, rem_time))
957     if done or oneshot:
958       break
959
960     if unlock:
961       utils.Unlock('cmd')
962     try:
963       time.sleep(min(60, max_time))
964     finally:
965       if unlock:
966         utils.Lock('cmd')
967
968   if done:
969     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
970   return not cumul_degraded
971
972
973 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
974   """Check that mirrors are not degraded.
975
976   """
977   cfgw.SetDiskID(dev, node)
978
979   result = True
980   if on_primary or dev.AssembleOnSecondary():
981     rstats = rpc.call_blockdev_find(node, dev)
982     if not rstats:
983       logger.ToStderr("Can't get any data from node %s" % node)
984       result = False
985     else:
986       result = result and (not rstats[5])
987   if dev.children:
988     for child in dev.children:
989       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
990
991   return result
992
993
994 class LUDiagnoseOS(NoHooksLU):
995   """Logical unit for OS diagnose/query.
996
997   """
998   _OP_REQP = []
999
1000   def CheckPrereq(self):
1001     """Check prerequisites.
1002
1003     This always succeeds, since this is a pure query LU.
1004
1005     """
1006     return
1007
1008   def Exec(self, feedback_fn):
1009     """Compute the list of OSes.
1010
1011     """
1012     node_list = self.cfg.GetNodeList()
1013     node_data = rpc.call_os_diagnose(node_list)
1014     if node_data == False:
1015       raise errors.OpExecError, "Can't gather the list of OSes"
1016     return node_data
1017
1018
1019 class LURemoveNode(LogicalUnit):
1020   """Logical unit for removing a node.
1021
1022   """
1023   HPATH = "node-remove"
1024   HTYPE = constants.HTYPE_NODE
1025   _OP_REQP = ["node_name"]
1026
1027   def BuildHooksEnv(self):
1028     """Build hooks env.
1029
1030     This doesn't run on the target node in the pre phase as a failed
1031     node would not allows itself to run.
1032
1033     """
1034     env = {
1035       "NODE_NAME": self.op.node_name,
1036       }
1037     all_nodes = self.cfg.GetNodeList()
1038     all_nodes.remove(self.op.node_name)
1039     return env, all_nodes, all_nodes
1040
1041   def CheckPrereq(self):
1042     """Check prerequisites.
1043
1044     This checks:
1045      - the node exists in the configuration
1046      - it does not have primary or secondary instances
1047      - it's not the master
1048
1049     Any errors are signalled by raising errors.OpPrereqError.
1050
1051     """
1052     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1053     if node is None:
1054       logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
1055       return 1
1056
1057     instance_list = self.cfg.GetInstanceList()
1058
1059     masternode = self.sstore.GetMasterNode()
1060     if node.name == masternode:
1061       raise errors.OpPrereqError, ("Node is the master node,"
1062                                    " you need to failover first.")
1063
1064     for instance_name in instance_list:
1065       instance = self.cfg.GetInstanceInfo(instance_name)
1066       if node.name == instance.primary_node:
1067         raise errors.OpPrereqError, ("Instance %s still running on the node,"
1068                                      " please remove first." % instance_name)
1069       if node.name in instance.secondary_nodes:
1070         raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
1071                                      " please remove first." % instance_name)
1072     self.op.node_name = node.name
1073     self.node = node
1074
1075   def Exec(self, feedback_fn):
1076     """Removes the node from the cluster.
1077
1078     """
1079     node = self.node
1080     logger.Info("stopping the node daemon and removing configs from node %s" %
1081                 node.name)
1082
1083     rpc.call_node_leave_cluster(node.name)
1084
1085     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1086
1087     logger.Info("Removing node %s from config" % node.name)
1088
1089     self.cfg.RemoveNode(node.name)
1090
1091
1092 class LUQueryNodes(NoHooksLU):
1093   """Logical unit for querying nodes.
1094
1095   """
1096   _OP_REQP = ["output_fields"]
1097
1098   def CheckPrereq(self):
1099     """Check prerequisites.
1100
1101     This checks that the fields required are valid output fields.
1102
1103     """
1104     self.dynamic_fields = frozenset(["dtotal", "dfree",
1105                                      "mtotal", "mnode", "mfree"])
1106
1107     _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1108                        dynamic=self.dynamic_fields,
1109                        selected=self.op.output_fields)
1110
1111
1112   def Exec(self, feedback_fn):
1113     """Computes the list of nodes and their attributes.
1114
1115     """
1116     nodenames = utils.NiceSort(self.cfg.GetNodeList())
1117     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1118
1119
1120     # begin data gathering
1121
1122     if self.dynamic_fields.intersection(self.op.output_fields):
1123       live_data = {}
1124       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1125       for name in nodenames:
1126         nodeinfo = node_data.get(name, None)
1127         if nodeinfo:
1128           live_data[name] = {
1129             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1130             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1131             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1132             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1133             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1134             }
1135         else:
1136           live_data[name] = {}
1137     else:
1138       live_data = dict.fromkeys(nodenames, {})
1139
1140     node_to_primary = dict.fromkeys(nodenames, 0)
1141     node_to_secondary = dict.fromkeys(nodenames, 0)
1142
1143     if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1144       instancelist = self.cfg.GetInstanceList()
1145
1146       for instance in instancelist:
1147         instanceinfo = self.cfg.GetInstanceInfo(instance)
1148         node_to_primary[instanceinfo.primary_node] += 1
1149         for secnode in instanceinfo.secondary_nodes:
1150           node_to_secondary[secnode] += 1
1151
1152     # end data gathering
1153
1154     output = []
1155     for node in nodelist:
1156       node_output = []
1157       for field in self.op.output_fields:
1158         if field == "name":
1159           val = node.name
1160         elif field == "pinst":
1161           val = node_to_primary[node.name]
1162         elif field == "sinst":
1163           val = node_to_secondary[node.name]
1164         elif field == "pip":
1165           val = node.primary_ip
1166         elif field == "sip":
1167           val = node.secondary_ip
1168         elif field in self.dynamic_fields:
1169           val = live_data[node.name].get(field, "?")
1170         else:
1171           raise errors.ParameterError, field
1172         val = str(val)
1173         node_output.append(val)
1174       output.append(node_output)
1175
1176     return output
1177
1178
1179 class LUQueryNodeVolumes(NoHooksLU):
1180   """Logical unit for getting volumes on node(s).
1181
1182   """
1183   _OP_REQP = ["nodes", "output_fields"]
1184
1185   def CheckPrereq(self):
1186     """Check prerequisites.
1187
1188     This checks that the fields required are valid output fields.
1189
1190     """
1191     self.nodes = _GetWantedNodes(self, self.op.nodes)
1192
1193     _CheckOutputFields(static=["node"],
1194                        dynamic=["phys", "vg", "name", "size", "instance"],
1195                        selected=self.op.output_fields)
1196
1197
1198   def Exec(self, feedback_fn):
1199     """Computes the list of nodes and their attributes.
1200
1201     """
1202     nodenames = utils.NiceSort([node.name for node in self.nodes])
1203     volumes = rpc.call_node_volumes(nodenames)
1204
1205     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1206              in self.cfg.GetInstanceList()]
1207
1208     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1209
1210     output = []
1211     for node in nodenames:
1212       if node not in volumes or not volumes[node]:
1213         continue
1214
1215       node_vols = volumes[node][:]
1216       node_vols.sort(key=lambda vol: vol['dev'])
1217
1218       for vol in node_vols:
1219         node_output = []
1220         for field in self.op.output_fields:
1221           if field == "node":
1222             val = node
1223           elif field == "phys":
1224             val = vol['dev']
1225           elif field == "vg":
1226             val = vol['vg']
1227           elif field == "name":
1228             val = vol['name']
1229           elif field == "size":
1230             val = int(float(vol['size']))
1231           elif field == "instance":
1232             for inst in ilist:
1233               if node not in lv_by_node[inst]:
1234                 continue
1235               if vol['name'] in lv_by_node[inst][node]:
1236                 val = inst.name
1237                 break
1238             else:
1239               val = '-'
1240           else:
1241             raise errors.ParameterError, field
1242           node_output.append(str(val))
1243
1244         output.append(node_output)
1245
1246     return output
1247
1248
1249 class LUAddNode(LogicalUnit):
1250   """Logical unit for adding node to the cluster.
1251
1252   """
1253   HPATH = "node-add"
1254   HTYPE = constants.HTYPE_NODE
1255   _OP_REQP = ["node_name"]
1256
1257   def BuildHooksEnv(self):
1258     """Build hooks env.
1259
1260     This will run on all nodes before, and on all nodes + the new node after.
1261
1262     """
1263     env = {
1264       "NODE_NAME": self.op.node_name,
1265       "NODE_PIP": self.op.primary_ip,
1266       "NODE_SIP": self.op.secondary_ip,
1267       }
1268     nodes_0 = self.cfg.GetNodeList()
1269     nodes_1 = nodes_0 + [self.op.node_name, ]
1270     return env, nodes_0, nodes_1
1271
1272   def CheckPrereq(self):
1273     """Check prerequisites.
1274
1275     This checks:
1276      - the new node is not already in the config
1277      - it is resolvable
1278      - its parameters (single/dual homed) matches the cluster
1279
1280     Any errors are signalled by raising errors.OpPrereqError.
1281
1282     """
1283     node_name = self.op.node_name
1284     cfg = self.cfg
1285
1286     dns_data = utils.LookupHostname(node_name)
1287     if not dns_data:
1288       raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)
1289
1290     node = dns_data['hostname']
1291     primary_ip = self.op.primary_ip = dns_data['ip']
1292     secondary_ip = getattr(self.op, "secondary_ip", None)
1293     if secondary_ip is None:
1294       secondary_ip = primary_ip
1295     if not utils.IsValidIP(secondary_ip):
1296       raise errors.OpPrereqError, ("Invalid secondary IP given")
1297     self.op.secondary_ip = secondary_ip
1298     node_list = cfg.GetNodeList()
1299     if node in node_list:
1300       raise errors.OpPrereqError, ("Node %s is already in the configuration"
1301                                    % node)
1302
1303     for existing_node_name in node_list:
1304       existing_node = cfg.GetNodeInfo(existing_node_name)
1305       if (existing_node.primary_ip == primary_ip or
1306           existing_node.secondary_ip == primary_ip or
1307           existing_node.primary_ip == secondary_ip or
1308           existing_node.secondary_ip == secondary_ip):
1309         raise errors.OpPrereqError, ("New node ip address(es) conflict with"
1310                                      " existing node %s" % existing_node.name)
1311
1312     # check that the type of the node (single versus dual homed) is the
1313     # same as for the master
1314     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1315     master_singlehomed = myself.secondary_ip == myself.primary_ip
1316     newbie_singlehomed = secondary_ip == primary_ip
1317     if master_singlehomed != newbie_singlehomed:
1318       if master_singlehomed:
1319         raise errors.OpPrereqError, ("The master has no private ip but the"
1320                                      " new node has one")
1321       else:
1322         raise errors.OpPrereqError ("The master has a private ip but the"
1323                                     " new node doesn't have one")
1324
1325     # checks reachablity
1326     command = ["fping", "-q", primary_ip]
1327     result = utils.RunCmd(command)
1328     if result.failed:
1329       raise errors.OpPrereqError, ("Node not reachable by ping")
1330
1331     if not newbie_singlehomed:
1332       # check reachability from my secondary ip to newbie's secondary ip
1333       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1334       result = utils.RunCmd(command)
1335       if result.failed:
1336         raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")
1337
1338     self.new_node = objects.Node(name=node,
1339                                  primary_ip=primary_ip,
1340                                  secondary_ip=secondary_ip)
1341
1342   def Exec(self, feedback_fn):
1343     """Adds the new node to the cluster.
1344
1345     """
1346     new_node = self.new_node
1347     node = new_node.name
1348
1349     # set up inter-node password and certificate and restarts the node daemon
1350     gntpass = self.sstore.GetNodeDaemonPassword()
1351     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1352       raise errors.OpExecError, ("ganeti password corruption detected")
1353     f = open(constants.SSL_CERT_FILE)
1354     try:
1355       gntpem = f.read(8192)
1356     finally:
1357       f.close()
1358     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1359     # so we use this to detect an invalid certificate; as long as the
1360     # cert doesn't contain this, the here-document will be correctly
1361     # parsed by the shell sequence below
1362     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1363       raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
1364     if not gntpem.endswith("\n"):
1365       raise errors.OpExecError, ("PEM must end with newline")
1366     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1367
1368     # remove first the root's known_hosts file
1369     utils.RemoveFile("/root/.ssh/known_hosts")
1370     # and then connect with ssh to set password and start ganeti-noded
1371     # note that all the below variables are sanitized at this point,
1372     # either by being constants or by the checks above
1373     ss = self.sstore
1374     mycommand = ("umask 077 && "
1375                  "echo '%s' > '%s' && "
1376                  "cat > '%s' << '!EOF.' && \n"
1377                  "%s!EOF.\n%s restart" %
1378                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1379                   constants.SSL_CERT_FILE, gntpem,
1380                   constants.NODE_INITD_SCRIPT))
1381
1382     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1383     if result.failed:
1384       raise errors.OpExecError, ("Remote command on node %s, error: %s,"
1385                                  " output: %s" %
1386                                  (node, result.fail_reason, result.output))
1387
1388     # check connectivity
1389     time.sleep(4)
1390
1391     result = rpc.call_version([node])[node]
1392     if result:
1393       if constants.PROTOCOL_VERSION == result:
1394         logger.Info("communication to node %s fine, sw version %s match" %
1395                     (node, result))
1396       else:
1397         raise errors.OpExecError, ("Version mismatch master version %s,"
1398                                    " node version %s" %
1399                                    (constants.PROTOCOL_VERSION, result))
1400     else:
1401       raise errors.OpExecError, ("Cannot get version from the new node")
1402
1403     # setup ssh on node
1404     logger.Info("copy ssh key to node %s" % node)
1405     keyarray = []
1406     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1407                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1408                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1409
1410     for i in keyfiles:
1411       f = open(i, 'r')
1412       try:
1413         keyarray.append(f.read())
1414       finally:
1415         f.close()
1416
1417     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1418                                keyarray[3], keyarray[4], keyarray[5])
1419
1420     if not result:
1421       raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")
1422
1423     # Add node to our /etc/hosts, and add key to known_hosts
1424     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1425     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1426                       self.cfg.GetHostKey())
1427
1428     if new_node.secondary_ip != new_node.primary_ip:
1429       result = ssh.SSHCall(node, "root",
1430                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1431       if result.failed:
1432         raise errors.OpExecError, ("Node claims it doesn't have the"
1433                                    " secondary ip you gave (%s).\n"
1434                                    "Please fix and re-run this command." %
1435                                    new_node.secondary_ip)
1436
1437     # Distribute updated /etc/hosts and known_hosts to all nodes,
1438     # including the node just added
1439     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1440     dist_nodes = self.cfg.GetNodeList() + [node]
1441     if myself.name in dist_nodes:
1442       dist_nodes.remove(myself.name)
1443
1444     logger.Debug("Copying hosts and known_hosts to all nodes")
1445     for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1446       result = rpc.call_upload_file(dist_nodes, fname)
1447       for to_node in dist_nodes:
1448         if not result[to_node]:
1449           logger.Error("copy of file %s to node %s failed" %
1450                        (fname, to_node))
1451
1452     to_copy = ss.GetFileList()
1453     for fname in to_copy:
1454       if not ssh.CopyFileToNode(node, fname):
1455         logger.Error("could not copy file %s to node %s" % (fname, node))
1456
1457     logger.Info("adding node %s to cluster.conf" % node)
1458     self.cfg.AddNode(new_node)
1459
1460
1461 class LUMasterFailover(LogicalUnit):
1462   """Failover the master node to the current node.
1463
1464   This is a special LU in that it must run on a non-master node.
1465
1466   """
1467   HPATH = "master-failover"
1468   HTYPE = constants.HTYPE_CLUSTER
1469   REQ_MASTER = False
1470   _OP_REQP = []
1471
1472   def BuildHooksEnv(self):
1473     """Build hooks env.
1474
1475     This will run on the new master only in the pre phase, and on all
1476     the nodes in the post phase.
1477
1478     """
1479     env = {
1480       "NEW_MASTER": self.new_master,
1481       "OLD_MASTER": self.old_master,
1482       }
1483     return env, [self.new_master], self.cfg.GetNodeList()
1484
1485   def CheckPrereq(self):
1486     """Check prerequisites.
1487
1488     This checks that we are not already the master.
1489
1490     """
1491     self.new_master = socket.gethostname()
1492
1493     self.old_master = self.sstore.GetMasterNode()
1494
1495     if self.old_master == self.new_master:
1496       raise errors.OpPrereqError, ("This commands must be run on the node"
1497                                    " where you want the new master to be.\n"
1498                                    "%s is already the master" %
1499                                    self.old_master)
1500
1501   def Exec(self, feedback_fn):
1502     """Failover the master node.
1503
1504     This command, when run on a non-master node, will cause the current
1505     master to cease being master, and the non-master to become new
1506     master.
1507
1508     """
1509     #TODO: do not rely on gethostname returning the FQDN
1510     logger.Info("setting master to %s, old master: %s" %
1511                 (self.new_master, self.old_master))
1512
1513     if not rpc.call_node_stop_master(self.old_master):
1514       logger.Error("could disable the master role on the old master"
1515                    " %s, please disable manually" % self.old_master)
1516
1517     ss = self.sstore
1518     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1519     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1520                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1521       logger.Error("could not distribute the new simple store master file"
1522                    " to the other nodes, please check.")
1523
1524     if not rpc.call_node_start_master(self.new_master):
1525       logger.Error("could not start the master role on the new master"
1526                    " %s, please check" % self.new_master)
1527       feedback_fn("Error in activating the master IP on the new master,\n"
1528                   "please fix manually.")
1529
1530
1531
1532 class LUQueryClusterInfo(NoHooksLU):
1533   """Query cluster configuration.
1534
1535   """
1536   _OP_REQP = []
1537   REQ_MASTER = False
1538
1539   def CheckPrereq(self):
1540     """No prerequsites needed for this LU.
1541
1542     """
1543     pass
1544
1545   def Exec(self, feedback_fn):
1546     """Return cluster config.
1547
1548     """
1549     result = {
1550       "name": self.sstore.GetClusterName(),
1551       "software_version": constants.RELEASE_VERSION,
1552       "protocol_version": constants.PROTOCOL_VERSION,
1553       "config_version": constants.CONFIG_VERSION,
1554       "os_api_version": constants.OS_API_VERSION,
1555       "export_version": constants.EXPORT_VERSION,
1556       "master": self.sstore.GetMasterNode(),
1557       "architecture": (platform.architecture()[0], platform.machine()),
1558       }
1559
1560     return result
1561
1562
1563 class LUClusterCopyFile(NoHooksLU):
1564   """Copy file to cluster.
1565
1566   """
1567   _OP_REQP = ["nodes", "filename"]
1568
1569   def CheckPrereq(self):
1570     """Check prerequisites.
1571
1572     It should check that the named file exists and that the given list
1573     of nodes is valid.
1574
1575     """
1576     if not os.path.exists(self.op.filename):
1577       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1578
1579     self.nodes = _GetWantedNodes(self, self.op.nodes)
1580
1581   def Exec(self, feedback_fn):
1582     """Copy a file from master to some nodes.
1583
1584     Args:
1585       opts - class with options as members
1586       args - list containing a single element, the file name
1587     Opts used:
1588       nodes - list containing the name of target nodes; if empty, all nodes
1589
1590     """
1591     filename = self.op.filename
1592
1593     myname = socket.gethostname()
1594
1595     for node in self.nodes:
1596       if node == myname:
1597         continue
1598       if not ssh.CopyFileToNode(node, filename):
1599         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1600
1601
1602 class LUDumpClusterConfig(NoHooksLU):
1603   """Return a text-representation of the cluster-config.
1604
1605   """
1606   _OP_REQP = []
1607
1608   def CheckPrereq(self):
1609     """No prerequisites.
1610
1611     """
1612     pass
1613
1614   def Exec(self, feedback_fn):
1615     """Dump a representation of the cluster config to the standard output.
1616
1617     """
1618     return self.cfg.DumpConfig()
1619
1620
1621 class LURunClusterCommand(NoHooksLU):
1622   """Run a command on some nodes.
1623
1624   """
1625   _OP_REQP = ["command", "nodes"]
1626
1627   def CheckPrereq(self):
1628     """Check prerequisites.
1629
1630     It checks that the given list of nodes is valid.
1631
1632     """
1633     self.nodes = _GetWantedNodes(self, self.op.nodes)
1634
1635   def Exec(self, feedback_fn):
1636     """Run a command on some nodes.
1637
1638     """
1639     data = []
1640     for node in self.nodes:
1641       result = utils.RunCmd(["ssh", node.name, self.op.command])
1642       data.append((node.name, result.cmd, result.output, result.exit_code))
1643
1644     return data
1645
1646
1647 class LUActivateInstanceDisks(NoHooksLU):
1648   """Bring up an instance's disks.
1649
1650   """
1651   _OP_REQP = ["instance_name"]
1652
1653   def CheckPrereq(self):
1654     """Check prerequisites.
1655
1656     This checks that the instance is in the cluster.
1657
1658     """
1659     instance = self.cfg.GetInstanceInfo(
1660       self.cfg.ExpandInstanceName(self.op.instance_name))
1661     if instance is None:
1662       raise errors.OpPrereqError, ("Instance '%s' not known" %
1663                                    self.op.instance_name)
1664     self.instance = instance
1665
1666
1667   def Exec(self, feedback_fn):
1668     """Activate the disks.
1669
1670     """
1671     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1672     if not disks_ok:
1673       raise errors.OpExecError, ("Cannot activate block devices")
1674
1675     return disks_info
1676
1677
1678 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1679   """Prepare the block devices for an instance.
1680
1681   This sets up the block devices on all nodes.
1682
1683   Args:
1684     instance: a ganeti.objects.Instance object
1685     ignore_secondaries: if true, errors on secondary nodes won't result
1686                         in an error return from the function
1687
1688   Returns:
1689     false if the operation failed
1690     list of (host, instance_visible_name, node_visible_name) if the operation
1691          suceeded with the mapping from node devices to instance devices
1692   """
1693   device_info = []
1694   disks_ok = True
1695   for inst_disk in instance.disks:
1696     master_result = None
1697     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1698       cfg.SetDiskID(node_disk, node)
1699       is_primary = node == instance.primary_node
1700       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1701       if not result:
1702         logger.Error("could not prepare block device %s on node %s (is_pri"
1703                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1704         if is_primary or not ignore_secondaries:
1705           disks_ok = False
1706       if is_primary:
1707         master_result = result
1708     device_info.append((instance.primary_node, inst_disk.iv_name,
1709                         master_result))
1710
1711   return disks_ok, device_info
1712
1713
1714 def _StartInstanceDisks(cfg, instance, force):
1715   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1716                                            ignore_secondaries=force)
1717   if not disks_ok:
1718     _ShutdownInstanceDisks(instance, cfg)
1719     if force is not None and not force:
1720       logger.Error("If the message above refers to a secondary node,"
1721                    " you can retry the operation using '--force'.")
1722     raise errors.OpExecError, ("Disk consistency error")
1723
1724
1725 class LUDeactivateInstanceDisks(NoHooksLU):
1726   """Shutdown an instance's disks.
1727
1728   """
1729   _OP_REQP = ["instance_name"]
1730
1731   def CheckPrereq(self):
1732     """Check prerequisites.
1733
1734     This checks that the instance is in the cluster.
1735
1736     """
1737     instance = self.cfg.GetInstanceInfo(
1738       self.cfg.ExpandInstanceName(self.op.instance_name))
1739     if instance is None:
1740       raise errors.OpPrereqError, ("Instance '%s' not known" %
1741                                    self.op.instance_name)
1742     self.instance = instance
1743
1744   def Exec(self, feedback_fn):
1745     """Deactivate the disks
1746
1747     """
1748     instance = self.instance
1749     ins_l = rpc.call_instance_list([instance.primary_node])
1750     ins_l = ins_l[instance.primary_node]
1751     if not type(ins_l) is list:
1752       raise errors.OpExecError, ("Can't contact node '%s'" %
1753                                  instance.primary_node)
1754
1755     if self.instance.name in ins_l:
1756       raise errors.OpExecError, ("Instance is running, can't shutdown"
1757                                  " block devices.")
1758
1759     _ShutdownInstanceDisks(instance, self.cfg)
1760
1761
1762 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1763   """Shutdown block devices of an instance.
1764
1765   This does the shutdown on all nodes of the instance.
1766
1767   If the ignore_primary is false, errors on the primary node are
1768   ignored.
1769
1770   """
1771   result = True
1772   for disk in instance.disks:
1773     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1774       cfg.SetDiskID(top_disk, node)
1775       if not rpc.call_blockdev_shutdown(node, top_disk):
1776         logger.Error("could not shutdown block device %s on node %s" %
1777                      (disk.iv_name, node))
1778         if not ignore_primary or node != instance.primary_node:
1779           result = False
1780   return result
1781
1782
1783 class LUStartupInstance(LogicalUnit):
1784   """Starts an instance.
1785
1786   """
1787   HPATH = "instance-start"
1788   HTYPE = constants.HTYPE_INSTANCE
1789   _OP_REQP = ["instance_name", "force"]
1790
1791   def BuildHooksEnv(self):
1792     """Build hooks env.
1793
1794     This runs on master, primary and secondary nodes of the instance.
1795
1796     """
1797     env = {
1798       "FORCE": self.op.force,
1799       }
1800     env.update(_BuildInstanceHookEnvByObject(self.instance))
1801     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1802           list(self.instance.secondary_nodes))
1803     return env, nl, nl
1804
1805   def CheckPrereq(self):
1806     """Check prerequisites.
1807
1808     This checks that the instance is in the cluster.
1809
1810     """
1811     instance = self.cfg.GetInstanceInfo(
1812       self.cfg.ExpandInstanceName(self.op.instance_name))
1813     if instance is None:
1814       raise errors.OpPrereqError, ("Instance '%s' not known" %
1815                                    self.op.instance_name)
1816
1817     # check bridges existance
1818     brlist = [nic.bridge for nic in instance.nics]
1819     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1820       raise errors.OpPrereqError, ("one or more target bridges %s does not"
1821                                    " exist on destination node '%s'" %
1822                                    (brlist, instance.primary_node))
1823
1824     self.instance = instance
1825     self.op.instance_name = instance.name
1826
1827   def Exec(self, feedback_fn):
1828     """Start the instance.
1829
1830     """
1831     instance = self.instance
1832     force = self.op.force
1833     extra_args = getattr(self.op, "extra_args", "")
1834
1835     node_current = instance.primary_node
1836
1837     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1838     if not nodeinfo:
1839       raise errors.OpExecError, ("Could not contact node %s for infos" %
1840                                  (node_current))
1841
1842     freememory = nodeinfo[node_current]['memory_free']
1843     memory = instance.memory
1844     if memory > freememory:
1845       raise errors.OpExecError, ("Not enough memory to start instance"
1846                                  " %s on node %s"
1847                                  " needed %s MiB, available %s MiB" %
1848                                  (instance.name, node_current, memory,
1849                                   freememory))
1850
1851     _StartInstanceDisks(self.cfg, instance, force)
1852
1853     if not rpc.call_instance_start(node_current, instance, extra_args):
1854       _ShutdownInstanceDisks(instance, self.cfg)
1855       raise errors.OpExecError, ("Could not start instance")
1856
1857     self.cfg.MarkInstanceUp(instance.name)
1858
1859
1860 class LUShutdownInstance(LogicalUnit):
1861   """Shutdown an instance.
1862
1863   """
1864   HPATH = "instance-stop"
1865   HTYPE = constants.HTYPE_INSTANCE
1866   _OP_REQP = ["instance_name"]
1867
1868   def BuildHooksEnv(self):
1869     """Build hooks env.
1870
1871     This runs on master, primary and secondary nodes of the instance.
1872
1873     """
1874     env = _BuildInstanceHookEnvByObject(self.instance)
1875     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1876           list(self.instance.secondary_nodes))
1877     return env, nl, nl
1878
1879   def CheckPrereq(self):
1880     """Check prerequisites.
1881
1882     This checks that the instance is in the cluster.
1883
1884     """
1885     instance = self.cfg.GetInstanceInfo(
1886       self.cfg.ExpandInstanceName(self.op.instance_name))
1887     if instance is None:
1888       raise errors.OpPrereqError, ("Instance '%s' not known" %
1889                                    self.op.instance_name)
1890     self.instance = instance
1891
1892   def Exec(self, feedback_fn):
1893     """Shutdown the instance.
1894
1895     """
1896     instance = self.instance
1897     node_current = instance.primary_node
1898     if not rpc.call_instance_shutdown(node_current, instance):
1899       logger.Error("could not shutdown instance")
1900
1901     self.cfg.MarkInstanceDown(instance.name)
1902     _ShutdownInstanceDisks(instance, self.cfg)
1903
1904
1905 class LUReinstallInstance(LogicalUnit):
1906   """Reinstall an instance.
1907
1908   """
1909   HPATH = "instance-reinstall"
1910   HTYPE = constants.HTYPE_INSTANCE
1911   _OP_REQP = ["instance_name"]
1912
1913   def BuildHooksEnv(self):
1914     """Build hooks env.
1915
1916     This runs on master, primary and secondary nodes of the instance.
1917
1918     """
1919     env = _BuildInstanceHookEnvByObject(self.instance)
1920     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1921           list(self.instance.secondary_nodes))
1922     return env, nl, nl
1923
1924   def CheckPrereq(self):
1925     """Check prerequisites.
1926
1927     This checks that the instance is in the cluster and is not running.
1928
1929     """
1930     instance = self.cfg.GetInstanceInfo(
1931       self.cfg.ExpandInstanceName(self.op.instance_name))
1932     if instance is None:
1933       raise errors.OpPrereqError, ("Instance '%s' not known" %
1934                                    self.op.instance_name)
1935     if instance.disk_template == constants.DT_DISKLESS:
1936       raise errors.OpPrereqError, ("Instance '%s' has no disks" %
1937                                    self.op.instance_name)
1938     if instance.status != "down":
1939       raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
1940                                    self.op.instance_name)
1941     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1942     if remote_info:
1943       raise errors.OpPrereqError, ("Instance '%s' is running on the node %s" %
1944                                    (self.op.instance_name,
1945                                     instance.primary_node))
1946
1947     self.op.os_type = getattr(self.op, "os_type", None)
1948     if self.op.os_type is not None:
1949       # OS verification
1950       pnode = self.cfg.GetNodeInfo(
1951         self.cfg.ExpandNodeName(instance.primary_node))
1952       if pnode is None:
1953         raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
1954                                      self.op.pnode)
1955       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
1956       if not isinstance(os_obj, objects.OS):
1957         raise errors.OpPrereqError, ("OS '%s' not in supported OS list for"
1958                                      " primary node"  % self.op.os_type)
1959
1960     self.instance = instance
1961
1962   def Exec(self, feedback_fn):
1963     """Reinstall the instance.
1964
1965     """
1966     inst = self.instance
1967
1968     if self.op.os_type is not None:
1969       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
1970       inst.os = self.op.os_type
1971       self.cfg.AddInstance(inst)
1972
1973     _StartInstanceDisks(self.cfg, inst, None)
1974     try:
1975       feedback_fn("Running the instance OS create scripts...")
1976       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
1977         raise errors.OpExecError, ("Could not install OS for instance %s "
1978                                    "on node %s" %
1979                                    (inst.name, inst.primary_node))
1980     finally:
1981       _ShutdownInstanceDisks(inst, self.cfg)
1982
1983
1984 class LURemoveInstance(LogicalUnit):
1985   """Remove an instance.
1986
1987   """
1988   HPATH = "instance-remove"
1989   HTYPE = constants.HTYPE_INSTANCE
1990   _OP_REQP = ["instance_name"]
1991
1992   def BuildHooksEnv(self):
1993     """Build hooks env.
1994
1995     This runs on master, primary and secondary nodes of the instance.
1996
1997     """
1998     env = _BuildInstanceHookEnvByObject(self.instance)
1999     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2000           list(self.instance.secondary_nodes))
2001     return env, nl, nl
2002
2003   def CheckPrereq(self):
2004     """Check prerequisites.
2005
2006     This checks that the instance is in the cluster.
2007
2008     """
2009     instance = self.cfg.GetInstanceInfo(
2010       self.cfg.ExpandInstanceName(self.op.instance_name))
2011     if instance is None:
2012       raise errors.OpPrereqError, ("Instance '%s' not known" %
2013                                    self.op.instance_name)
2014     self.instance = instance
2015
2016   def Exec(self, feedback_fn):
2017     """Remove the instance.
2018
2019     """
2020     instance = self.instance
2021     logger.Info("shutting down instance %s on node %s" %
2022                 (instance.name, instance.primary_node))
2023
2024     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2025       raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
2026                                  (instance.name, instance.primary_node))
2027
2028     logger.Info("removing block devices for instance %s" % instance.name)
2029
2030     _RemoveDisks(instance, self.cfg)
2031
2032     logger.Info("removing instance %s out of cluster config" % instance.name)
2033
2034     self.cfg.RemoveInstance(instance.name)
2035
2036
2037 class LUQueryInstances(NoHooksLU):
2038   """Logical unit for querying instances.
2039
2040   """
2041   _OP_REQP = ["output_fields"]
2042
2043   def CheckPrereq(self):
2044     """Check prerequisites.
2045
2046     This checks that the fields required are valid output fields.
2047
2048     """
2049     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2050     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2051                                "admin_state", "admin_ram",
2052                                "disk_template", "ip", "mac", "bridge"],
2053                        dynamic=self.dynamic_fields,
2054                        selected=self.op.output_fields)
2055
2056   def Exec(self, feedback_fn):
2057     """Computes the list of nodes and their attributes.
2058
2059     """
2060     instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2061     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2062                      in instance_names]
2063
2064     # begin data gathering
2065
2066     nodes = frozenset([inst.primary_node for inst in instance_list])
2067
2068     bad_nodes = []
2069     if self.dynamic_fields.intersection(self.op.output_fields):
2070       live_data = {}
2071       node_data = rpc.call_all_instances_info(nodes)
2072       for name in nodes:
2073         result = node_data[name]
2074         if result:
2075           live_data.update(result)
2076         elif result == False:
2077           bad_nodes.append(name)
2078         # else no instance is alive
2079     else:
2080       live_data = dict([(name, {}) for name in instance_names])
2081
2082     # end data gathering
2083
2084     output = []
2085     for instance in instance_list:
2086       iout = []
2087       for field in self.op.output_fields:
2088         if field == "name":
2089           val = instance.name
2090         elif field == "os":
2091           val = instance.os
2092         elif field == "pnode":
2093           val = instance.primary_node
2094         elif field == "snodes":
2095           val = ",".join(instance.secondary_nodes) or "-"
2096         elif field == "admin_state":
2097           if instance.status == "down":
2098             val = "no"
2099           else:
2100             val = "yes"
2101         elif field == "oper_state":
2102           if instance.primary_node in bad_nodes:
2103             val = "(node down)"
2104           else:
2105             if live_data.get(instance.name):
2106               val = "running"
2107             else:
2108               val = "stopped"
2109         elif field == "admin_ram":
2110           val = instance.memory
2111         elif field == "oper_ram":
2112           if instance.primary_node in bad_nodes:
2113             val = "(node down)"
2114           elif instance.name in live_data:
2115             val = live_data[instance.name].get("memory", "?")
2116           else:
2117             val = "-"
2118         elif field == "disk_template":
2119           val = instance.disk_template
2120         elif field == "ip":
2121           val = instance.nics[0].ip
2122         elif field == "bridge":
2123           val = instance.nics[0].bridge
2124         elif field == "mac":
2125           val = instance.nics[0].mac
2126         else:
2127           raise errors.ParameterError, field
2128         val = str(val)
2129         iout.append(val)
2130       output.append(iout)
2131
2132     return output
2133
2134
2135 class LUFailoverInstance(LogicalUnit):
2136   """Failover an instance.
2137
2138   """
2139   HPATH = "instance-failover"
2140   HTYPE = constants.HTYPE_INSTANCE
2141   _OP_REQP = ["instance_name", "ignore_consistency"]
2142
2143   def BuildHooksEnv(self):
2144     """Build hooks env.
2145
2146     This runs on master, primary and secondary nodes of the instance.
2147
2148     """
2149     env = {
2150       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2151       }
2152     env.update(_BuildInstanceHookEnvByObject(self.instance))
2153     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2154     return env, nl, nl
2155
2156   def CheckPrereq(self):
2157     """Check prerequisites.
2158
2159     This checks that the instance is in the cluster.
2160
2161     """
2162     instance = self.cfg.GetInstanceInfo(
2163       self.cfg.ExpandInstanceName(self.op.instance_name))
2164     if instance is None:
2165       raise errors.OpPrereqError, ("Instance '%s' not known" %
2166                                    self.op.instance_name)
2167
2168     # check memory requirements on the secondary node
2169     target_node = instance.secondary_nodes[0]
2170     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2171     info = nodeinfo.get(target_node, None)
2172     if not info:
2173       raise errors.OpPrereqError, ("Cannot get current information"
2174                                    " from node '%s'" % nodeinfo)
2175     if instance.memory > info['memory_free']:
2176       raise errors.OpPrereqError, ("Not enough memory on target node %s."
2177                                    " %d MB available, %d MB required" %
2178                                    (target_node, info['memory_free'],
2179                                     instance.memory))
2180
2181     # check bridge existance
2182     brlist = [nic.bridge for nic in instance.nics]
2183     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2184       raise errors.OpPrereqError, ("one or more target bridges %s does not"
2185                                    " exist on destination node '%s'" %
2186                                    (brlist, instance.primary_node))
2187
2188     self.instance = instance
2189
2190   def Exec(self, feedback_fn):
2191     """Failover an instance.
2192
2193     The failover is done by shutting it down on its present node and
2194     starting it on the secondary.
2195
2196     """
2197     instance = self.instance
2198
2199     source_node = instance.primary_node
2200     target_node = instance.secondary_nodes[0]
2201
2202     feedback_fn("* checking disk consistency between source and target")
2203     for dev in instance.disks:
2204       # for remote_raid1, these are md over drbd
2205       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2206         if not self.op.ignore_consistency:
2207           raise errors.OpExecError, ("Disk %s is degraded on target node,"
2208                                      " aborting failover." % dev.iv_name)
2209
2210     feedback_fn("* checking target node resource availability")
2211     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2212
2213     if not nodeinfo:
2214       raise errors.OpExecError, ("Could not contact target node %s." %
2215                                  target_node)
2216
2217     free_memory = int(nodeinfo[target_node]['memory_free'])
2218     memory = instance.memory
2219     if memory > free_memory:
2220       raise errors.OpExecError, ("Not enough memory to create instance %s on"
2221                                  " node %s. needed %s MiB, available %s MiB" %
2222                                  (instance.name, target_node, memory,
2223                                   free_memory))
2224
2225     feedback_fn("* shutting down instance on source node")
2226     logger.Info("Shutting down instance %s on node %s" %
2227                 (instance.name, source_node))
2228
2229     if not rpc.call_instance_shutdown(source_node, instance):
2230       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2231                    " anyway. Please make sure node %s is down"  %
2232                    (instance.name, source_node, source_node))
2233
2234     feedback_fn("* deactivating the instance's disks on source node")
2235     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2236       raise errors.OpExecError, ("Can't shut down the instance's disks.")
2237
2238     instance.primary_node = target_node
2239     # distribute new instance config to the other nodes
2240     self.cfg.AddInstance(instance)
2241
2242     feedback_fn("* activating the instance's disks on target node")
2243     logger.Info("Starting instance %s on node %s" %
2244                 (instance.name, target_node))
2245
2246     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2247                                              ignore_secondaries=True)
2248     if not disks_ok:
2249       _ShutdownInstanceDisks(instance, self.cfg)
2250       raise errors.OpExecError, ("Can't activate the instance's disks")
2251
2252     feedback_fn("* starting the instance on the target node")
2253     if not rpc.call_instance_start(target_node, instance, None):
2254       _ShutdownInstanceDisks(instance, self.cfg)
2255       raise errors.OpExecError("Could not start instance %s on node %s." %
2256                                (instance.name, target_node))
2257
2258
2259 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2260   """Create a tree of block devices on the primary node.
2261
2262   This always creates all devices.
2263
2264   """
2265   if device.children:
2266     for child in device.children:
2267       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2268         return False
2269
2270   cfg.SetDiskID(device, node)
2271   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2272   if not new_id:
2273     return False
2274   if device.physical_id is None:
2275     device.physical_id = new_id
2276   return True
2277
2278
2279 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2280   """Create a tree of block devices on a secondary node.
2281
2282   If this device type has to be created on secondaries, create it and
2283   all its children.
2284
2285   If not, just recurse to children keeping the same 'force' value.
2286
2287   """
2288   if device.CreateOnSecondary():
2289     force = True
2290   if device.children:
2291     for child in device.children:
2292       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2293         return False
2294
2295   if not force:
2296     return True
2297   cfg.SetDiskID(device, node)
2298   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2299   if not new_id:
2300     return False
2301   if device.physical_id is None:
2302     device.physical_id = new_id
2303   return True
2304
2305
2306 def _GenerateUniqueNames(cfg, exts):
2307   """Generate a suitable LV name.
2308
2309   This will generate a logical volume name for the given instance.
2310
2311   """
2312   results = []
2313   for val in exts:
2314     new_id = cfg.GenerateUniqueID()
2315     results.append("%s%s" % (new_id, val))
2316   return results
2317
2318
2319 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2320   """Generate a drbd device complete with its children.
2321
2322   """
2323   port = cfg.AllocatePort()
2324   vgname = cfg.GetVGName()
2325   dev_data = objects.Disk(dev_type="lvm", size=size,
2326                           logical_id=(vgname, names[0]))
2327   dev_meta = objects.Disk(dev_type="lvm", size=128,
2328                           logical_id=(vgname, names[1]))
2329   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2330                           logical_id = (primary, secondary, port),
2331                           children = [dev_data, dev_meta])
2332   return drbd_dev
2333
2334
2335 def _GenerateDiskTemplate(cfg, template_name,
2336                           instance_name, primary_node,
2337                           secondary_nodes, disk_sz, swap_sz):
2338   """Generate the entire disk layout for a given template type.
2339
2340   """
2341   #TODO: compute space requirements
2342
2343   vgname = cfg.GetVGName()
2344   if template_name == "diskless":
2345     disks = []
2346   elif template_name == "plain":
2347     if len(secondary_nodes) != 0:
2348       raise errors.ProgrammerError("Wrong template configuration")
2349
2350     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2351     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2352                            logical_id=(vgname, names[0]),
2353                            iv_name = "sda")
2354     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2355                            logical_id=(vgname, names[1]),
2356                            iv_name = "sdb")
2357     disks = [sda_dev, sdb_dev]
2358   elif template_name == "local_raid1":
2359     if len(secondary_nodes) != 0:
2360       raise errors.ProgrammerError("Wrong template configuration")
2361
2362
2363     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2364                                        ".sdb_m1", ".sdb_m2"])
2365     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2366                               logical_id=(vgname, names[0]))
2367     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2368                               logical_id=(vgname, names[1]))
2369     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2370                               size=disk_sz,
2371                               children = [sda_dev_m1, sda_dev_m2])
2372     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2373                               logical_id=(vgname, names[2]))
2374     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2375                               logical_id=(vgname, names[3]))
2376     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2377                               size=swap_sz,
2378                               children = [sdb_dev_m1, sdb_dev_m2])
2379     disks = [md_sda_dev, md_sdb_dev]
2380   elif template_name == "remote_raid1":
2381     if len(secondary_nodes) != 1:
2382       raise errors.ProgrammerError("Wrong template configuration")
2383     remote_node = secondary_nodes[0]
2384     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2385                                        ".sdb_data", ".sdb_meta"])
2386     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2387                                          disk_sz, names[0:2])
2388     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2389                               children = [drbd_sda_dev], size=disk_sz)
2390     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2391                                          swap_sz, names[2:4])
2392     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2393                               children = [drbd_sdb_dev], size=swap_sz)
2394     disks = [md_sda_dev, md_sdb_dev]
2395   else:
2396     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2397   return disks
2398
2399
2400 def _GetInstanceInfoText(instance):
2401   return "originstname+%s" % instance.name
2402
2403
2404 def _CreateDisks(cfg, instance):
2405   """Create all disks for an instance.
2406
2407   This abstracts away some work from AddInstance.
2408
2409   Args:
2410     instance: the instance object
2411
2412   Returns:
2413     True or False showing the success of the creation process
2414
2415   """
2416   info = _GetInstanceInfoText(instance)
2417
2418   for device in instance.disks:
2419     logger.Info("creating volume %s for instance %s" %
2420               (device.iv_name, instance.name))
2421     #HARDCODE
2422     for secondary_node in instance.secondary_nodes:
2423       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2424                                         info):
2425         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2426                      (device.iv_name, device, secondary_node))
2427         return False
2428     #HARDCODE
2429     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2430       logger.Error("failed to create volume %s on primary!" %
2431                    device.iv_name)
2432       return False
2433   return True
2434
2435
2436 def _RemoveDisks(instance, cfg):
2437   """Remove all disks for an instance.
2438
2439   This abstracts away some work from `AddInstance()` and
2440   `RemoveInstance()`. Note that in case some of the devices couldn't
2441   be remove, the removal will continue with the other ones (compare
2442   with `_CreateDisks()`).
2443
2444   Args:
2445     instance: the instance object
2446
2447   Returns:
2448     True or False showing the success of the removal proces
2449
2450   """
2451   logger.Info("removing block devices for instance %s" % instance.name)
2452
2453   result = True
2454   for device in instance.disks:
2455     for node, disk in device.ComputeNodeTree(instance.primary_node):
2456       cfg.SetDiskID(disk, node)
2457       if not rpc.call_blockdev_remove(node, disk):
2458         logger.Error("could not remove block device %s on node %s,"
2459                      " continuing anyway" %
2460                      (device.iv_name, node))
2461         result = False
2462   return result
2463
2464
2465 class LUCreateInstance(LogicalUnit):
2466   """Create an instance.
2467
2468   """
2469   HPATH = "instance-add"
2470   HTYPE = constants.HTYPE_INSTANCE
2471   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2472               "disk_template", "swap_size", "mode", "start", "vcpus",
2473               "wait_for_sync"]
2474
2475   def BuildHooksEnv(self):
2476     """Build hooks env.
2477
2478     This runs on master, primary and secondary nodes of the instance.
2479
2480     """
2481     env = {
2482       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2483       "INSTANCE_DISK_SIZE": self.op.disk_size,
2484       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2485       "INSTANCE_ADD_MODE": self.op.mode,
2486       }
2487     if self.op.mode == constants.INSTANCE_IMPORT:
2488       env["INSTANCE_SRC_NODE"] = self.op.src_node
2489       env["INSTANCE_SRC_PATH"] = self.op.src_path
2490       env["INSTANCE_SRC_IMAGE"] = self.src_image
2491
2492     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2493       primary_node=self.op.pnode,
2494       secondary_nodes=self.secondaries,
2495       status=self.instance_status,
2496       os_type=self.op.os_type,
2497       memory=self.op.mem_size,
2498       vcpus=self.op.vcpus,
2499       nics=[(self.inst_ip, self.op.bridge)],
2500     ))
2501
2502     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2503           self.secondaries)
2504     return env, nl, nl
2505
2506
2507   def CheckPrereq(self):
2508     """Check prerequisites.
2509
2510     """
2511     if self.op.mode not in (constants.INSTANCE_CREATE,
2512                             constants.INSTANCE_IMPORT):
2513       raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
2514                                    self.op.mode)
2515
2516     if self.op.mode == constants.INSTANCE_IMPORT:
2517       src_node = getattr(self.op, "src_node", None)
2518       src_path = getattr(self.op, "src_path", None)
2519       if src_node is None or src_path is None:
2520         raise errors.OpPrereqError, ("Importing an instance requires source"
2521                                      " node and path options")
2522       src_node_full = self.cfg.ExpandNodeName(src_node)
2523       if src_node_full is None:
2524         raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
2525       self.op.src_node = src_node = src_node_full
2526
2527       if not os.path.isabs(src_path):
2528         raise errors.OpPrereqError, ("The source path must be absolute")
2529
2530       export_info = rpc.call_export_info(src_node, src_path)
2531
2532       if not export_info:
2533         raise errors.OpPrereqError, ("No export found in dir %s" % src_path)
2534
2535       if not export_info.has_section(constants.INISECT_EXP):
2536         raise errors.ProgrammerError, ("Corrupted export config")
2537
2538       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2539       if (int(ei_version) != constants.EXPORT_VERSION):
2540         raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
2541                                      (ei_version, constants.EXPORT_VERSION))
2542
2543       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2544         raise errors.OpPrereqError, ("Can't import instance with more than"
2545                                      " one data disk")
2546
2547       # FIXME: are the old os-es, disk sizes, etc. useful?
2548       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2549       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2550                                                          'disk0_dump'))
2551       self.src_image = diskimage
2552     else: # INSTANCE_CREATE
2553       if getattr(self.op, "os_type", None) is None:
2554         raise errors.OpPrereqError, ("No guest OS specified")
2555
2556     # check primary node
2557     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2558     if pnode is None:
2559       raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
2560                                    self.op.pnode)
2561     self.op.pnode = pnode.name
2562     self.pnode = pnode
2563     self.secondaries = []
2564     # disk template and mirror node verification
2565     if self.op.disk_template not in constants.DISK_TEMPLATES:
2566       raise errors.OpPrereqError, ("Invalid disk template name")
2567
2568     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2569       if getattr(self.op, "snode", None) is None:
2570         raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
2571                                      " a mirror node")
2572
2573       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2574       if snode_name is None:
2575         raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
2576                                      self.op.snode)
2577       elif snode_name == pnode.name:
2578         raise errors.OpPrereqError, ("The secondary node cannot be"
2579                                      " the primary node.")
2580       self.secondaries.append(snode_name)
2581
2582     # Check lv size requirements
2583     nodenames = [pnode.name] + self.secondaries
2584     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2585
2586     # Required free disk space as a function of disk and swap space
2587     req_size_dict = {
2588       constants.DT_DISKLESS: 0,
2589       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2590       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2591       # 256 MB are added for drbd metadata, 128MB for each drbd device
2592       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2593     }
2594
2595     if self.op.disk_template not in req_size_dict:
2596       raise errors.ProgrammerError, ("Disk template '%s' size requirement"
2597                                      " is unknown" %  self.op.disk_template)
2598
2599     req_size = req_size_dict[self.op.disk_template]
2600
2601     for node in nodenames:
2602       info = nodeinfo.get(node, None)
2603       if not info:
2604         raise errors.OpPrereqError, ("Cannot get current information"
2605                                      " from node '%s'" % nodeinfo)
2606       if req_size > info['vg_free']:
2607         raise errors.OpPrereqError, ("Not enough disk space on target node %s."
2608                                      " %d MB available, %d MB required" %
2609                                      (node, info['vg_free'], req_size))
2610
2611     # os verification
2612     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2613     if not isinstance(os_obj, objects.OS):
2614       raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
2615                                    " primary node"  % self.op.os_type)
2616
2617     # instance verification
2618     hostname1 = utils.LookupHostname(self.op.instance_name)
2619     if not hostname1:
2620       raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
2621                                    self.op.instance_name)
2622
2623     self.op.instance_name = instance_name = hostname1['hostname']
2624     instance_list = self.cfg.GetInstanceList()
2625     if instance_name in instance_list:
2626       raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
2627                                    instance_name)
2628
2629     ip = getattr(self.op, "ip", None)
2630     if ip is None or ip.lower() == "none":
2631       inst_ip = None
2632     elif ip.lower() == "auto":
2633       inst_ip = hostname1['ip']
2634     else:
2635       if not utils.IsValidIP(ip):
2636         raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
2637                                      " like a valid IP" % ip)
2638       inst_ip = ip
2639     self.inst_ip = inst_ip
2640
2641     command = ["fping", "-q", hostname1['ip']]
2642     result = utils.RunCmd(command)
2643     if not result.failed:
2644       raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2645                                    (hostname1['ip'], instance_name))
2646
2647     # bridge verification
2648     bridge = getattr(self.op, "bridge", None)
2649     if bridge is None:
2650       self.op.bridge = self.cfg.GetDefBridge()
2651     else:
2652       self.op.bridge = bridge
2653
2654     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2655       raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2656                                    " destination node '%s'" %
2657                                    (self.op.bridge, pnode.name))
2658
2659     if self.op.start:
2660       self.instance_status = 'up'
2661     else:
2662       self.instance_status = 'down'
2663
2664   def Exec(self, feedback_fn):
2665     """Create and add the instance to the cluster.
2666
2667     """
2668     instance = self.op.instance_name
2669     pnode_name = self.pnode.name
2670
2671     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2672     if self.inst_ip is not None:
2673       nic.ip = self.inst_ip
2674
2675     disks = _GenerateDiskTemplate(self.cfg,
2676                                   self.op.disk_template,
2677                                   instance, pnode_name,
2678                                   self.secondaries, self.op.disk_size,
2679                                   self.op.swap_size)
2680
2681     iobj = objects.Instance(name=instance, os=self.op.os_type,
2682                             primary_node=pnode_name,
2683                             memory=self.op.mem_size,
2684                             vcpus=self.op.vcpus,
2685                             nics=[nic], disks=disks,
2686                             disk_template=self.op.disk_template,
2687                             status=self.instance_status,
2688                             )
2689
2690     feedback_fn("* creating instance disks...")
2691     if not _CreateDisks(self.cfg, iobj):
2692       _RemoveDisks(iobj, self.cfg)
2693       raise errors.OpExecError, ("Device creation failed, reverting...")
2694
2695     feedback_fn("adding instance %s to cluster config" % instance)
2696
2697     self.cfg.AddInstance(iobj)
2698
2699     if self.op.wait_for_sync:
2700       disk_abort = not _WaitForSync(self.cfg, iobj)
2701     elif iobj.disk_template == "remote_raid1":
2702       # make sure the disks are not degraded (still sync-ing is ok)
2703       time.sleep(15)
2704       feedback_fn("* checking mirrors status")
2705       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2706     else:
2707       disk_abort = False
2708
2709     if disk_abort:
2710       _RemoveDisks(iobj, self.cfg)
2711       self.cfg.RemoveInstance(iobj.name)
2712       raise errors.OpExecError, ("There are some degraded disks for"
2713                                       " this instance")
2714
2715     feedback_fn("creating os for instance %s on node %s" %
2716                 (instance, pnode_name))
2717
2718     if iobj.disk_template != constants.DT_DISKLESS:
2719       if self.op.mode == constants.INSTANCE_CREATE:
2720         feedback_fn("* running the instance OS create scripts...")
2721         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2722           raise errors.OpExecError, ("could not add os for instance %s"
2723                                           " on node %s" %
2724                                           (instance, pnode_name))
2725
2726       elif self.op.mode == constants.INSTANCE_IMPORT:
2727         feedback_fn("* running the instance OS import scripts...")
2728         src_node = self.op.src_node
2729         src_image = self.src_image
2730         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2731                                                 src_node, src_image):
2732           raise errors.OpExecError, ("Could not import os for instance"
2733                                           " %s on node %s" %
2734                                           (instance, pnode_name))
2735       else:
2736         # also checked in the prereq part
2737         raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
2738                                        % self.op.mode)
2739
2740     if self.op.start:
2741       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2742       feedback_fn("* starting instance...")
2743       if not rpc.call_instance_start(pnode_name, iobj, None):
2744         raise errors.OpExecError, ("Could not start instance")
2745
2746
2747 class LUConnectConsole(NoHooksLU):
2748   """Connect to an instance's console.
2749
2750   This is somewhat special in that it returns the command line that
2751   you need to run on the master node in order to connect to the
2752   console.
2753
2754   """
2755   _OP_REQP = ["instance_name"]
2756
2757   def CheckPrereq(self):
2758     """Check prerequisites.
2759
2760     This checks that the instance is in the cluster.
2761
2762     """
2763     instance = self.cfg.GetInstanceInfo(
2764       self.cfg.ExpandInstanceName(self.op.instance_name))
2765     if instance is None:
2766       raise errors.OpPrereqError, ("Instance '%s' not known" %
2767                                    self.op.instance_name)
2768     self.instance = instance
2769
2770   def Exec(self, feedback_fn):
2771     """Connect to the console of an instance
2772
2773     """
2774     instance = self.instance
2775     node = instance.primary_node
2776
2777     node_insts = rpc.call_instance_list([node])[node]
2778     if node_insts is False:
2779       raise errors.OpExecError, ("Can't connect to node %s." % node)
2780
2781     if instance.name not in node_insts:
2782       raise errors.OpExecError, ("Instance %s is not running." % instance.name)
2783
2784     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2785
2786     hyper = hypervisor.GetHypervisor()
2787     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2788     return node, console_cmd
2789
2790
2791 class LUAddMDDRBDComponent(LogicalUnit):
2792   """Adda new mirror member to an instance's disk.
2793
2794   """
2795   HPATH = "mirror-add"
2796   HTYPE = constants.HTYPE_INSTANCE
2797   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2798
2799   def BuildHooksEnv(self):
2800     """Build hooks env.
2801
2802     This runs on the master, the primary and all the secondaries.
2803
2804     """
2805     env = {
2806       "NEW_SECONDARY": self.op.remote_node,
2807       "DISK_NAME": self.op.disk_name,
2808       }
2809     env.update(_BuildInstanceHookEnvByObject(self.instance))
2810     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2811           self.op.remote_node,] + list(self.instance.secondary_nodes)
2812     return env, nl, nl
2813
2814   def CheckPrereq(self):
2815     """Check prerequisites.
2816
2817     This checks that the instance is in the cluster.
2818
2819     """
2820     instance = self.cfg.GetInstanceInfo(
2821       self.cfg.ExpandInstanceName(self.op.instance_name))
2822     if instance is None:
2823       raise errors.OpPrereqError, ("Instance '%s' not known" %
2824                                    self.op.instance_name)
2825     self.instance = instance
2826
2827     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2828     if remote_node is None:
2829       raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
2830     self.remote_node = remote_node
2831
2832     if remote_node == instance.primary_node:
2833       raise errors.OpPrereqError, ("The specified node is the primary node of"
2834                                    " the instance.")
2835
2836     if instance.disk_template != constants.DT_REMOTE_RAID1:
2837       raise errors.OpPrereqError, ("Instance's disk layout is not"
2838                                    " remote_raid1.")
2839     for disk in instance.disks:
2840       if disk.iv_name == self.op.disk_name:
2841         break
2842     else:
2843       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2844                                    " instance." % self.op.disk_name)
2845     if len(disk.children) > 1:
2846       raise errors.OpPrereqError, ("The device already has two slave"
2847                                    " devices.\n"
2848                                    "This would create a 3-disk raid1"
2849                                    " which we don't allow.")
2850     self.disk = disk
2851
2852   def Exec(self, feedback_fn):
2853     """Add the mirror component
2854
2855     """
2856     disk = self.disk
2857     instance = self.instance
2858
2859     remote_node = self.remote_node
2860     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2861     names = _GenerateUniqueNames(self.cfg, lv_names)
2862     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2863                                      remote_node, disk.size, names)
2864
2865     logger.Info("adding new mirror component on secondary")
2866     #HARDCODE
2867     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2868                                       _GetInstanceInfoText(instance)):
2869       raise errors.OpExecError, ("Failed to create new component on secondary"
2870                                  " node %s" % remote_node)
2871
2872     logger.Info("adding new mirror component on primary")
2873     #HARDCODE
2874     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2875                                     _GetInstanceInfoText(instance)):
2876       # remove secondary dev
2877       self.cfg.SetDiskID(new_drbd, remote_node)
2878       rpc.call_blockdev_remove(remote_node, new_drbd)
2879       raise errors.OpExecError, ("Failed to create volume on primary")
2880
2881     # the device exists now
2882     # call the primary node to add the mirror to md
2883     logger.Info("adding new mirror component to md")
2884     if not rpc.call_blockdev_addchild(instance.primary_node,
2885                                            disk, new_drbd):
2886       logger.Error("Can't add mirror compoment to md!")
2887       self.cfg.SetDiskID(new_drbd, remote_node)
2888       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2889         logger.Error("Can't rollback on secondary")
2890       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2891       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2892         logger.Error("Can't rollback on primary")
2893       raise errors.OpExecError, "Can't add mirror component to md array"
2894
2895     disk.children.append(new_drbd)
2896
2897     self.cfg.AddInstance(instance)
2898
2899     _WaitForSync(self.cfg, instance)
2900
2901     return 0
2902
2903
2904 class LURemoveMDDRBDComponent(LogicalUnit):
2905   """Remove a component from a remote_raid1 disk.
2906
2907   """
2908   HPATH = "mirror-remove"
2909   HTYPE = constants.HTYPE_INSTANCE
2910   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2911
2912   def BuildHooksEnv(self):
2913     """Build hooks env.
2914
2915     This runs on the master, the primary and all the secondaries.
2916
2917     """
2918     env = {
2919       "DISK_NAME": self.op.disk_name,
2920       "DISK_ID": self.op.disk_id,
2921       "OLD_SECONDARY": self.old_secondary,
2922       }
2923     env.update(_BuildInstanceHookEnvByObject(self.instance))
2924     nl = [self.sstore.GetMasterNode(),
2925           self.instance.primary_node] + list(self.instance.secondary_nodes)
2926     return env, nl, nl
2927
2928   def CheckPrereq(self):
2929     """Check prerequisites.
2930
2931     This checks that the instance is in the cluster.
2932
2933     """
2934     instance = self.cfg.GetInstanceInfo(
2935       self.cfg.ExpandInstanceName(self.op.instance_name))
2936     if instance is None:
2937       raise errors.OpPrereqError, ("Instance '%s' not known" %
2938                                    self.op.instance_name)
2939     self.instance = instance
2940
2941     if instance.disk_template != constants.DT_REMOTE_RAID1:
2942       raise errors.OpPrereqError, ("Instance's disk layout is not"
2943                                    " remote_raid1.")
2944     for disk in instance.disks:
2945       if disk.iv_name == self.op.disk_name:
2946         break
2947     else:
2948       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2949                                    " instance." % self.op.disk_name)
2950     for child in disk.children:
2951       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2952         break
2953     else:
2954       raise errors.OpPrereqError, ("Can't find the device with this port.")
2955
2956     if len(disk.children) < 2:
2957       raise errors.OpPrereqError, ("Cannot remove the last component from"
2958                                    " a mirror.")
2959     self.disk = disk
2960     self.child = child
2961     if self.child.logical_id[0] == instance.primary_node:
2962       oid = 1
2963     else:
2964       oid = 0
2965     self.old_secondary = self.child.logical_id[oid]
2966
2967   def Exec(self, feedback_fn):
2968     """Remove the mirror component
2969
2970     """
2971     instance = self.instance
2972     disk = self.disk
2973     child = self.child
2974     logger.Info("remove mirror component")
2975     self.cfg.SetDiskID(disk, instance.primary_node)
2976     if not rpc.call_blockdev_removechild(instance.primary_node,
2977                                               disk, child):
2978       raise errors.OpExecError, ("Can't remove child from mirror.")
2979
2980     for node in child.logical_id[:2]:
2981       self.cfg.SetDiskID(child, node)
2982       if not rpc.call_blockdev_remove(node, child):
2983         logger.Error("Warning: failed to remove device from node %s,"
2984                      " continuing operation." % node)
2985
2986     disk.children.remove(child)
2987     self.cfg.AddInstance(instance)
2988
2989
2990 class LUReplaceDisks(LogicalUnit):
2991   """Replace the disks of an instance.
2992
2993   """
2994   HPATH = "mirrors-replace"
2995   HTYPE = constants.HTYPE_INSTANCE
2996   _OP_REQP = ["instance_name"]
2997
2998   def BuildHooksEnv(self):
2999     """Build hooks env.
3000
3001     This runs on the master, the primary and all the secondaries.
3002
3003     """
3004     env = {
3005       "NEW_SECONDARY": self.op.remote_node,
3006       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3007       }
3008     env.update(_BuildInstanceHookEnvByObject(self.instance))
3009     nl = [self.sstore.GetMasterNode(),
3010           self.instance.primary_node] + list(self.instance.secondary_nodes)
3011     return env, nl, nl
3012
3013   def CheckPrereq(self):
3014     """Check prerequisites.
3015
3016     This checks that the instance is in the cluster.
3017
3018     """
3019     instance = self.cfg.GetInstanceInfo(
3020       self.cfg.ExpandInstanceName(self.op.instance_name))
3021     if instance is None:
3022       raise errors.OpPrereqError, ("Instance '%s' not known" %
3023                                    self.op.instance_name)
3024     self.instance = instance
3025
3026     if instance.disk_template != constants.DT_REMOTE_RAID1:
3027       raise errors.OpPrereqError, ("Instance's disk layout is not"
3028                                    " remote_raid1.")
3029
3030     if len(instance.secondary_nodes) != 1:
3031       raise errors.OpPrereqError, ("The instance has a strange layout,"
3032                                    " expected one secondary but found %d" %
3033                                    len(instance.secondary_nodes))
3034
3035     remote_node = getattr(self.op, "remote_node", None)
3036     if remote_node is None:
3037       remote_node = instance.secondary_nodes[0]
3038     else:
3039       remote_node = self.cfg.ExpandNodeName(remote_node)
3040       if remote_node is None:
3041         raise errors.OpPrereqError, ("Node '%s' not known" %
3042                                      self.op.remote_node)
3043     if remote_node == instance.primary_node:
3044       raise errors.OpPrereqError, ("The specified node is the primary node of"
3045                                    " the instance.")
3046     self.op.remote_node = remote_node
3047
3048   def Exec(self, feedback_fn):
3049     """Replace the disks of an instance.
3050
3051     """
3052     instance = self.instance
3053     iv_names = {}
3054     # start of work
3055     remote_node = self.op.remote_node
3056     cfg = self.cfg
3057     vgname = cfg.GetVGName()
3058     for dev in instance.disks:
3059       size = dev.size
3060       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3061       names = _GenerateUniqueNames(cfg, lv_names)
3062       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3063                                        remote_node, size, names)
3064       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3065       logger.Info("adding new mirror component on secondary for %s" %
3066                   dev.iv_name)
3067       #HARDCODE
3068       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3069                                         _GetInstanceInfoText(instance)):
3070         raise errors.OpExecError, ("Failed to create new component on"
3071                                    " secondary node %s\n"
3072                                    "Full abort, cleanup manually!" %
3073                                    remote_node)
3074
3075       logger.Info("adding new mirror component on primary")
3076       #HARDCODE
3077       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3078                                       _GetInstanceInfoText(instance)):
3079         # remove secondary dev
3080         cfg.SetDiskID(new_drbd, remote_node)
3081         rpc.call_blockdev_remove(remote_node, new_drbd)
3082         raise errors.OpExecError("Failed to create volume on primary!\n"
3083                                  "Full abort, cleanup manually!!")
3084
3085       # the device exists now
3086       # call the primary node to add the mirror to md
3087       logger.Info("adding new mirror component to md")
3088       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3089                                         new_drbd):
3090         logger.Error("Can't add mirror compoment to md!")
3091         cfg.SetDiskID(new_drbd, remote_node)
3092         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3093           logger.Error("Can't rollback on secondary")
3094         cfg.SetDiskID(new_drbd, instance.primary_node)
3095         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3096           logger.Error("Can't rollback on primary")
3097         raise errors.OpExecError, ("Full abort, cleanup manually!!")
3098
3099       dev.children.append(new_drbd)
3100       cfg.AddInstance(instance)
3101
3102     # this can fail as the old devices are degraded and _WaitForSync
3103     # does a combined result over all disks, so we don't check its
3104     # return value
3105     _WaitForSync(cfg, instance, unlock=True)
3106
3107     # so check manually all the devices
3108     for name in iv_names:
3109       dev, child, new_drbd = iv_names[name]
3110       cfg.SetDiskID(dev, instance.primary_node)
3111       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3112       if is_degr:
3113         raise errors.OpExecError, ("MD device %s is degraded!" % name)
3114       cfg.SetDiskID(new_drbd, instance.primary_node)
3115       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3116       if is_degr:
3117         raise errors.OpExecError, ("New drbd device %s is degraded!" % name)
3118
3119     for name in iv_names:
3120       dev, child, new_drbd = iv_names[name]
3121       logger.Info("remove mirror %s component" % name)
3122       cfg.SetDiskID(dev, instance.primary_node)
3123       if not rpc.call_blockdev_removechild(instance.primary_node,
3124                                                 dev, child):
3125         logger.Error("Can't remove child from mirror, aborting"
3126                      " *this device cleanup*.\nYou need to cleanup manually!!")
3127         continue
3128
3129       for node in child.logical_id[:2]:
3130         logger.Info("remove child device on %s" % node)
3131         cfg.SetDiskID(child, node)
3132         if not rpc.call_blockdev_remove(node, child):
3133           logger.Error("Warning: failed to remove device from node %s,"
3134                        " continuing operation." % node)
3135
3136       dev.children.remove(child)
3137
3138       cfg.AddInstance(instance)
3139
3140
3141 class LUQueryInstanceData(NoHooksLU):
3142   """Query runtime instance data.
3143
3144   """
3145   _OP_REQP = ["instances"]
3146
3147   def CheckPrereq(self):
3148     """Check prerequisites.
3149
3150     This only checks the optional instance list against the existing names.
3151
3152     """
3153     if not isinstance(self.op.instances, list):
3154       raise errors.OpPrereqError, "Invalid argument type 'instances'"
3155     if self.op.instances:
3156       self.wanted_instances = []
3157       names = self.op.instances
3158       for name in names:
3159         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3160         if instance is None:
3161           raise errors.OpPrereqError, ("No such instance name '%s'" % name)
3162       self.wanted_instances.append(instance)
3163     else:
3164       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3165                                in self.cfg.GetInstanceList()]
3166     return
3167
3168
3169   def _ComputeDiskStatus(self, instance, snode, dev):
3170     """Compute block device status.
3171
3172     """
3173     self.cfg.SetDiskID(dev, instance.primary_node)
3174     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3175     if dev.dev_type == "drbd":
3176       # we change the snode then (otherwise we use the one passed in)
3177       if dev.logical_id[0] == instance.primary_node:
3178         snode = dev.logical_id[1]
3179       else:
3180         snode = dev.logical_id[0]
3181
3182     if snode:
3183       self.cfg.SetDiskID(dev, snode)
3184       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3185     else:
3186       dev_sstatus = None
3187
3188     if dev.children:
3189       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3190                       for child in dev.children]
3191     else:
3192       dev_children = []
3193
3194     data = {
3195       "iv_name": dev.iv_name,
3196       "dev_type": dev.dev_type,
3197       "logical_id": dev.logical_id,
3198       "physical_id": dev.physical_id,
3199       "pstatus": dev_pstatus,
3200       "sstatus": dev_sstatus,
3201       "children": dev_children,
3202       }
3203
3204     return data
3205
3206   def Exec(self, feedback_fn):
3207     """Gather and return data"""
3208     result = {}
3209     for instance in self.wanted_instances:
3210       remote_info = rpc.call_instance_info(instance.primary_node,
3211                                                 instance.name)
3212       if remote_info and "state" in remote_info:
3213         remote_state = "up"
3214       else:
3215         remote_state = "down"
3216       if instance.status == "down":
3217         config_state = "down"
3218       else:
3219         config_state = "up"
3220
3221       disks = [self._ComputeDiskStatus(instance, None, device)
3222                for device in instance.disks]
3223
3224       idict = {
3225         "name": instance.name,
3226         "config_state": config_state,
3227         "run_state": remote_state,
3228         "pnode": instance.primary_node,
3229         "snodes": instance.secondary_nodes,
3230         "os": instance.os,
3231         "memory": instance.memory,
3232         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3233         "disks": disks,
3234         }
3235
3236       result[instance.name] = idict
3237
3238     return result
3239
3240
3241 class LUQueryNodeData(NoHooksLU):
3242   """Logical unit for querying node data.
3243
3244   """
3245   _OP_REQP = ["nodes"]
3246
3247   def CheckPrereq(self):
3248     """Check prerequisites.
3249
3250     This only checks the optional node list against the existing names.
3251
3252     """
3253     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3254
3255   def Exec(self, feedback_fn):
3256     """Compute and return the list of nodes.
3257
3258     """
3259     ilist = [self.cfg.GetInstanceInfo(iname) for iname
3260              in self.cfg.GetInstanceList()]
3261     result = []
3262     for node in self.wanted_nodes:
3263       result.append((node.name, node.primary_ip, node.secondary_ip,
3264                      [inst.name for inst in ilist
3265                       if inst.primary_node == node.name],
3266                      [inst.name for inst in ilist
3267                       if node.name in inst.secondary_nodes],
3268                      ))
3269     return result
3270
3271
3272 class LUSetInstanceParms(LogicalUnit):
3273   """Modifies an instances's parameters.
3274
3275   """
3276   HPATH = "instance-modify"
3277   HTYPE = constants.HTYPE_INSTANCE
3278   _OP_REQP = ["instance_name"]
3279
3280   def BuildHooksEnv(self):
3281     """Build hooks env.
3282
3283     This runs on the master, primary and secondaries.
3284
3285     """
3286     args = dict()
3287     if self.mem:
3288       args['memory'] = self.mem
3289     if self.vcpus:
3290       args['vcpus'] = self.vcpus
3291     if self.do_ip or self.do_bridge:
3292       if self.do_ip:
3293         ip = self.ip
3294       else:
3295         ip = self.instance.nics[0].ip
3296       if self.bridge:
3297         bridge = self.bridge
3298       else:
3299         bridge = self.instance.nics[0].bridge
3300       args['nics'] = [(ip, bridge)]
3301     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3302     nl = [self.sstore.GetMasterNode(),
3303           self.instance.primary_node] + list(self.instance.secondary_nodes)
3304     return env, nl, nl
3305
3306   def CheckPrereq(self):
3307     """Check prerequisites.
3308
3309     This only checks the instance list against the existing names.
3310
3311     """
3312     self.mem = getattr(self.op, "mem", None)
3313     self.vcpus = getattr(self.op, "vcpus", None)
3314     self.ip = getattr(self.op, "ip", None)
3315     self.bridge = getattr(self.op, "bridge", None)
3316     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3317       raise errors.OpPrereqError, ("No changes submitted")
3318     if self.mem is not None:
3319       try:
3320         self.mem = int(self.mem)
3321       except ValueError, err:
3322         raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
3323     if self.vcpus is not None:
3324       try:
3325         self.vcpus = int(self.vcpus)
3326       except ValueError, err:
3327         raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
3328     if self.ip is not None:
3329       self.do_ip = True
3330       if self.ip.lower() == "none":
3331         self.ip = None
3332       else:
3333         if not utils.IsValidIP(self.ip):
3334           raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
3335     else:
3336       self.do_ip = False
3337     self.do_bridge = (self.bridge is not None)
3338
3339     instance = self.cfg.GetInstanceInfo(
3340       self.cfg.ExpandInstanceName(self.op.instance_name))
3341     if instance is None:
3342       raise errors.OpPrereqError, ("No such instance name '%s'" %
3343                                    self.op.instance_name)
3344     self.op.instance_name = instance.name
3345     self.instance = instance
3346     return
3347
3348   def Exec(self, feedback_fn):
3349     """Modifies an instance.
3350
3351     All parameters take effect only at the next restart of the instance.
3352     """
3353     result = []
3354     instance = self.instance
3355     if self.mem:
3356       instance.memory = self.mem
3357       result.append(("mem", self.mem))
3358     if self.vcpus:
3359       instance.vcpus = self.vcpus
3360       result.append(("vcpus",  self.vcpus))
3361     if self.do_ip:
3362       instance.nics[0].ip = self.ip
3363       result.append(("ip", self.ip))
3364     if self.bridge:
3365       instance.nics[0].bridge = self.bridge
3366       result.append(("bridge", self.bridge))
3367
3368     self.cfg.AddInstance(instance)
3369
3370     return result
3371
3372
3373 class LUQueryExports(NoHooksLU):
3374   """Query the exports list
3375
3376   """
3377   _OP_REQP = []
3378
3379   def CheckPrereq(self):
3380     """Check that the nodelist contains only existing nodes.
3381
3382     """
3383     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3384
3385   def Exec(self, feedback_fn):
3386     """Compute the list of all the exported system images.
3387
3388     Returns:
3389       a dictionary with the structure node->(export-list)
3390       where export-list is a list of the instances exported on
3391       that node.
3392
3393     """
3394     return rpc.call_export_list([node.name for node in self.nodes])
3395
3396
3397 class LUExportInstance(LogicalUnit):
3398   """Export an instance to an image in the cluster.
3399
3400   """
3401   HPATH = "instance-export"
3402   HTYPE = constants.HTYPE_INSTANCE
3403   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3404
3405   def BuildHooksEnv(self):
3406     """Build hooks env.
3407
3408     This will run on the master, primary node and target node.
3409
3410     """
3411     env = {
3412       "EXPORT_NODE": self.op.target_node,
3413       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3414       }
3415     env.update(_BuildInstanceHookEnvByObject(self.instance))
3416     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3417           self.op.target_node]
3418     return env, nl, nl
3419
3420   def CheckPrereq(self):
3421     """Check prerequisites.
3422
3423     This checks that the instance name is a valid one.
3424
3425     """
3426     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3427     self.instance = self.cfg.GetInstanceInfo(instance_name)
3428     if self.instance is None:
3429       raise errors.OpPrereqError, ("Instance '%s' not found" %
3430                                    self.op.instance_name)
3431
3432     # node verification
3433     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3434     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3435
3436     if self.dst_node is None:
3437       raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
3438                                    self.op.target_node)
3439     self.op.target_node = self.dst_node.name
3440
3441   def Exec(self, feedback_fn):
3442     """Export an instance to an image in the cluster.
3443
3444     """
3445     instance = self.instance
3446     dst_node = self.dst_node
3447     src_node = instance.primary_node
3448     # shutdown the instance, unless requested not to do so
3449     if self.op.shutdown:
3450       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3451       self.processor.ChainOpCode(op, feedback_fn)
3452
3453     vgname = self.cfg.GetVGName()
3454
3455     snap_disks = []
3456
3457     try:
3458       for disk in instance.disks:
3459         if disk.iv_name == "sda":
3460           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3461           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3462
3463           if not new_dev_name:
3464             logger.Error("could not snapshot block device %s on node %s" %
3465                          (disk.logical_id[1], src_node))
3466           else:
3467             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3468                                       logical_id=(vgname, new_dev_name),
3469                                       physical_id=(vgname, new_dev_name),
3470                                       iv_name=disk.iv_name)
3471             snap_disks.append(new_dev)
3472
3473     finally:
3474       if self.op.shutdown:
3475         op = opcodes.OpStartupInstance(instance_name=instance.name,
3476                                        force=False)
3477         self.processor.ChainOpCode(op, feedback_fn)
3478
3479     # TODO: check for size
3480
3481     for dev in snap_disks:
3482       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3483                                            instance):
3484         logger.Error("could not export block device %s from node"
3485                      " %s to node %s" %
3486                      (dev.logical_id[1], src_node, dst_node.name))
3487       if not rpc.call_blockdev_remove(src_node, dev):
3488         logger.Error("could not remove snapshot block device %s from"
3489                      " node %s" % (dev.logical_id[1], src_node))
3490
3491     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3492       logger.Error("could not finalize export for instance %s on node %s" %
3493                    (instance.name, dst_node.name))
3494
3495     nodelist = self.cfg.GetNodeList()
3496     nodelist.remove(dst_node.name)
3497
3498     # on one-node clusters nodelist will be empty after the removal
3499     # if we proceed the backup would be removed because OpQueryExports
3500     # substitutes an empty list with the full cluster node list.
3501     if nodelist:
3502       op = opcodes.OpQueryExports(nodes=nodelist)
3503       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3504       for node in exportlist:
3505         if instance.name in exportlist[node]:
3506           if not rpc.call_export_remove(node, instance.name):
3507             logger.Error("could not remove older export for instance %s"
3508                          " on node %s" % (instance.name, node))