Fix issues reported by pylint.
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError, ("Required parameter '%s' missing" %
81                                      attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError, ("Cluster not initialized yet,"
85                                      " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError, ("Commands must be run on the master"
90                                        " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded nodes.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if nodes is not None and not isinstance(nodes, list):
175     raise errors.OpPrereqError, "Invalid argument type 'nodes'"
176
177   if nodes:
178     wanted_nodes = []
179
180     for name in nodes:
181       node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182       if node is None:
183         raise errors.OpPrereqError, ("No such node name '%s'" % name)
184     wanted_nodes.append(node)
185
186     return wanted_nodes
187   else:
188     return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189
190
191 def _CheckOutputFields(static, dynamic, selected):
192   """Checks whether all selected fields are valid.
193
194   Args:
195     static: Static fields
196     dynamic: Dynamic fields
197
198   """
199   static_fields = frozenset(static)
200   dynamic_fields = frozenset(dynamic)
201
202   all_fields = static_fields | dynamic_fields
203
204   if not all_fields.issuperset(selected):
205     raise errors.OpPrereqError, ("Unknown output fields selected: %s"
206                                  % ",".join(frozenset(selected).
207                                             difference(all_fields)))
208
209
210 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
211                           memory, vcpus, nics):
212   """Builds instance related env variables for hooks from single variables.
213
214   Args:
215     secondary_nodes: List of secondary nodes as strings
216   """
217   env = {
218     "INSTANCE_NAME": name,
219     "INSTANCE_PRIMARY": primary_node,
220     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
221     "INSTANCE_OS_TYPE": os_type,
222     "INSTANCE_STATUS": status,
223     "INSTANCE_MEMORY": memory,
224     "INSTANCE_VCPUS": vcpus,
225   }
226
227   if nics:
228     nic_count = len(nics)
229     for idx, (ip, bridge) in enumerate(nics):
230       if ip is None:
231         ip = ""
232       env["INSTANCE_NIC%d_IP" % idx] = ip
233       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
234   else:
235     nic_count = 0
236
237   env["INSTANCE_NIC_COUNT"] = nic_count
238
239   return env
240
241
242 def _BuildInstanceHookEnvByObject(instance, override=None):
243   """Builds instance related env variables for hooks from an object.
244
245   Args:
246     instance: objects.Instance object of instance
247     override: dict of values to override
248   """
249   args = {
250     'name': instance.name,
251     'primary_node': instance.primary_node,
252     'secondary_nodes': instance.secondary_nodes,
253     'os_type': instance.os,
254     'status': instance.os,
255     'memory': instance.memory,
256     'vcpus': instance.vcpus,
257     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
258   }
259   if override:
260     args.update(override)
261   return _BuildInstanceHookEnv(**args)
262
263
264 def _UpdateEtcHosts(fullnode, ip):
265   """Ensure a node has a correct entry in /etc/hosts.
266
267   Args:
268     fullnode - Fully qualified domain name of host. (str)
269     ip       - IPv4 address of host (str)
270
271   """
272   node = fullnode.split(".", 1)[0]
273
274   f = open('/etc/hosts', 'r+')
275
276   inthere = False
277
278   save_lines = []
279   add_lines = []
280   removed = False
281
282   while True:
283     rawline = f.readline()
284
285     if not rawline:
286       # End of file
287       break
288
289     line = rawline.split('\n')[0]
290
291     # Strip off comments
292     line = line.split('#')[0]
293
294     if not line:
295       # Entire line was comment, skip
296       save_lines.append(rawline)
297       continue
298
299     fields = line.split()
300
301     haveall = True
302     havesome = False
303     for spec in [ ip, fullnode, node ]:
304       if spec not in fields:
305         haveall = False
306       if spec in fields:
307         havesome = True
308
309     if haveall:
310       inthere = True
311       save_lines.append(rawline)
312       continue
313
314     if havesome and not haveall:
315       # Line (old, or manual?) which is missing some.  Remove.
316       removed = True
317       continue
318
319     save_lines.append(rawline)
320
321   if not inthere:
322     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
323
324   if removed:
325     if add_lines:
326       save_lines = save_lines + add_lines
327
328     # We removed a line, write a new file and replace old.
329     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
330     newfile = os.fdopen(fd, 'w')
331     newfile.write(''.join(save_lines))
332     newfile.close()
333     os.rename(tmpname, '/etc/hosts')
334
335   elif add_lines:
336     # Simply appending a new line will do the trick.
337     f.seek(0, 2)
338     for add in add_lines:
339       f.write(add)
340
341   f.close()
342
343
344 def _UpdateKnownHosts(fullnode, ip, pubkey):
345   """Ensure a node has a correct known_hosts entry.
346
347   Args:
348     fullnode - Fully qualified domain name of host. (str)
349     ip       - IPv4 address of host (str)
350     pubkey   - the public key of the cluster
351
352   """
353   if os.path.exists('/etc/ssh/ssh_known_hosts'):
354     f = open('/etc/ssh/ssh_known_hosts', 'r+')
355   else:
356     f = open('/etc/ssh/ssh_known_hosts', 'w+')
357
358   inthere = False
359
360   save_lines = []
361   add_lines = []
362   removed = False
363
364   while True:
365     rawline = f.readline()
366     logger.Debug('read %s' % (repr(rawline),))
367
368     if not rawline:
369       # End of file
370       break
371
372     line = rawline.split('\n')[0]
373
374     parts = line.split(' ')
375     fields = parts[0].split(',')
376     key = parts[2]
377
378     haveall = True
379     havesome = False
380     for spec in [ ip, fullnode ]:
381       if spec not in fields:
382         haveall = False
383       if spec in fields:
384         havesome = True
385
386     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
387     if haveall and key == pubkey:
388       inthere = True
389       save_lines.append(rawline)
390       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
391       continue
392
393     if havesome and (not haveall or key != pubkey):
394       removed = True
395       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
396       continue
397
398     save_lines.append(rawline)
399
400   if not inthere:
401     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
402     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
403
404   if removed:
405     save_lines = save_lines + add_lines
406
407     # Write a new file and replace old.
408     fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
409     newfile = os.fdopen(fd, 'w')
410     newfile.write(''.join(save_lines))
411     newfile.close()
412     logger.Debug("Wrote new known_hosts.")
413     os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
414
415   elif add_lines:
416     # Simply appending a new line will do the trick.
417     f.seek(0, 2)
418     for add in add_lines:
419       f.write(add)
420
421   f.close()
422
423
424 def _HasValidVG(vglist, vgname):
425   """Checks if the volume group list is valid.
426
427   A non-None return value means there's an error, and the return value
428   is the error message.
429
430   """
431   vgsize = vglist.get(vgname, None)
432   if vgsize is None:
433     return "volume group '%s' missing" % vgname
434   elif vgsize < 20480:
435     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
436             (vgname, vgsize))
437   return None
438
439
440 def _InitSSHSetup(node):
441   """Setup the SSH configuration for the cluster.
442
443
444   This generates a dsa keypair for root, adds the pub key to the
445   permitted hosts and adds the hostkey to its own known hosts.
446
447   Args:
448     node: the name of this host as a fqdn
449
450   """
451   utils.RemoveFile('/root/.ssh/known_hosts')
452
453   if os.path.exists('/root/.ssh/id_dsa'):
454     utils.CreateBackup('/root/.ssh/id_dsa')
455   if os.path.exists('/root/.ssh/id_dsa.pub'):
456     utils.CreateBackup('/root/.ssh/id_dsa.pub')
457
458   utils.RemoveFile('/root/.ssh/id_dsa')
459   utils.RemoveFile('/root/.ssh/id_dsa.pub')
460
461   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
462                          "-f", "/root/.ssh/id_dsa",
463                          "-q", "-N", ""])
464   if result.failed:
465     raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
466                                result.output)
467
468   f = open('/root/.ssh/id_dsa.pub', 'r')
469   try:
470     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
471   finally:
472     f.close()
473
474
475 def _InitGanetiServerSetup(ss):
476   """Setup the necessary configuration for the initial node daemon.
477
478   This creates the nodepass file containing the shared password for
479   the cluster and also generates the SSL certificate.
480
481   """
482   # Create pseudo random password
483   randpass = sha.new(os.urandom(64)).hexdigest()
484   # and write it into sstore
485   ss.SetKey(ss.SS_NODED_PASS, randpass)
486
487   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
488                          "-days", str(365*5), "-nodes", "-x509",
489                          "-keyout", constants.SSL_CERT_FILE,
490                          "-out", constants.SSL_CERT_FILE, "-batch"])
491   if result.failed:
492     raise errors.OpExecError, ("could not generate server ssl cert, command"
493                                " %s had exitcode %s and error message %s" %
494                                (result.cmd, result.exit_code, result.output))
495
496   os.chmod(constants.SSL_CERT_FILE, 0400)
497
498   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
499
500   if result.failed:
501     raise errors.OpExecError, ("could not start the node daemon, command %s"
502                                " had exitcode %s and error %s" %
503                                (result.cmd, result.exit_code, result.output))
504
505
506 class LUInitCluster(LogicalUnit):
507   """Initialise the cluster.
508
509   """
510   HPATH = "cluster-init"
511   HTYPE = constants.HTYPE_CLUSTER
512   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
513               "def_bridge", "master_netdev"]
514   REQ_CLUSTER = False
515
516   def BuildHooksEnv(self):
517     """Build hooks env.
518
519     Notes: Since we don't require a cluster, we must manually add
520     ourselves in the post-run node list.
521
522     """
523     env = {
524       "CLUSTER": self.op.cluster_name,
525       "MASTER": self.hostname['hostname_full'],
526       }
527     return env, [], [self.hostname['hostname_full']]
528
529   def CheckPrereq(self):
530     """Verify that the passed name is a valid one.
531
532     """
533     if config.ConfigWriter.IsCluster():
534       raise errors.OpPrereqError, ("Cluster is already initialised")
535
536     hostname_local = socket.gethostname()
537     self.hostname = hostname = utils.LookupHostname(hostname_local)
538     if not hostname:
539       raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
540                                    hostname_local)
541
542     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
543     if not clustername:
544       raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
545                                    % self.op.cluster_name)
546
547     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
548     if result.failed:
549       raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
550                                    " to %s,\nbut this ip address does not"
551                                    " belong to this host."
552                                    " Aborting." % hostname['ip'])
553
554     secondary_ip = getattr(self.op, "secondary_ip", None)
555     if secondary_ip and not utils.IsValidIP(secondary_ip):
556       raise errors.OpPrereqError, ("Invalid secondary ip given")
557     if secondary_ip and secondary_ip != hostname['ip']:
558       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
559       if result.failed:
560         raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
561                                      "but it does not belong to this host." %
562                                      secondary_ip)
563     self.secondary_ip = secondary_ip
564
565     # checks presence of the volume group given
566     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
567
568     if vgstatus:
569       raise errors.OpPrereqError, ("Error: %s" % vgstatus)
570
571     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
572                     self.op.mac_prefix):
573       raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
574                                    self.op.mac_prefix)
575
576     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
577       raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
578                                    self.op.hypervisor_type)
579
580     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
581     if result.failed:
582       raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
583                                    (self.op.master_netdev, result.output))
584
585   def Exec(self, feedback_fn):
586     """Initialize the cluster.
587
588     """
589     clustername = self.clustername
590     hostname = self.hostname
591
592     # set up the simple store
593     ss = ssconf.SimpleStore()
594     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
595     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
596     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
597     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
598     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
599
600     # set up the inter-node password and certificate
601     _InitGanetiServerSetup(ss)
602
603     # start the master ip
604     rpc.call_node_start_master(hostname['hostname_full'])
605
606     # set up ssh config and /etc/hosts
607     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
608     try:
609       sshline = f.read()
610     finally:
611       f.close()
612     sshkey = sshline.split(" ")[1]
613
614     _UpdateEtcHosts(hostname['hostname_full'],
615                     hostname['ip'],
616                     )
617
618     _UpdateKnownHosts(hostname['hostname_full'],
619                       hostname['ip'],
620                       sshkey,
621                       )
622
623     _InitSSHSetup(hostname['hostname'])
624
625     # init of cluster config file
626     cfgw = config.ConfigWriter()
627     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
628                     sshkey, self.op.mac_prefix,
629                     self.op.vg_name, self.op.def_bridge)
630
631
632 class LUDestroyCluster(NoHooksLU):
633   """Logical unit for destroying the cluster.
634
635   """
636   _OP_REQP = []
637
638   def CheckPrereq(self):
639     """Check prerequisites.
640
641     This checks whether the cluster is empty.
642
643     Any errors are signalled by raising errors.OpPrereqError.
644
645     """
646     master = self.sstore.GetMasterNode()
647
648     nodelist = self.cfg.GetNodeList()
649     if len(nodelist) != 1 or nodelist[0] != master:
650       raise errors.OpPrereqError, ("There are still %d node(s) in "
651                                    "this cluster." % (len(nodelist) - 1))
652     instancelist = self.cfg.GetInstanceList()
653     if instancelist:
654       raise errors.OpPrereqError, ("There are still %d instance(s) in "
655                                    "this cluster." % len(instancelist))
656
657   def Exec(self, feedback_fn):
658     """Destroys the cluster.
659
660     """
661     utils.CreateBackup('/root/.ssh/id_dsa')
662     utils.CreateBackup('/root/.ssh/id_dsa.pub')
663     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
664
665
666 class LUVerifyCluster(NoHooksLU):
667   """Verifies the cluster status.
668
669   """
670   _OP_REQP = []
671
672   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
673                   remote_version, feedback_fn):
674     """Run multiple tests against a node.
675
676     Test list:
677       - compares ganeti version
678       - checks vg existance and size > 20G
679       - checks config file checksum
680       - checks ssh to other nodes
681
682     Args:
683       node: name of the node to check
684       file_list: required list of files
685       local_cksum: dictionary of local files and their checksums
686
687     """
688     # compares ganeti version
689     local_version = constants.PROTOCOL_VERSION
690     if not remote_version:
691       feedback_fn(" - ERROR: connection to %s failed" % (node))
692       return True
693
694     if local_version != remote_version:
695       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
696                       (local_version, node, remote_version))
697       return True
698
699     # checks vg existance and size > 20G
700
701     bad = False
702     if not vglist:
703       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
704                       (node,))
705       bad = True
706     else:
707       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
708       if vgstatus:
709         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
710         bad = True
711
712     # checks config file checksum
713     # checks ssh to any
714
715     if 'filelist' not in node_result:
716       bad = True
717       feedback_fn("  - ERROR: node hasn't returned file checksum data")
718     else:
719       remote_cksum = node_result['filelist']
720       for file_name in file_list:
721         if file_name not in remote_cksum:
722           bad = True
723           feedback_fn("  - ERROR: file '%s' missing" % file_name)
724         elif remote_cksum[file_name] != local_cksum[file_name]:
725           bad = True
726           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
727
728     if 'nodelist' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
731     else:
732       if node_result['nodelist']:
733         bad = True
734         for node in node_result['nodelist']:
735           feedback_fn("  - ERROR: communication with node '%s': %s" %
736                           (node, node_result['nodelist'][node]))
737     hyp_result = node_result.get('hypervisor', None)
738     if hyp_result is not None:
739       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
740     return bad
741
742   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
743     """Verify an instance.
744
745     This function checks to see if the required block devices are
746     available on the instance's node.
747
748     """
749     bad = False
750
751     instancelist = self.cfg.GetInstanceList()
752     if not instance in instancelist:
753       feedback_fn("  - ERROR: instance %s not in instance list %s" %
754                       (instance, instancelist))
755       bad = True
756
757     instanceconfig = self.cfg.GetInstanceInfo(instance)
758     node_current = instanceconfig.primary_node
759
760     node_vol_should = {}
761     instanceconfig.MapLVsByNode(node_vol_should)
762
763     for node in node_vol_should:
764       for volume in node_vol_should[node]:
765         if node not in node_vol_is or volume not in node_vol_is[node]:
766           feedback_fn("  - ERROR: volume %s missing on node %s" %
767                           (volume, node))
768           bad = True
769
770     if not instanceconfig.status == 'down':
771       if not instance in node_instance[node_current]:
772         feedback_fn("  - ERROR: instance %s not running on node %s" %
773                         (instance, node_current))
774         bad = True
775
776     for node in node_instance:
777       if (not node == node_current):
778         if instance in node_instance[node]:
779           feedback_fn("  - ERROR: instance %s should not run on node %s" %
780                           (instance, node))
781           bad = True
782
783     return not bad
784
785   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
786     """Verify if there are any unknown volumes in the cluster.
787
788     The .os, .swap and backup volumes are ignored. All other volumes are
789     reported as unknown.
790
791     """
792     bad = False
793
794     for node in node_vol_is:
795       for volume in node_vol_is[node]:
796         if node not in node_vol_should or volume not in node_vol_should[node]:
797           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
798                       (volume, node))
799           bad = True
800     return bad
801
802   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
803     """Verify the list of running instances.
804
805     This checks what instances are running but unknown to the cluster.
806
807     """
808     bad = False
809     for node in node_instance:
810       for runninginstance in node_instance[node]:
811         if runninginstance not in instancelist:
812           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
813                           (runninginstance, node))
814           bad = True
815     return bad
816
817   def CheckPrereq(self):
818     """Check prerequisites.
819
820     This has no prerequisites.
821
822     """
823     pass
824
825   def Exec(self, feedback_fn):
826     """Verify integrity of cluster, performing various test on nodes.
827
828     """
829     bad = False
830     feedback_fn("* Verifying global settings")
831     self.cfg.VerifyConfig()
832
833     master = self.sstore.GetMasterNode()
834     vg_name = self.cfg.GetVGName()
835     nodelist = utils.NiceSort(self.cfg.GetNodeList())
836     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
837     node_volume = {}
838     node_instance = {}
839
840     # FIXME: verify OS list
841     # do local checksums
842     file_names = list(self.sstore.GetFileList())
843     file_names.append(constants.SSL_CERT_FILE)
844     file_names.append(constants.CLUSTER_CONF_FILE)
845     local_checksums = utils.FingerprintFiles(file_names)
846
847     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
848     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
849     all_instanceinfo = rpc.call_instance_list(nodelist)
850     all_vglist = rpc.call_vg_list(nodelist)
851     node_verify_param = {
852       'filelist': file_names,
853       'nodelist': nodelist,
854       'hypervisor': None,
855       }
856     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
857     all_rversion = rpc.call_version(nodelist)
858
859     for node in nodelist:
860       feedback_fn("* Verifying node %s" % node)
861       result = self._VerifyNode(node, file_names, local_checksums,
862                                 all_vglist[node], all_nvinfo[node],
863                                 all_rversion[node], feedback_fn)
864       bad = bad or result
865
866       # node_volume
867       volumeinfo = all_volumeinfo[node]
868
869       if type(volumeinfo) != dict:
870         feedback_fn("  - ERROR: connection to %s failed" % (node,))
871         bad = True
872         continue
873
874       node_volume[node] = volumeinfo
875
876       # node_instance
877       nodeinstance = all_instanceinfo[node]
878       if type(nodeinstance) != list:
879         feedback_fn("  - ERROR: connection to %s failed" % (node,))
880         bad = True
881         continue
882
883       node_instance[node] = nodeinstance
884
885     node_vol_should = {}
886
887     for instance in instancelist:
888       feedback_fn("* Verifying instance %s" % instance)
889       result =  self._VerifyInstance(instance, node_volume, node_instance,
890                                      feedback_fn)
891       bad = bad or result
892
893       inst_config = self.cfg.GetInstanceInfo(instance)
894
895       inst_config.MapLVsByNode(node_vol_should)
896
897     feedback_fn("* Verifying orphan volumes")
898     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
899                                        feedback_fn)
900     bad = bad or result
901
902     feedback_fn("* Verifying remaining instances")
903     result = self._VerifyOrphanInstances(instancelist, node_instance,
904                                          feedback_fn)
905     bad = bad or result
906
907     return int(bad)
908
909
910 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
911   """Sleep and poll for an instance's disk to sync.
912
913   """
914   if not instance.disks:
915     return True
916
917   if not oneshot:
918     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
919
920   node = instance.primary_node
921
922   for dev in instance.disks:
923     cfgw.SetDiskID(dev, node)
924
925   retries = 0
926   while True:
927     max_time = 0
928     done = True
929     cumul_degraded = False
930     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
931     if not rstats:
932       logger.ToStderr("Can't get any data from node %s" % node)
933       retries += 1
934       if retries >= 10:
935         raise errors.RemoteError, ("Can't contact node %s for mirror data,"
936                                    " aborting." % node)
937       time.sleep(6)
938       continue
939     retries = 0
940     for i in range(len(rstats)):
941       mstat = rstats[i]
942       if mstat is None:
943         logger.ToStderr("Can't compute data for node %s/%s" %
944                         (node, instance.disks[i].iv_name))
945         continue
946       perc_done, est_time, is_degraded = mstat
947       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
948       if perc_done is not None:
949         done = False
950         if est_time is not None:
951           rem_time = "%d estimated seconds remaining" % est_time
952           max_time = est_time
953         else:
954           rem_time = "no time estimate"
955         logger.ToStdout("- device %s: %5.2f%% done, %s" %
956                         (instance.disks[i].iv_name, perc_done, rem_time))
957     if done or oneshot:
958       break
959
960     if unlock:
961       utils.Unlock('cmd')
962     try:
963       time.sleep(min(60, max_time))
964     finally:
965       if unlock:
966         utils.Lock('cmd')
967
968   if done:
969     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
970   return not cumul_degraded
971
972
973 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
974   """Check that mirrors are not degraded.
975
976   """
977   cfgw.SetDiskID(dev, node)
978
979   result = True
980   if on_primary or dev.AssembleOnSecondary():
981     rstats = rpc.call_blockdev_find(node, dev)
982     if not rstats:
983       logger.ToStderr("Can't get any data from node %s" % node)
984       result = False
985     else:
986       result = result and (not rstats[5])
987   if dev.children:
988     for child in dev.children:
989       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
990
991   return result
992
993
994 class LUDiagnoseOS(NoHooksLU):
995   """Logical unit for OS diagnose/query.
996
997   """
998   _OP_REQP = []
999
1000   def CheckPrereq(self):
1001     """Check prerequisites.
1002
1003     This always succeeds, since this is a pure query LU.
1004
1005     """
1006     return
1007
1008   def Exec(self, feedback_fn):
1009     """Compute the list of OSes.
1010
1011     """
1012     node_list = self.cfg.GetNodeList()
1013     node_data = rpc.call_os_diagnose(node_list)
1014     if node_data == False:
1015       raise errors.OpExecError, "Can't gather the list of OSes"
1016     return node_data
1017
1018
1019 class LURemoveNode(LogicalUnit):
1020   """Logical unit for removing a node.
1021
1022   """
1023   HPATH = "node-remove"
1024   HTYPE = constants.HTYPE_NODE
1025   _OP_REQP = ["node_name"]
1026
1027   def BuildHooksEnv(self):
1028     """Build hooks env.
1029
1030     This doesn't run on the target node in the pre phase as a failed
1031     node would not allows itself to run.
1032
1033     """
1034     env = {
1035       "NODE_NAME": self.op.node_name,
1036       }
1037     all_nodes = self.cfg.GetNodeList()
1038     all_nodes.remove(self.op.node_name)
1039     return env, all_nodes, all_nodes
1040
1041   def CheckPrereq(self):
1042     """Check prerequisites.
1043
1044     This checks:
1045      - the node exists in the configuration
1046      - it does not have primary or secondary instances
1047      - it's not the master
1048
1049     Any errors are signalled by raising errors.OpPrereqError.
1050
1051     """
1052     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1053     if node is None:
1054       logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
1055       return 1
1056
1057     instance_list = self.cfg.GetInstanceList()
1058
1059     masternode = self.sstore.GetMasterNode()
1060     if node.name == masternode:
1061       raise errors.OpPrereqError, ("Node is the master node,"
1062                                    " you need to failover first.")
1063
1064     for instance_name in instance_list:
1065       instance = self.cfg.GetInstanceInfo(instance_name)
1066       if node.name == instance.primary_node:
1067         raise errors.OpPrereqError, ("Instance %s still running on the node,"
1068                                      " please remove first." % instance_name)
1069       if node.name in instance.secondary_nodes:
1070         raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
1071                                      " please remove first." % instance_name)
1072     self.op.node_name = node.name
1073     self.node = node
1074
1075   def Exec(self, feedback_fn):
1076     """Removes the node from the cluster.
1077
1078     """
1079     node = self.node
1080     logger.Info("stopping the node daemon and removing configs from node %s" %
1081                 node.name)
1082
1083     rpc.call_node_leave_cluster(node.name)
1084
1085     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1086
1087     logger.Info("Removing node %s from config" % node.name)
1088
1089     self.cfg.RemoveNode(node.name)
1090
1091
1092 class LUQueryNodes(NoHooksLU):
1093   """Logical unit for querying nodes.
1094
1095   """
1096   _OP_REQP = ["output_fields"]
1097
1098   def CheckPrereq(self):
1099     """Check prerequisites.
1100
1101     This checks that the fields required are valid output fields.
1102
1103     """
1104     self.dynamic_fields = frozenset(["dtotal", "dfree",
1105                                      "mtotal", "mnode", "mfree"])
1106
1107     _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1108                        dynamic=self.dynamic_fields,
1109                        selected=self.op.output_fields)
1110
1111
1112   def Exec(self, feedback_fn):
1113     """Computes the list of nodes and their attributes.
1114
1115     """
1116     nodenames = utils.NiceSort(self.cfg.GetNodeList())
1117     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1118
1119
1120     # begin data gathering
1121
1122     if self.dynamic_fields.intersection(self.op.output_fields):
1123       live_data = {}
1124       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1125       for name in nodenames:
1126         nodeinfo = node_data.get(name, None)
1127         if nodeinfo:
1128           live_data[name] = {
1129             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1130             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1131             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1132             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1133             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1134             }
1135         else:
1136           live_data[name] = {}
1137     else:
1138       live_data = dict.fromkeys(nodenames, {})
1139
1140     node_to_primary = dict.fromkeys(nodenames, 0)
1141     node_to_secondary = dict.fromkeys(nodenames, 0)
1142
1143     if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1144       instancelist = self.cfg.GetInstanceList()
1145
1146       for instance in instancelist:
1147         instanceinfo = self.cfg.GetInstanceInfo(instance)
1148         node_to_primary[instanceinfo.primary_node] += 1
1149         for secnode in instanceinfo.secondary_nodes:
1150           node_to_secondary[secnode] += 1
1151
1152     # end data gathering
1153
1154     output = []
1155     for node in nodelist:
1156       node_output = []
1157       for field in self.op.output_fields:
1158         if field == "name":
1159           val = node.name
1160         elif field == "pinst":
1161           val = node_to_primary[node.name]
1162         elif field == "sinst":
1163           val = node_to_secondary[node.name]
1164         elif field == "pip":
1165           val = node.primary_ip
1166         elif field == "sip":
1167           val = node.secondary_ip
1168         elif field in self.dynamic_fields:
1169           val = live_data[node.name].get(field, "?")
1170         else:
1171           raise errors.ParameterError, field
1172         val = str(val)
1173         node_output.append(val)
1174       output.append(node_output)
1175
1176     return output
1177
1178
1179 class LUQueryNodeVolumes(NoHooksLU):
1180   """Logical unit for getting volumes on node(s).
1181
1182   """
1183   _OP_REQP = ["nodes", "output_fields"]
1184
1185   def CheckPrereq(self):
1186     """Check prerequisites.
1187
1188     This checks that the fields required are valid output fields.
1189
1190     """
1191     self.nodes = _GetWantedNodes(self, self.op.nodes)
1192
1193     _CheckOutputFields(static=["node"],
1194                        dynamic=["phys", "vg", "name", "size", "instance"],
1195                        selected=self.op.output_fields)
1196
1197
1198   def Exec(self, feedback_fn):
1199     """Computes the list of nodes and their attributes.
1200
1201     """
1202     nodenames = utils.NiceSort([node.name for node in self.nodes])
1203     volumes = rpc.call_node_volumes(nodenames)
1204
1205     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1206              in self.cfg.GetInstanceList()]
1207
1208     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1209
1210     output = []
1211     for node in nodenames:
1212       node_vols = volumes[node][:]
1213       node_vols.sort(key=lambda vol: vol['dev'])
1214
1215       for vol in node_vols:
1216         node_output = []
1217         for field in self.op.output_fields:
1218           if field == "node":
1219             val = node
1220           elif field == "phys":
1221             val = vol['dev']
1222           elif field == "vg":
1223             val = vol['vg']
1224           elif field == "name":
1225             val = vol['name']
1226           elif field == "size":
1227             val = int(float(vol['size']))
1228           elif field == "instance":
1229             for inst in ilist:
1230               if node not in lv_by_node[inst]:
1231                 continue
1232               if vol['name'] in lv_by_node[inst][node]:
1233                 val = inst.name
1234                 break
1235             else:
1236               val = '-'
1237           else:
1238             raise errors.ParameterError, field
1239           node_output.append(str(val))
1240
1241         output.append(node_output)
1242
1243     return output
1244
1245
1246 class LUAddNode(LogicalUnit):
1247   """Logical unit for adding node to the cluster.
1248
1249   """
1250   HPATH = "node-add"
1251   HTYPE = constants.HTYPE_NODE
1252   _OP_REQP = ["node_name"]
1253
1254   def BuildHooksEnv(self):
1255     """Build hooks env.
1256
1257     This will run on all nodes before, and on all nodes + the new node after.
1258
1259     """
1260     env = {
1261       "NODE_NAME": self.op.node_name,
1262       "NODE_PIP": self.op.primary_ip,
1263       "NODE_SIP": self.op.secondary_ip,
1264       }
1265     nodes_0 = self.cfg.GetNodeList()
1266     nodes_1 = nodes_0 + [self.op.node_name, ]
1267     return env, nodes_0, nodes_1
1268
1269   def CheckPrereq(self):
1270     """Check prerequisites.
1271
1272     This checks:
1273      - the new node is not already in the config
1274      - it is resolvable
1275      - its parameters (single/dual homed) matches the cluster
1276
1277     Any errors are signalled by raising errors.OpPrereqError.
1278
1279     """
1280     node_name = self.op.node_name
1281     cfg = self.cfg
1282
1283     dns_data = utils.LookupHostname(node_name)
1284     if not dns_data:
1285       raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)
1286
1287     node = dns_data['hostname']
1288     primary_ip = self.op.primary_ip = dns_data['ip']
1289     secondary_ip = getattr(self.op, "secondary_ip", None)
1290     if secondary_ip is None:
1291       secondary_ip = primary_ip
1292     if not utils.IsValidIP(secondary_ip):
1293       raise errors.OpPrereqError, ("Invalid secondary IP given")
1294     self.op.secondary_ip = secondary_ip
1295     node_list = cfg.GetNodeList()
1296     if node in node_list:
1297       raise errors.OpPrereqError, ("Node %s is already in the configuration"
1298                                    % node)
1299
1300     for existing_node_name in node_list:
1301       existing_node = cfg.GetNodeInfo(existing_node_name)
1302       if (existing_node.primary_ip == primary_ip or
1303           existing_node.secondary_ip == primary_ip or
1304           existing_node.primary_ip == secondary_ip or
1305           existing_node.secondary_ip == secondary_ip):
1306         raise errors.OpPrereqError, ("New node ip address(es) conflict with"
1307                                      " existing node %s" % existing_node.name)
1308
1309     # check that the type of the node (single versus dual homed) is the
1310     # same as for the master
1311     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1312     master_singlehomed = myself.secondary_ip == myself.primary_ip
1313     newbie_singlehomed = secondary_ip == primary_ip
1314     if master_singlehomed != newbie_singlehomed:
1315       if master_singlehomed:
1316         raise errors.OpPrereqError, ("The master has no private ip but the"
1317                                      " new node has one")
1318       else:
1319         raise errors.OpPrereqError ("The master has a private ip but the"
1320                                     " new node doesn't have one")
1321
1322     # checks reachablity
1323     command = ["fping", "-q", primary_ip]
1324     result = utils.RunCmd(command)
1325     if result.failed:
1326       raise errors.OpPrereqError, ("Node not reachable by ping")
1327
1328     if not newbie_singlehomed:
1329       # check reachability from my secondary ip to newbie's secondary ip
1330       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1331       result = utils.RunCmd(command)
1332       if result.failed:
1333         raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")
1334
1335     self.new_node = objects.Node(name=node,
1336                                  primary_ip=primary_ip,
1337                                  secondary_ip=secondary_ip)
1338
1339   def Exec(self, feedback_fn):
1340     """Adds the new node to the cluster.
1341
1342     """
1343     new_node = self.new_node
1344     node = new_node.name
1345
1346     # set up inter-node password and certificate and restarts the node daemon
1347     gntpass = self.sstore.GetNodeDaemonPassword()
1348     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1349       raise errors.OpExecError, ("ganeti password corruption detected")
1350     f = open(constants.SSL_CERT_FILE)
1351     try:
1352       gntpem = f.read(8192)
1353     finally:
1354       f.close()
1355     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1356     # so we use this to detect an invalid certificate; as long as the
1357     # cert doesn't contain this, the here-document will be correctly
1358     # parsed by the shell sequence below
1359     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1360       raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
1361     if not gntpem.endswith("\n"):
1362       raise errors.OpExecError, ("PEM must end with newline")
1363     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1364
1365     # remove first the root's known_hosts file
1366     utils.RemoveFile("/root/.ssh/known_hosts")
1367     # and then connect with ssh to set password and start ganeti-noded
1368     # note that all the below variables are sanitized at this point,
1369     # either by being constants or by the checks above
1370     ss = self.sstore
1371     mycommand = ("umask 077 && "
1372                  "echo '%s' > '%s' && "
1373                  "cat > '%s' << '!EOF.' && \n"
1374                  "%s!EOF.\n%s restart" %
1375                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1376                   constants.SSL_CERT_FILE, gntpem,
1377                   constants.NODE_INITD_SCRIPT))
1378
1379     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1380     if result.failed:
1381       raise errors.OpExecError, ("Remote command on node %s, error: %s,"
1382                                  " output: %s" %
1383                                  (node, result.fail_reason, result.output))
1384
1385     # check connectivity
1386     time.sleep(4)
1387
1388     result = rpc.call_version([node])[node]
1389     if result:
1390       if constants.PROTOCOL_VERSION == result:
1391         logger.Info("communication to node %s fine, sw version %s match" %
1392                     (node, result))
1393       else:
1394         raise errors.OpExecError, ("Version mismatch master version %s,"
1395                                    " node version %s" %
1396                                    (constants.PROTOCOL_VERSION, result))
1397     else:
1398       raise errors.OpExecError, ("Cannot get version from the new node")
1399
1400     # setup ssh on node
1401     logger.Info("copy ssh key to node %s" % node)
1402     keyarray = []
1403     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1404                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1405                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1406
1407     for i in keyfiles:
1408       f = open(i, 'r')
1409       try:
1410         keyarray.append(f.read())
1411       finally:
1412         f.close()
1413
1414     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1415                                keyarray[3], keyarray[4], keyarray[5])
1416
1417     if not result:
1418       raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")
1419
1420     # Add node to our /etc/hosts, and add key to known_hosts
1421     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1422     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1423                       self.cfg.GetHostKey())
1424
1425     if new_node.secondary_ip != new_node.primary_ip:
1426       result = ssh.SSHCall(node, "root",
1427                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1428       if result.failed:
1429         raise errors.OpExecError, ("Node claims it doesn't have the"
1430                                    " secondary ip you gave (%s).\n"
1431                                    "Please fix and re-run this command." %
1432                                    new_node.secondary_ip)
1433
1434     # Distribute updated /etc/hosts and known_hosts to all nodes,
1435     # including the node just added
1436     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1437     dist_nodes = self.cfg.GetNodeList() + [node]
1438     if myself.name in dist_nodes:
1439       dist_nodes.remove(myself.name)
1440
1441     logger.Debug("Copying hosts and known_hosts to all nodes")
1442     for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1443       result = rpc.call_upload_file(dist_nodes, fname)
1444       for to_node in dist_nodes:
1445         if not result[to_node]:
1446           logger.Error("copy of file %s to node %s failed" %
1447                        (fname, to_node))
1448
1449     to_copy = ss.GetFileList()
1450     for fname in to_copy:
1451       if not ssh.CopyFileToNode(node, fname):
1452         logger.Error("could not copy file %s to node %s" % (fname, node))
1453
1454     logger.Info("adding node %s to cluster.conf" % node)
1455     self.cfg.AddNode(new_node)
1456
1457
1458 class LUMasterFailover(LogicalUnit):
1459   """Failover the master node to the current node.
1460
1461   This is a special LU in that it must run on a non-master node.
1462
1463   """
1464   HPATH = "master-failover"
1465   HTYPE = constants.HTYPE_CLUSTER
1466   REQ_MASTER = False
1467   _OP_REQP = []
1468
1469   def BuildHooksEnv(self):
1470     """Build hooks env.
1471
1472     This will run on the new master only in the pre phase, and on all
1473     the nodes in the post phase.
1474
1475     """
1476     env = {
1477       "NEW_MASTER": self.new_master,
1478       "OLD_MASTER": self.old_master,
1479       }
1480     return env, [self.new_master], self.cfg.GetNodeList()
1481
1482   def CheckPrereq(self):
1483     """Check prerequisites.
1484
1485     This checks that we are not already the master.
1486
1487     """
1488     self.new_master = socket.gethostname()
1489
1490     self.old_master = self.sstore.GetMasterNode()
1491
1492     if self.old_master == self.new_master:
1493       raise errors.OpPrereqError, ("This commands must be run on the node"
1494                                    " where you want the new master to be.\n"
1495                                    "%s is already the master" %
1496                                    self.old_master)
1497
1498   def Exec(self, feedback_fn):
1499     """Failover the master node.
1500
1501     This command, when run on a non-master node, will cause the current
1502     master to cease being master, and the non-master to become new
1503     master.
1504
1505     """
1506     #TODO: do not rely on gethostname returning the FQDN
1507     logger.Info("setting master to %s, old master: %s" %
1508                 (self.new_master, self.old_master))
1509
1510     if not rpc.call_node_stop_master(self.old_master):
1511       logger.Error("could disable the master role on the old master"
1512                    " %s, please disable manually" % self.old_master)
1513
1514     ss = self.sstore
1515     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1516     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1517                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1518       logger.Error("could not distribute the new simple store master file"
1519                    " to the other nodes, please check.")
1520
1521     if not rpc.call_node_start_master(self.new_master):
1522       logger.Error("could not start the master role on the new master"
1523                    " %s, please check" % self.new_master)
1524       feedback_fn("Error in activating the master IP on the new master,\n"
1525                   "please fix manually.")
1526
1527
1528
1529 class LUQueryClusterInfo(NoHooksLU):
1530   """Query cluster configuration.
1531
1532   """
1533   _OP_REQP = []
1534   REQ_MASTER = False
1535
1536   def CheckPrereq(self):
1537     """No prerequsites needed for this LU.
1538
1539     """
1540     pass
1541
1542   def Exec(self, feedback_fn):
1543     """Return cluster config.
1544
1545     """
1546     result = {
1547       "name": self.sstore.GetClusterName(),
1548       "software_version": constants.RELEASE_VERSION,
1549       "protocol_version": constants.PROTOCOL_VERSION,
1550       "config_version": constants.CONFIG_VERSION,
1551       "os_api_version": constants.OS_API_VERSION,
1552       "export_version": constants.EXPORT_VERSION,
1553       "master": self.sstore.GetMasterNode(),
1554       "architecture": (platform.architecture()[0], platform.machine()),
1555       }
1556
1557     return result
1558
1559
1560 class LUClusterCopyFile(NoHooksLU):
1561   """Copy file to cluster.
1562
1563   """
1564   _OP_REQP = ["nodes", "filename"]
1565
1566   def CheckPrereq(self):
1567     """Check prerequisites.
1568
1569     It should check that the named file exists and that the given list
1570     of nodes is valid.
1571
1572     """
1573     if not os.path.exists(self.op.filename):
1574       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1575
1576     self.nodes = _GetWantedNodes(self, self.op.nodes)
1577
1578   def Exec(self, feedback_fn):
1579     """Copy a file from master to some nodes.
1580
1581     Args:
1582       opts - class with options as members
1583       args - list containing a single element, the file name
1584     Opts used:
1585       nodes - list containing the name of target nodes; if empty, all nodes
1586
1587     """
1588     filename = self.op.filename
1589
1590     myname = socket.gethostname()
1591
1592     for node in self.nodes:
1593       if node == myname:
1594         continue
1595       if not ssh.CopyFileToNode(node, filename):
1596         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1597
1598
1599 class LUDumpClusterConfig(NoHooksLU):
1600   """Return a text-representation of the cluster-config.
1601
1602   """
1603   _OP_REQP = []
1604
1605   def CheckPrereq(self):
1606     """No prerequisites.
1607
1608     """
1609     pass
1610
1611   def Exec(self, feedback_fn):
1612     """Dump a representation of the cluster config to the standard output.
1613
1614     """
1615     return self.cfg.DumpConfig()
1616
1617
1618 class LURunClusterCommand(NoHooksLU):
1619   """Run a command on some nodes.
1620
1621   """
1622   _OP_REQP = ["command", "nodes"]
1623
1624   def CheckPrereq(self):
1625     """Check prerequisites.
1626
1627     It checks that the given list of nodes is valid.
1628
1629     """
1630     self.nodes = _GetWantedNodes(self, self.op.nodes)
1631
1632   def Exec(self, feedback_fn):
1633     """Run a command on some nodes.
1634
1635     """
1636     data = []
1637     for node in self.nodes:
1638       result = utils.RunCmd(["ssh", node.name, self.op.command])
1639       data.append((node.name, result.cmd, result.output, result.exit_code))
1640
1641     return data
1642
1643
1644 class LUActivateInstanceDisks(NoHooksLU):
1645   """Bring up an instance's disks.
1646
1647   """
1648   _OP_REQP = ["instance_name"]
1649
1650   def CheckPrereq(self):
1651     """Check prerequisites.
1652
1653     This checks that the instance is in the cluster.
1654
1655     """
1656     instance = self.cfg.GetInstanceInfo(
1657       self.cfg.ExpandInstanceName(self.op.instance_name))
1658     if instance is None:
1659       raise errors.OpPrereqError, ("Instance '%s' not known" %
1660                                    self.op.instance_name)
1661     self.instance = instance
1662
1663
1664   def Exec(self, feedback_fn):
1665     """Activate the disks.
1666
1667     """
1668     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1669     if not disks_ok:
1670       raise errors.OpExecError, ("Cannot activate block devices")
1671
1672     return disks_info
1673
1674
1675 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1676   """Prepare the block devices for an instance.
1677
1678   This sets up the block devices on all nodes.
1679
1680   Args:
1681     instance: a ganeti.objects.Instance object
1682     ignore_secondaries: if true, errors on secondary nodes won't result
1683                         in an error return from the function
1684
1685   Returns:
1686     false if the operation failed
1687     list of (host, instance_visible_name, node_visible_name) if the operation
1688          suceeded with the mapping from node devices to instance devices
1689   """
1690   device_info = []
1691   disks_ok = True
1692   for inst_disk in instance.disks:
1693     master_result = None
1694     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1695       cfg.SetDiskID(node_disk, node)
1696       is_primary = node == instance.primary_node
1697       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1698       if not result:
1699         logger.Error("could not prepare block device %s on node %s (is_pri"
1700                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1701         if is_primary or not ignore_secondaries:
1702           disks_ok = False
1703       if is_primary:
1704         master_result = result
1705     device_info.append((instance.primary_node, inst_disk.iv_name,
1706                         master_result))
1707
1708   return disks_ok, device_info
1709
1710
1711 def _StartInstanceDisks(cfg, instance, force):
1712   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1713                                            ignore_secondaries=force)
1714   if not disks_ok:
1715     _ShutdownInstanceDisks(instance, cfg)
1716     if force is not None and not force:
1717       logger.Error("If the message above refers to a secondary node,"
1718                    " you can retry the operation using '--force'.")
1719     raise errors.OpExecError, ("Disk consistency error")
1720
1721
1722 class LUDeactivateInstanceDisks(NoHooksLU):
1723   """Shutdown an instance's disks.
1724
1725   """
1726   _OP_REQP = ["instance_name"]
1727
1728   def CheckPrereq(self):
1729     """Check prerequisites.
1730
1731     This checks that the instance is in the cluster.
1732
1733     """
1734     instance = self.cfg.GetInstanceInfo(
1735       self.cfg.ExpandInstanceName(self.op.instance_name))
1736     if instance is None:
1737       raise errors.OpPrereqError, ("Instance '%s' not known" %
1738                                    self.op.instance_name)
1739     self.instance = instance
1740
1741   def Exec(self, feedback_fn):
1742     """Deactivate the disks
1743
1744     """
1745     instance = self.instance
1746     ins_l = rpc.call_instance_list([instance.primary_node])
1747     ins_l = ins_l[instance.primary_node]
1748     if not type(ins_l) is list:
1749       raise errors.OpExecError, ("Can't contact node '%s'" %
1750                                  instance.primary_node)
1751
1752     if self.instance.name in ins_l:
1753       raise errors.OpExecError, ("Instance is running, can't shutdown"
1754                                  " block devices.")
1755
1756     _ShutdownInstanceDisks(instance, self.cfg)
1757
1758
1759 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1760   """Shutdown block devices of an instance.
1761
1762   This does the shutdown on all nodes of the instance.
1763
1764   If the ignore_primary is false, errors on the primary node are
1765   ignored.
1766
1767   """
1768   result = True
1769   for disk in instance.disks:
1770     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1771       cfg.SetDiskID(top_disk, node)
1772       if not rpc.call_blockdev_shutdown(node, top_disk):
1773         logger.Error("could not shutdown block device %s on node %s" %
1774                      (disk.iv_name, node))
1775         if not ignore_primary or node != instance.primary_node:
1776           result = False
1777   return result
1778
1779
1780 class LUStartupInstance(LogicalUnit):
1781   """Starts an instance.
1782
1783   """
1784   HPATH = "instance-start"
1785   HTYPE = constants.HTYPE_INSTANCE
1786   _OP_REQP = ["instance_name", "force"]
1787
1788   def BuildHooksEnv(self):
1789     """Build hooks env.
1790
1791     This runs on master, primary and secondary nodes of the instance.
1792
1793     """
1794     env = {
1795       "FORCE": self.op.force,
1796       }
1797     env.update(_BuildInstanceHookEnvByObject(self.instance))
1798     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1799           list(self.instance.secondary_nodes))
1800     return env, nl, nl
1801
1802   def CheckPrereq(self):
1803     """Check prerequisites.
1804
1805     This checks that the instance is in the cluster.
1806
1807     """
1808     instance = self.cfg.GetInstanceInfo(
1809       self.cfg.ExpandInstanceName(self.op.instance_name))
1810     if instance is None:
1811       raise errors.OpPrereqError, ("Instance '%s' not known" %
1812                                    self.op.instance_name)
1813
1814     # check bridges existance
1815     brlist = [nic.bridge for nic in instance.nics]
1816     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1817       raise errors.OpPrereqError, ("one or more target bridges %s does not"
1818                                    " exist on destination node '%s'" %
1819                                    (brlist, instance.primary_node))
1820
1821     self.instance = instance
1822     self.op.instance_name = instance.name
1823
1824   def Exec(self, feedback_fn):
1825     """Start the instance.
1826
1827     """
1828     instance = self.instance
1829     force = self.op.force
1830     extra_args = getattr(self.op, "extra_args", "")
1831
1832     node_current = instance.primary_node
1833
1834     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1835     if not nodeinfo:
1836       raise errors.OpExecError, ("Could not contact node %s for infos" %
1837                                  (node_current))
1838
1839     freememory = nodeinfo[node_current]['memory_free']
1840     memory = instance.memory
1841     if memory > freememory:
1842       raise errors.OpExecError, ("Not enough memory to start instance"
1843                                  " %s on node %s"
1844                                  " needed %s MiB, available %s MiB" %
1845                                  (instance.name, node_current, memory,
1846                                   freememory))
1847
1848     _StartInstanceDisks(self.cfg, instance, force)
1849
1850     if not rpc.call_instance_start(node_current, instance, extra_args):
1851       _ShutdownInstanceDisks(instance, self.cfg)
1852       raise errors.OpExecError, ("Could not start instance")
1853
1854     self.cfg.MarkInstanceUp(instance.name)
1855
1856
1857 class LUShutdownInstance(LogicalUnit):
1858   """Shutdown an instance.
1859
1860   """
1861   HPATH = "instance-stop"
1862   HTYPE = constants.HTYPE_INSTANCE
1863   _OP_REQP = ["instance_name"]
1864
1865   def BuildHooksEnv(self):
1866     """Build hooks env.
1867
1868     This runs on master, primary and secondary nodes of the instance.
1869
1870     """
1871     env = _BuildInstanceHookEnvByObject(self.instance)
1872     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1873           list(self.instance.secondary_nodes))
1874     return env, nl, nl
1875
1876   def CheckPrereq(self):
1877     """Check prerequisites.
1878
1879     This checks that the instance is in the cluster.
1880
1881     """
1882     instance = self.cfg.GetInstanceInfo(
1883       self.cfg.ExpandInstanceName(self.op.instance_name))
1884     if instance is None:
1885       raise errors.OpPrereqError, ("Instance '%s' not known" %
1886                                    self.op.instance_name)
1887     self.instance = instance
1888
1889   def Exec(self, feedback_fn):
1890     """Shutdown the instance.
1891
1892     """
1893     instance = self.instance
1894     node_current = instance.primary_node
1895     if not rpc.call_instance_shutdown(node_current, instance):
1896       logger.Error("could not shutdown instance")
1897
1898     self.cfg.MarkInstanceDown(instance.name)
1899     _ShutdownInstanceDisks(instance, self.cfg)
1900
1901
1902 class LUReinstallInstance(LogicalUnit):
1903   """Reinstall an instance.
1904
1905   """
1906   HPATH = "instance-reinstall"
1907   HTYPE = constants.HTYPE_INSTANCE
1908   _OP_REQP = ["instance_name"]
1909
1910   def BuildHooksEnv(self):
1911     """Build hooks env.
1912
1913     This runs on master, primary and secondary nodes of the instance.
1914
1915     """
1916     env = _BuildInstanceHookEnvByObject(self.instance)
1917     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1918           list(self.instance.secondary_nodes))
1919     return env, nl, nl
1920
1921   def CheckPrereq(self):
1922     """Check prerequisites.
1923
1924     This checks that the instance is in the cluster and is not running.
1925
1926     """
1927     instance = self.cfg.GetInstanceInfo(
1928       self.cfg.ExpandInstanceName(self.op.instance_name))
1929     if instance is None:
1930       raise errors.OpPrereqError, ("Instance '%s' not known" %
1931                                    self.op.instance_name)
1932     if instance.disk_template == constants.DT_DISKLESS:
1933       raise errors.OpPrereqError, ("Instance '%s' has no disks" %
1934                                    self.op.instance_name)
1935     if instance.status != "down":
1936       raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
1937                                    self.op.instance_name)
1938     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1939     if remote_info:
1940       raise errors.OpPrereqError, ("Instance '%s' is running on the node %s" %
1941                                    (self.op.instance_name,
1942                                     instance.primary_node))
1943
1944     self.op.os_type = getattr(self.op, "os_type", None)
1945     if self.op.os_type is not None:
1946       # OS verification
1947       pnode = self.cfg.GetNodeInfo(
1948         self.cfg.ExpandNodeName(instance.primary_node))
1949       if pnode is None:
1950         raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
1951                                      self.op.pnode)
1952       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
1953       if not isinstance(os_obj, objects.OS):
1954         raise errors.OpPrereqError, ("OS '%s' not in supported OS list for"
1955                                      " primary node"  % self.op.os_type)
1956
1957     self.instance = instance
1958
1959   def Exec(self, feedback_fn):
1960     """Reinstall the instance.
1961
1962     """
1963     inst = self.instance
1964
1965     if self.op.os_type is not None:
1966       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
1967       inst.os = self.op.os_type
1968       self.cfg.AddInstance(inst)
1969
1970     _StartInstanceDisks(self.cfg, inst, None)
1971     try:
1972       feedback_fn("Running the instance OS create scripts...")
1973       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
1974         raise errors.OpExecError, ("Could not install OS for instance %s "
1975                                    "on node %s" %
1976                                    (inst.name, inst.primary_node))
1977     finally:
1978       _ShutdownInstanceDisks(inst, self.cfg)
1979
1980
1981 class LURemoveInstance(LogicalUnit):
1982   """Remove an instance.
1983
1984   """
1985   HPATH = "instance-remove"
1986   HTYPE = constants.HTYPE_INSTANCE
1987   _OP_REQP = ["instance_name"]
1988
1989   def BuildHooksEnv(self):
1990     """Build hooks env.
1991
1992     This runs on master, primary and secondary nodes of the instance.
1993
1994     """
1995     env = _BuildInstanceHookEnvByObject(self.instance)
1996     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1997           list(self.instance.secondary_nodes))
1998     return env, nl, nl
1999
2000   def CheckPrereq(self):
2001     """Check prerequisites.
2002
2003     This checks that the instance is in the cluster.
2004
2005     """
2006     instance = self.cfg.GetInstanceInfo(
2007       self.cfg.ExpandInstanceName(self.op.instance_name))
2008     if instance is None:
2009       raise errors.OpPrereqError, ("Instance '%s' not known" %
2010                                    self.op.instance_name)
2011     self.instance = instance
2012
2013   def Exec(self, feedback_fn):
2014     """Remove the instance.
2015
2016     """
2017     instance = self.instance
2018     logger.Info("shutting down instance %s on node %s" %
2019                 (instance.name, instance.primary_node))
2020
2021     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2022       raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
2023                                  (instance.name, instance.primary_node))
2024
2025     logger.Info("removing block devices for instance %s" % instance.name)
2026
2027     _RemoveDisks(instance, self.cfg)
2028
2029     logger.Info("removing instance %s out of cluster config" % instance.name)
2030
2031     self.cfg.RemoveInstance(instance.name)
2032
2033
2034 class LUQueryInstances(NoHooksLU):
2035   """Logical unit for querying instances.
2036
2037   """
2038   _OP_REQP = ["output_fields"]
2039
2040   def CheckPrereq(self):
2041     """Check prerequisites.
2042
2043     This checks that the fields required are valid output fields.
2044
2045     """
2046     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2047     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2048                                "admin_state", "admin_ram",
2049                                "disk_template", "ip", "mac", "bridge"],
2050                        dynamic=self.dynamic_fields,
2051                        selected=self.op.output_fields)
2052
2053   def Exec(self, feedback_fn):
2054     """Computes the list of nodes and their attributes.
2055
2056     """
2057     instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2058     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2059                      in instance_names]
2060
2061     # begin data gathering
2062
2063     nodes = frozenset([inst.primary_node for inst in instance_list])
2064
2065     bad_nodes = []
2066     if self.dynamic_fields.intersection(self.op.output_fields):
2067       live_data = {}
2068       node_data = rpc.call_all_instances_info(nodes)
2069       for name in nodes:
2070         result = node_data[name]
2071         if result:
2072           live_data.update(result)
2073         elif result == False:
2074           bad_nodes.append(name)
2075         # else no instance is alive
2076     else:
2077       live_data = dict([(name, {}) for name in instance_names])
2078
2079     # end data gathering
2080
2081     output = []
2082     for instance in instance_list:
2083       iout = []
2084       for field in self.op.output_fields:
2085         if field == "name":
2086           val = instance.name
2087         elif field == "os":
2088           val = instance.os
2089         elif field == "pnode":
2090           val = instance.primary_node
2091         elif field == "snodes":
2092           val = ",".join(instance.secondary_nodes) or "-"
2093         elif field == "admin_state":
2094           if instance.status == "down":
2095             val = "no"
2096           else:
2097             val = "yes"
2098         elif field == "oper_state":
2099           if instance.primary_node in bad_nodes:
2100             val = "(node down)"
2101           else:
2102             if live_data.get(instance.name):
2103               val = "running"
2104             else:
2105               val = "stopped"
2106         elif field == "admin_ram":
2107           val = instance.memory
2108         elif field == "oper_ram":
2109           if instance.primary_node in bad_nodes:
2110             val = "(node down)"
2111           elif instance.name in live_data:
2112             val = live_data[instance.name].get("memory", "?")
2113           else:
2114             val = "-"
2115         elif field == "disk_template":
2116           val = instance.disk_template
2117         elif field == "ip":
2118           val = instance.nics[0].ip
2119         elif field == "bridge":
2120           val = instance.nics[0].bridge
2121         elif field == "mac":
2122           val = instance.nics[0].mac
2123         else:
2124           raise errors.ParameterError, field
2125         val = str(val)
2126         iout.append(val)
2127       output.append(iout)
2128
2129     return output
2130
2131
2132 class LUFailoverInstance(LogicalUnit):
2133   """Failover an instance.
2134
2135   """
2136   HPATH = "instance-failover"
2137   HTYPE = constants.HTYPE_INSTANCE
2138   _OP_REQP = ["instance_name", "ignore_consistency"]
2139
2140   def BuildHooksEnv(self):
2141     """Build hooks env.
2142
2143     This runs on master, primary and secondary nodes of the instance.
2144
2145     """
2146     env = {
2147       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2148       }
2149     env.update(_BuildInstanceHookEnvByObject(self.instance))
2150     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2151     return env, nl, nl
2152
2153   def CheckPrereq(self):
2154     """Check prerequisites.
2155
2156     This checks that the instance is in the cluster.
2157
2158     """
2159     instance = self.cfg.GetInstanceInfo(
2160       self.cfg.ExpandInstanceName(self.op.instance_name))
2161     if instance is None:
2162       raise errors.OpPrereqError, ("Instance '%s' not known" %
2163                                    self.op.instance_name)
2164
2165     # check memory requirements on the secondary node
2166     target_node = instance.secondary_nodes[0]
2167     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2168     info = nodeinfo.get(target_node, None)
2169     if not info:
2170       raise errors.OpPrereqError, ("Cannot get current information"
2171                                    " from node '%s'" % nodeinfo)
2172     if instance.memory > info['memory_free']:
2173       raise errors.OpPrereqError, ("Not enough memory on target node %s."
2174                                    " %d MB available, %d MB required" %
2175                                    (target_node, info['memory_free'],
2176                                     instance.memory))
2177
2178     # check bridge existance
2179     brlist = [nic.bridge for nic in instance.nics]
2180     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2181       raise errors.OpPrereqError, ("one or more target bridges %s does not"
2182                                    " exist on destination node '%s'" %
2183                                    (brlist, instance.primary_node))
2184
2185     self.instance = instance
2186
2187   def Exec(self, feedback_fn):
2188     """Failover an instance.
2189
2190     The failover is done by shutting it down on its present node and
2191     starting it on the secondary.
2192
2193     """
2194     instance = self.instance
2195
2196     source_node = instance.primary_node
2197     target_node = instance.secondary_nodes[0]
2198
2199     feedback_fn("* checking disk consistency between source and target")
2200     for dev in instance.disks:
2201       # for remote_raid1, these are md over drbd
2202       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2203         if not self.op.ignore_consistency:
2204           raise errors.OpExecError, ("Disk %s is degraded on target node,"
2205                                      " aborting failover." % dev.iv_name)
2206
2207     feedback_fn("* checking target node resource availability")
2208     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2209
2210     if not nodeinfo:
2211       raise errors.OpExecError, ("Could not contact target node %s." %
2212                                  target_node)
2213
2214     free_memory = int(nodeinfo[target_node]['memory_free'])
2215     memory = instance.memory
2216     if memory > free_memory:
2217       raise errors.OpExecError, ("Not enough memory to create instance %s on"
2218                                  " node %s. needed %s MiB, available %s MiB" %
2219                                  (instance.name, target_node, memory,
2220                                   free_memory))
2221
2222     feedback_fn("* shutting down instance on source node")
2223     logger.Info("Shutting down instance %s on node %s" %
2224                 (instance.name, source_node))
2225
2226     if not rpc.call_instance_shutdown(source_node, instance):
2227       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2228                    " anyway. Please make sure node %s is down"  %
2229                    (instance.name, source_node, source_node))
2230
2231     feedback_fn("* deactivating the instance's disks on source node")
2232     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2233       raise errors.OpExecError, ("Can't shut down the instance's disks.")
2234
2235     instance.primary_node = target_node
2236     # distribute new instance config to the other nodes
2237     self.cfg.AddInstance(instance)
2238
2239     feedback_fn("* activating the instance's disks on target node")
2240     logger.Info("Starting instance %s on node %s" %
2241                 (instance.name, target_node))
2242
2243     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2244                                              ignore_secondaries=True)
2245     if not disks_ok:
2246       _ShutdownInstanceDisks(instance, self.cfg)
2247       raise errors.OpExecError, ("Can't activate the instance's disks")
2248
2249     feedback_fn("* starting the instance on the target node")
2250     if not rpc.call_instance_start(target_node, instance, None):
2251       _ShutdownInstanceDisks(instance, self.cfg)
2252       raise errors.OpExecError("Could not start instance %s on node %s." %
2253                                (instance.name, target_node))
2254
2255
2256 def _CreateBlockDevOnPrimary(cfg, node, device):
2257   """Create a tree of block devices on the primary node.
2258
2259   This always creates all devices.
2260
2261   """
2262   if device.children:
2263     for child in device.children:
2264       if not _CreateBlockDevOnPrimary(cfg, node, child):
2265         return False
2266
2267   cfg.SetDiskID(device, node)
2268   new_id = rpc.call_blockdev_create(node, device, device.size, True)
2269   if not new_id:
2270     return False
2271   if device.physical_id is None:
2272     device.physical_id = new_id
2273   return True
2274
2275
2276 def _CreateBlockDevOnSecondary(cfg, node, device, force):
2277   """Create a tree of block devices on a secondary node.
2278
2279   If this device type has to be created on secondaries, create it and
2280   all its children.
2281
2282   If not, just recurse to children keeping the same 'force' value.
2283
2284   """
2285   if device.CreateOnSecondary():
2286     force = True
2287   if device.children:
2288     for child in device.children:
2289       if not _CreateBlockDevOnSecondary(cfg, node, child, force):
2290         return False
2291
2292   if not force:
2293     return True
2294   cfg.SetDiskID(device, node)
2295   new_id = rpc.call_blockdev_create(node, device, device.size, False)
2296   if not new_id:
2297     return False
2298   if device.physical_id is None:
2299     device.physical_id = new_id
2300   return True
2301
2302
2303 def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
2304   """Generate a drbd device complete with its children.
2305
2306   """
2307   port = cfg.AllocatePort()
2308   base = "%s_%s" % (base, port)
2309   dev_data = objects.Disk(dev_type="lvm", size=size,
2310                           logical_id=(vgname, "%s.data" % base))
2311   dev_meta = objects.Disk(dev_type="lvm", size=128,
2312                           logical_id=(vgname, "%s.meta" % base))
2313   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2314                           logical_id = (primary, secondary, port),
2315                           children = [dev_data, dev_meta])
2316   return drbd_dev
2317
2318
2319 def _GenerateDiskTemplate(cfg, vgname, template_name,
2320                           instance_name, primary_node,
2321                           secondary_nodes, disk_sz, swap_sz):
2322   """Generate the entire disk layout for a given template type.
2323
2324   """
2325   #TODO: compute space requirements
2326
2327   if template_name == "diskless":
2328     disks = []
2329   elif template_name == "plain":
2330     if len(secondary_nodes) != 0:
2331       raise errors.ProgrammerError("Wrong template configuration")
2332     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2333                            logical_id=(vgname, "%s.os" % instance_name),
2334                            iv_name = "sda")
2335     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2336                            logical_id=(vgname, "%s.swap" % instance_name),
2337                            iv_name = "sdb")
2338     disks = [sda_dev, sdb_dev]
2339   elif template_name == "local_raid1":
2340     if len(secondary_nodes) != 0:
2341       raise errors.ProgrammerError("Wrong template configuration")
2342     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2343                               logical_id=(vgname, "%s.os_m1" % instance_name))
2344     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2345                               logical_id=(vgname, "%s.os_m2" % instance_name))
2346     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2347                               size=disk_sz,
2348                               children = [sda_dev_m1, sda_dev_m2])
2349     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2350                               logical_id=(vgname, "%s.swap_m1" %
2351                                           instance_name))
2352     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2353                               logical_id=(vgname, "%s.swap_m2" %
2354                                           instance_name))
2355     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2356                               size=swap_sz,
2357                               children = [sdb_dev_m1, sdb_dev_m2])
2358     disks = [md_sda_dev, md_sdb_dev]
2359   elif template_name == "remote_raid1":
2360     if len(secondary_nodes) != 1:
2361       raise errors.ProgrammerError("Wrong template configuration")
2362     remote_node = secondary_nodes[0]
2363     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
2364                                          primary_node, remote_node, disk_sz,
2365                                          "%s-sda" % instance_name)
2366     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2367                               children = [drbd_sda_dev], size=disk_sz)
2368     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
2369                                          primary_node, remote_node, swap_sz,
2370                                          "%s-sdb" % instance_name)
2371     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2372                               children = [drbd_sdb_dev], size=swap_sz)
2373     disks = [md_sda_dev, md_sdb_dev]
2374   else:
2375     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2376   return disks
2377
2378
2379 def _CreateDisks(cfg, instance):
2380   """Create all disks for an instance.
2381
2382   This abstracts away some work from AddInstance.
2383
2384   Args:
2385     instance: the instance object
2386
2387   Returns:
2388     True or False showing the success of the creation process
2389
2390   """
2391   for device in instance.disks:
2392     logger.Info("creating volume %s for instance %s" %
2393               (device.iv_name, instance.name))
2394     #HARDCODE
2395     for secondary_node in instance.secondary_nodes:
2396       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
2397         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2398                      (device.iv_name, device, secondary_node))
2399         return False
2400     #HARDCODE
2401     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
2402       logger.Error("failed to create volume %s on primary!" %
2403                    device.iv_name)
2404       return False
2405   return True
2406
2407
2408 def _RemoveDisks(instance, cfg):
2409   """Remove all disks for an instance.
2410
2411   This abstracts away some work from `AddInstance()` and
2412   `RemoveInstance()`. Note that in case some of the devices couldn't
2413   be remove, the removal will continue with the other ones (compare
2414   with `_CreateDisks()`).
2415
2416   Args:
2417     instance: the instance object
2418
2419   Returns:
2420     True or False showing the success of the removal proces
2421
2422   """
2423   logger.Info("removing block devices for instance %s" % instance.name)
2424
2425   result = True
2426   for device in instance.disks:
2427     for node, disk in device.ComputeNodeTree(instance.primary_node):
2428       cfg.SetDiskID(disk, node)
2429       if not rpc.call_blockdev_remove(node, disk):
2430         logger.Error("could not remove block device %s on node %s,"
2431                      " continuing anyway" %
2432                      (device.iv_name, node))
2433         result = False
2434   return result
2435
2436
2437 class LUCreateInstance(LogicalUnit):
2438   """Create an instance.
2439
2440   """
2441   HPATH = "instance-add"
2442   HTYPE = constants.HTYPE_INSTANCE
2443   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2444               "disk_template", "swap_size", "mode", "start", "vcpus",
2445               "wait_for_sync"]
2446
2447   def BuildHooksEnv(self):
2448     """Build hooks env.
2449
2450     This runs on master, primary and secondary nodes of the instance.
2451
2452     """
2453     env = {
2454       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2455       "INSTANCE_DISK_SIZE": self.op.disk_size,
2456       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2457       "INSTANCE_ADD_MODE": self.op.mode,
2458       }
2459     if self.op.mode == constants.INSTANCE_IMPORT:
2460       env["INSTANCE_SRC_NODE"] = self.op.src_node
2461       env["INSTANCE_SRC_PATH"] = self.op.src_path
2462       env["INSTANCE_SRC_IMAGE"] = self.src_image
2463
2464     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2465       primary_node=self.op.pnode,
2466       secondary_nodes=self.secondaries,
2467       status=self.instance_status,
2468       os_type=self.op.os_type,
2469       memory=self.op.mem_size,
2470       vcpus=self.op.vcpus,
2471       nics=[(self.inst_ip, self.op.bridge)],
2472     ))
2473
2474     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2475           self.secondaries)
2476     return env, nl, nl
2477
2478
2479   def CheckPrereq(self):
2480     """Check prerequisites.
2481
2482     """
2483     if self.op.mode not in (constants.INSTANCE_CREATE,
2484                             constants.INSTANCE_IMPORT):
2485       raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
2486                                    self.op.mode)
2487
2488     if self.op.mode == constants.INSTANCE_IMPORT:
2489       src_node = getattr(self.op, "src_node", None)
2490       src_path = getattr(self.op, "src_path", None)
2491       if src_node is None or src_path is None:
2492         raise errors.OpPrereqError, ("Importing an instance requires source"
2493                                      " node and path options")
2494       src_node_full = self.cfg.ExpandNodeName(src_node)
2495       if src_node_full is None:
2496         raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
2497       self.op.src_node = src_node = src_node_full
2498
2499       if not os.path.isabs(src_path):
2500         raise errors.OpPrereqError, ("The source path must be absolute")
2501
2502       export_info = rpc.call_export_info(src_node, src_path)
2503
2504       if not export_info:
2505         raise errors.OpPrereqError, ("No export found in dir %s" % src_path)
2506
2507       if not export_info.has_section(constants.INISECT_EXP):
2508         raise errors.ProgrammerError, ("Corrupted export config")
2509
2510       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2511       if (int(ei_version) != constants.EXPORT_VERSION):
2512         raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
2513                                      (ei_version, constants.EXPORT_VERSION))
2514
2515       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2516         raise errors.OpPrereqError, ("Can't import instance with more than"
2517                                      " one data disk")
2518
2519       # FIXME: are the old os-es, disk sizes, etc. useful?
2520       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2521       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2522                                                          'disk0_dump'))
2523       self.src_image = diskimage
2524     else: # INSTANCE_CREATE
2525       if getattr(self.op, "os_type", None) is None:
2526         raise errors.OpPrereqError, ("No guest OS specified")
2527
2528     # check primary node
2529     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2530     if pnode is None:
2531       raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
2532                                    self.op.pnode)
2533     self.op.pnode = pnode.name
2534     self.pnode = pnode
2535     self.secondaries = []
2536     # disk template and mirror node verification
2537     if self.op.disk_template not in constants.DISK_TEMPLATES:
2538       raise errors.OpPrereqError, ("Invalid disk template name")
2539
2540     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2541       if getattr(self.op, "snode", None) is None:
2542         raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
2543                                      " a mirror node")
2544
2545       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2546       if snode_name is None:
2547         raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
2548                                      self.op.snode)
2549       elif snode_name == pnode.name:
2550         raise errors.OpPrereqError, ("The secondary node cannot be"
2551                                      " the primary node.")
2552       self.secondaries.append(snode_name)
2553
2554     # Check lv size requirements
2555     nodenames = [pnode.name] + self.secondaries
2556     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2557
2558     # Required free disk space as a function of disk and swap space
2559     req_size_dict = {
2560       constants.DT_DISKLESS: 0,
2561       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2562       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2563       # 256 MB are added for drbd metadata, 128MB for each drbd device
2564       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2565     }
2566
2567     if self.op.disk_template not in req_size_dict:
2568       raise errors.ProgrammerError, ("Disk template '%s' size requirement"
2569                                      " is unknown" %  self.op.disk_template)
2570
2571     req_size = req_size_dict[self.op.disk_template]
2572
2573     for node in nodenames:
2574       info = nodeinfo.get(node, None)
2575       if not info:
2576         raise errors.OpPrereqError, ("Cannot get current information"
2577                                      " from node '%s'" % nodeinfo)
2578       if req_size > info['vg_free']:
2579         raise errors.OpPrereqError, ("Not enough disk space on target node %s."
2580                                      " %d MB available, %d MB required" %
2581                                      (node, info['vg_free'], req_size))
2582
2583     # os verification
2584     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2585     if not isinstance(os_obj, objects.OS):
2586       raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
2587                                    " primary node"  % self.op.os_type)
2588
2589     # instance verification
2590     hostname1 = utils.LookupHostname(self.op.instance_name)
2591     if not hostname1:
2592       raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
2593                                    self.op.instance_name)
2594
2595     self.op.instance_name = instance_name = hostname1['hostname']
2596     instance_list = self.cfg.GetInstanceList()
2597     if instance_name in instance_list:
2598       raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
2599                                    instance_name)
2600
2601     ip = getattr(self.op, "ip", None)
2602     if ip is None or ip.lower() == "none":
2603       inst_ip = None
2604     elif ip.lower() == "auto":
2605       inst_ip = hostname1['ip']
2606     else:
2607       if not utils.IsValidIP(ip):
2608         raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
2609                                      " like a valid IP" % ip)
2610       inst_ip = ip
2611     self.inst_ip = inst_ip
2612
2613     command = ["fping", "-q", hostname1['ip']]
2614     result = utils.RunCmd(command)
2615     if not result.failed:
2616       raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2617                                    (hostname1['ip'], instance_name))
2618
2619     # bridge verification
2620     bridge = getattr(self.op, "bridge", None)
2621     if bridge is None:
2622       self.op.bridge = self.cfg.GetDefBridge()
2623     else:
2624       self.op.bridge = bridge
2625
2626     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2627       raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2628                                    " destination node '%s'" %
2629                                    (self.op.bridge, pnode.name))
2630
2631     if self.op.start:
2632       self.instance_status = 'up'
2633     else:
2634       self.instance_status = 'down'
2635
2636   def Exec(self, feedback_fn):
2637     """Create and add the instance to the cluster.
2638
2639     """
2640     instance = self.op.instance_name
2641     pnode_name = self.pnode.name
2642
2643     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2644     if self.inst_ip is not None:
2645       nic.ip = self.inst_ip
2646
2647     disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
2648                                   self.op.disk_template,
2649                                   instance, pnode_name,
2650                                   self.secondaries, self.op.disk_size,
2651                                   self.op.swap_size)
2652
2653     iobj = objects.Instance(name=instance, os=self.op.os_type,
2654                             primary_node=pnode_name,
2655                             memory=self.op.mem_size,
2656                             vcpus=self.op.vcpus,
2657                             nics=[nic], disks=disks,
2658                             disk_template=self.op.disk_template,
2659                             status=self.instance_status,
2660                             )
2661
2662     feedback_fn("* creating instance disks...")
2663     if not _CreateDisks(self.cfg, iobj):
2664       _RemoveDisks(iobj, self.cfg)
2665       raise errors.OpExecError, ("Device creation failed, reverting...")
2666
2667     feedback_fn("adding instance %s to cluster config" % instance)
2668
2669     self.cfg.AddInstance(iobj)
2670
2671     if self.op.wait_for_sync:
2672       disk_abort = not _WaitForSync(self.cfg, iobj)
2673     elif iobj.disk_template == "remote_raid1":
2674       # make sure the disks are not degraded (still sync-ing is ok)
2675       time.sleep(15)
2676       feedback_fn("* checking mirrors status")
2677       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2678     else:
2679       disk_abort = False
2680
2681     if disk_abort:
2682       _RemoveDisks(iobj, self.cfg)
2683       self.cfg.RemoveInstance(iobj.name)
2684       raise errors.OpExecError, ("There are some degraded disks for"
2685                                       " this instance")
2686
2687     feedback_fn("creating os for instance %s on node %s" %
2688                 (instance, pnode_name))
2689
2690     if iobj.disk_template != constants.DT_DISKLESS:
2691       if self.op.mode == constants.INSTANCE_CREATE:
2692         feedback_fn("* running the instance OS create scripts...")
2693         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2694           raise errors.OpExecError, ("could not add os for instance %s"
2695                                           " on node %s" %
2696                                           (instance, pnode_name))
2697
2698       elif self.op.mode == constants.INSTANCE_IMPORT:
2699         feedback_fn("* running the instance OS import scripts...")
2700         src_node = self.op.src_node
2701         src_image = self.src_image
2702         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2703                                                 src_node, src_image):
2704           raise errors.OpExecError, ("Could not import os for instance"
2705                                           " %s on node %s" %
2706                                           (instance, pnode_name))
2707       else:
2708         # also checked in the prereq part
2709         raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
2710                                        % self.op.mode)
2711
2712     if self.op.start:
2713       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2714       feedback_fn("* starting instance...")
2715       if not rpc.call_instance_start(pnode_name, iobj, None):
2716         raise errors.OpExecError, ("Could not start instance")
2717
2718
2719 class LUConnectConsole(NoHooksLU):
2720   """Connect to an instance's console.
2721
2722   This is somewhat special in that it returns the command line that
2723   you need to run on the master node in order to connect to the
2724   console.
2725
2726   """
2727   _OP_REQP = ["instance_name"]
2728
2729   def CheckPrereq(self):
2730     """Check prerequisites.
2731
2732     This checks that the instance is in the cluster.
2733
2734     """
2735     instance = self.cfg.GetInstanceInfo(
2736       self.cfg.ExpandInstanceName(self.op.instance_name))
2737     if instance is None:
2738       raise errors.OpPrereqError, ("Instance '%s' not known" %
2739                                    self.op.instance_name)
2740     self.instance = instance
2741
2742   def Exec(self, feedback_fn):
2743     """Connect to the console of an instance
2744
2745     """
2746     instance = self.instance
2747     node = instance.primary_node
2748
2749     node_insts = rpc.call_instance_list([node])[node]
2750     if node_insts is False:
2751       raise errors.OpExecError, ("Can't connect to node %s." % node)
2752
2753     if instance.name not in node_insts:
2754       raise errors.OpExecError, ("Instance %s is not running." % instance.name)
2755
2756     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2757
2758     hyper = hypervisor.GetHypervisor()
2759     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2760     return node, console_cmd
2761
2762
2763 class LUAddMDDRBDComponent(LogicalUnit):
2764   """Adda new mirror member to an instance's disk.
2765
2766   """
2767   HPATH = "mirror-add"
2768   HTYPE = constants.HTYPE_INSTANCE
2769   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2770
2771   def BuildHooksEnv(self):
2772     """Build hooks env.
2773
2774     This runs on the master, the primary and all the secondaries.
2775
2776     """
2777     env = {
2778       "NEW_SECONDARY": self.op.remote_node,
2779       "DISK_NAME": self.op.disk_name,
2780       }
2781     env.update(_BuildInstanceHookEnvByObject(self.instance))
2782     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2783           self.op.remote_node,] + list(self.instance.secondary_nodes)
2784     return env, nl, nl
2785
2786   def CheckPrereq(self):
2787     """Check prerequisites.
2788
2789     This checks that the instance is in the cluster.
2790
2791     """
2792     instance = self.cfg.GetInstanceInfo(
2793       self.cfg.ExpandInstanceName(self.op.instance_name))
2794     if instance is None:
2795       raise errors.OpPrereqError, ("Instance '%s' not known" %
2796                                    self.op.instance_name)
2797     self.instance = instance
2798
2799     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2800     if remote_node is None:
2801       raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
2802     self.remote_node = remote_node
2803
2804     if remote_node == instance.primary_node:
2805       raise errors.OpPrereqError, ("The specified node is the primary node of"
2806                                    " the instance.")
2807
2808     if instance.disk_template != constants.DT_REMOTE_RAID1:
2809       raise errors.OpPrereqError, ("Instance's disk layout is not"
2810                                    " remote_raid1.")
2811     for disk in instance.disks:
2812       if disk.iv_name == self.op.disk_name:
2813         break
2814     else:
2815       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2816                                    " instance." % self.op.disk_name)
2817     if len(disk.children) > 1:
2818       raise errors.OpPrereqError, ("The device already has two slave"
2819                                    " devices.\n"
2820                                    "This would create a 3-disk raid1"
2821                                    " which we don't allow.")
2822     self.disk = disk
2823
2824   def Exec(self, feedback_fn):
2825     """Add the mirror component
2826
2827     """
2828     disk = self.disk
2829     instance = self.instance
2830
2831     remote_node = self.remote_node
2832     new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
2833                                      instance.primary_node, remote_node,
2834                                      disk.size, "%s-%s" %
2835                                      (instance.name, self.op.disk_name))
2836
2837     logger.Info("adding new mirror component on secondary")
2838     #HARDCODE
2839     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
2840       raise errors.OpExecError, ("Failed to create new component on secondary"
2841                                  " node %s" % remote_node)
2842
2843     logger.Info("adding new mirror component on primary")
2844     #HARDCODE
2845     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
2846       # remove secondary dev
2847       self.cfg.SetDiskID(new_drbd, remote_node)
2848       rpc.call_blockdev_remove(remote_node, new_drbd)
2849       raise errors.OpExecError, ("Failed to create volume on primary")
2850
2851     # the device exists now
2852     # call the primary node to add the mirror to md
2853     logger.Info("adding new mirror component to md")
2854     if not rpc.call_blockdev_addchild(instance.primary_node,
2855                                            disk, new_drbd):
2856       logger.Error("Can't add mirror compoment to md!")
2857       self.cfg.SetDiskID(new_drbd, remote_node)
2858       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2859         logger.Error("Can't rollback on secondary")
2860       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2861       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2862         logger.Error("Can't rollback on primary")
2863       raise errors.OpExecError, "Can't add mirror component to md array"
2864
2865     disk.children.append(new_drbd)
2866
2867     self.cfg.AddInstance(instance)
2868
2869     _WaitForSync(self.cfg, instance)
2870
2871     return 0
2872
2873
2874 class LURemoveMDDRBDComponent(LogicalUnit):
2875   """Remove a component from a remote_raid1 disk.
2876
2877   """
2878   HPATH = "mirror-remove"
2879   HTYPE = constants.HTYPE_INSTANCE
2880   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2881
2882   def BuildHooksEnv(self):
2883     """Build hooks env.
2884
2885     This runs on the master, the primary and all the secondaries.
2886
2887     """
2888     env = {
2889       "DISK_NAME": self.op.disk_name,
2890       "DISK_ID": self.op.disk_id,
2891       "OLD_SECONDARY": self.old_secondary,
2892       }
2893     env.update(_BuildInstanceHookEnvByObject(self.instance))
2894     nl = [self.sstore.GetMasterNode(),
2895           self.instance.primary_node] + list(self.instance.secondary_nodes)
2896     return env, nl, nl
2897
2898   def CheckPrereq(self):
2899     """Check prerequisites.
2900
2901     This checks that the instance is in the cluster.
2902
2903     """
2904     instance = self.cfg.GetInstanceInfo(
2905       self.cfg.ExpandInstanceName(self.op.instance_name))
2906     if instance is None:
2907       raise errors.OpPrereqError, ("Instance '%s' not known" %
2908                                    self.op.instance_name)
2909     self.instance = instance
2910
2911     if instance.disk_template != constants.DT_REMOTE_RAID1:
2912       raise errors.OpPrereqError, ("Instance's disk layout is not"
2913                                    " remote_raid1.")
2914     for disk in instance.disks:
2915       if disk.iv_name == self.op.disk_name:
2916         break
2917     else:
2918       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2919                                    " instance." % self.op.disk_name)
2920     for child in disk.children:
2921       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2922         break
2923     else:
2924       raise errors.OpPrereqError, ("Can't find the device with this port.")
2925
2926     if len(disk.children) < 2:
2927       raise errors.OpPrereqError, ("Cannot remove the last component from"
2928                                    " a mirror.")
2929     self.disk = disk
2930     self.child = child
2931     if self.child.logical_id[0] == instance.primary_node:
2932       oid = 1
2933     else:
2934       oid = 0
2935     self.old_secondary = self.child.logical_id[oid]
2936
2937   def Exec(self, feedback_fn):
2938     """Remove the mirror component
2939
2940     """
2941     instance = self.instance
2942     disk = self.disk
2943     child = self.child
2944     logger.Info("remove mirror component")
2945     self.cfg.SetDiskID(disk, instance.primary_node)
2946     if not rpc.call_blockdev_removechild(instance.primary_node,
2947                                               disk, child):
2948       raise errors.OpExecError, ("Can't remove child from mirror.")
2949
2950     for node in child.logical_id[:2]:
2951       self.cfg.SetDiskID(child, node)
2952       if not rpc.call_blockdev_remove(node, child):
2953         logger.Error("Warning: failed to remove device from node %s,"
2954                      " continuing operation." % node)
2955
2956     disk.children.remove(child)
2957     self.cfg.AddInstance(instance)
2958
2959
2960 class LUReplaceDisks(LogicalUnit):
2961   """Replace the disks of an instance.
2962
2963   """
2964   HPATH = "mirrors-replace"
2965   HTYPE = constants.HTYPE_INSTANCE
2966   _OP_REQP = ["instance_name"]
2967
2968   def BuildHooksEnv(self):
2969     """Build hooks env.
2970
2971     This runs on the master, the primary and all the secondaries.
2972
2973     """
2974     env = {
2975       "NEW_SECONDARY": self.op.remote_node,
2976       "OLD_SECONDARY": self.instance.secondary_nodes[0],
2977       }
2978     env.update(_BuildInstanceHookEnvByObject(self.instance))
2979     nl = [self.sstore.GetMasterNode(),
2980           self.instance.primary_node] + list(self.instance.secondary_nodes)
2981     return env, nl, nl
2982
2983   def CheckPrereq(self):
2984     """Check prerequisites.
2985
2986     This checks that the instance is in the cluster.
2987
2988     """
2989     instance = self.cfg.GetInstanceInfo(
2990       self.cfg.ExpandInstanceName(self.op.instance_name))
2991     if instance is None:
2992       raise errors.OpPrereqError, ("Instance '%s' not known" %
2993                                    self.op.instance_name)
2994     self.instance = instance
2995
2996     if instance.disk_template != constants.DT_REMOTE_RAID1:
2997       raise errors.OpPrereqError, ("Instance's disk layout is not"
2998                                    " remote_raid1.")
2999
3000     if len(instance.secondary_nodes) != 1:
3001       raise errors.OpPrereqError, ("The instance has a strange layout,"
3002                                    " expected one secondary but found %d" %
3003                                    len(instance.secondary_nodes))
3004
3005     remote_node = getattr(self.op, "remote_node", None)
3006     if remote_node is None:
3007       remote_node = instance.secondary_nodes[0]
3008     else:
3009       remote_node = self.cfg.ExpandNodeName(remote_node)
3010       if remote_node is None:
3011         raise errors.OpPrereqError, ("Node '%s' not known" %
3012                                      self.op.remote_node)
3013     if remote_node == instance.primary_node:
3014       raise errors.OpPrereqError, ("The specified node is the primary node of"
3015                                    " the instance.")
3016     self.op.remote_node = remote_node
3017
3018   def Exec(self, feedback_fn):
3019     """Replace the disks of an instance.
3020
3021     """
3022     instance = self.instance
3023     iv_names = {}
3024     # start of work
3025     remote_node = self.op.remote_node
3026     cfg = self.cfg
3027     vgname = cfg.GetVGName()
3028     for dev in instance.disks:
3029       size = dev.size
3030       new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
3031                                        remote_node, size,
3032                                        "%s-%s" % (instance.name, dev.iv_name))
3033       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3034       logger.Info("adding new mirror component on secondary for %s" %
3035                   dev.iv_name)
3036       #HARDCODE
3037       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
3038         raise errors.OpExecError, ("Failed to create new component on"
3039                                    " secondary node %s\n"
3040                                    "Full abort, cleanup manually!" %
3041                                    remote_node)
3042
3043       logger.Info("adding new mirror component on primary")
3044       #HARDCODE
3045       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
3046         # remove secondary dev
3047         cfg.SetDiskID(new_drbd, remote_node)
3048         rpc.call_blockdev_remove(remote_node, new_drbd)
3049         raise errors.OpExecError("Failed to create volume on primary!\n"
3050                                  "Full abort, cleanup manually!!")
3051
3052       # the device exists now
3053       # call the primary node to add the mirror to md
3054       logger.Info("adding new mirror component to md")
3055       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3056                                         new_drbd):
3057         logger.Error("Can't add mirror compoment to md!")
3058         cfg.SetDiskID(new_drbd, remote_node)
3059         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3060           logger.Error("Can't rollback on secondary")
3061         cfg.SetDiskID(new_drbd, instance.primary_node)
3062         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3063           logger.Error("Can't rollback on primary")
3064         raise errors.OpExecError, ("Full abort, cleanup manually!!")
3065
3066       dev.children.append(new_drbd)
3067       cfg.AddInstance(instance)
3068
3069     # this can fail as the old devices are degraded and _WaitForSync
3070     # does a combined result over all disks, so we don't check its
3071     # return value
3072     _WaitForSync(cfg, instance, unlock=True)
3073
3074     # so check manually all the devices
3075     for name in iv_names:
3076       dev, child, new_drbd = iv_names[name]
3077       cfg.SetDiskID(dev, instance.primary_node)
3078       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3079       if is_degr:
3080         raise errors.OpExecError, ("MD device %s is degraded!" % name)
3081       cfg.SetDiskID(new_drbd, instance.primary_node)
3082       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3083       if is_degr:
3084         raise errors.OpExecError, ("New drbd device %s is degraded!" % name)
3085
3086     for name in iv_names:
3087       dev, child, new_drbd = iv_names[name]
3088       logger.Info("remove mirror %s component" % name)
3089       cfg.SetDiskID(dev, instance.primary_node)
3090       if not rpc.call_blockdev_removechild(instance.primary_node,
3091                                                 dev, child):
3092         logger.Error("Can't remove child from mirror, aborting"
3093                      " *this device cleanup*.\nYou need to cleanup manually!!")
3094         continue
3095
3096       for node in child.logical_id[:2]:
3097         logger.Info("remove child device on %s" % node)
3098         cfg.SetDiskID(child, node)
3099         if not rpc.call_blockdev_remove(node, child):
3100           logger.Error("Warning: failed to remove device from node %s,"
3101                        " continuing operation." % node)
3102
3103       dev.children.remove(child)
3104
3105       cfg.AddInstance(instance)
3106
3107
3108 class LUQueryInstanceData(NoHooksLU):
3109   """Query runtime instance data.
3110
3111   """
3112   _OP_REQP = ["instances"]
3113
3114   def CheckPrereq(self):
3115     """Check prerequisites.
3116
3117     This only checks the optional instance list against the existing names.
3118
3119     """
3120     if not isinstance(self.op.instances, list):
3121       raise errors.OpPrereqError, "Invalid argument type 'instances'"
3122     if self.op.instances:
3123       self.wanted_instances = []
3124       names = self.op.instances
3125       for name in names:
3126         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3127         if instance is None:
3128           raise errors.OpPrereqError, ("No such instance name '%s'" % name)
3129       self.wanted_instances.append(instance)
3130     else:
3131       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3132                                in self.cfg.GetInstanceList()]
3133     return
3134
3135
3136   def _ComputeDiskStatus(self, instance, snode, dev):
3137     """Compute block device status.
3138
3139     """
3140     self.cfg.SetDiskID(dev, instance.primary_node)
3141     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3142     if dev.dev_type == "drbd":
3143       # we change the snode then (otherwise we use the one passed in)
3144       if dev.logical_id[0] == instance.primary_node:
3145         snode = dev.logical_id[1]
3146       else:
3147         snode = dev.logical_id[0]
3148
3149     if snode:
3150       self.cfg.SetDiskID(dev, snode)
3151       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3152     else:
3153       dev_sstatus = None
3154
3155     if dev.children:
3156       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3157                       for child in dev.children]
3158     else:
3159       dev_children = []
3160
3161     data = {
3162       "iv_name": dev.iv_name,
3163       "dev_type": dev.dev_type,
3164       "logical_id": dev.logical_id,
3165       "physical_id": dev.physical_id,
3166       "pstatus": dev_pstatus,
3167       "sstatus": dev_sstatus,
3168       "children": dev_children,
3169       }
3170
3171     return data
3172
3173   def Exec(self, feedback_fn):
3174     """Gather and return data"""
3175     result = {}
3176     for instance in self.wanted_instances:
3177       remote_info = rpc.call_instance_info(instance.primary_node,
3178                                                 instance.name)
3179       if remote_info and "state" in remote_info:
3180         remote_state = "up"
3181       else:
3182         remote_state = "down"
3183       if instance.status == "down":
3184         config_state = "down"
3185       else:
3186         config_state = "up"
3187
3188       disks = [self._ComputeDiskStatus(instance, None, device)
3189                for device in instance.disks]
3190
3191       idict = {
3192         "name": instance.name,
3193         "config_state": config_state,
3194         "run_state": remote_state,
3195         "pnode": instance.primary_node,
3196         "snodes": instance.secondary_nodes,
3197         "os": instance.os,
3198         "memory": instance.memory,
3199         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3200         "disks": disks,
3201         }
3202
3203       result[instance.name] = idict
3204
3205     return result
3206
3207
3208 class LUQueryNodeData(NoHooksLU):
3209   """Logical unit for querying node data.
3210
3211   """
3212   _OP_REQP = ["nodes"]
3213
3214   def CheckPrereq(self):
3215     """Check prerequisites.
3216
3217     This only checks the optional node list against the existing names.
3218
3219     """
3220     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3221
3222   def Exec(self, feedback_fn):
3223     """Compute and return the list of nodes.
3224
3225     """
3226     ilist = [self.cfg.GetInstanceInfo(iname) for iname
3227              in self.cfg.GetInstanceList()]
3228     result = []
3229     for node in self.wanted_nodes:
3230       result.append((node.name, node.primary_ip, node.secondary_ip,
3231                      [inst.name for inst in ilist
3232                       if inst.primary_node == node.name],
3233                      [inst.name for inst in ilist
3234                       if node.name in inst.secondary_nodes],
3235                      ))
3236     return result
3237
3238
3239 class LUSetInstanceParms(LogicalUnit):
3240   """Modifies an instances's parameters.
3241
3242   """
3243   HPATH = "instance-modify"
3244   HTYPE = constants.HTYPE_INSTANCE
3245   _OP_REQP = ["instance_name"]
3246
3247   def BuildHooksEnv(self):
3248     """Build hooks env.
3249
3250     This runs on the master, primary and secondaries.
3251
3252     """
3253     args = dict()
3254     if self.mem:
3255       args['memory'] = self.mem
3256     if self.vcpus:
3257       args['vcpus'] = self.vcpus
3258     if self.do_ip or self.do_bridge:
3259       if self.do_ip:
3260         ip = self.ip
3261       else:
3262         ip = self.instance.nics[0].ip
3263       if self.bridge:
3264         bridge = self.bridge
3265       else:
3266         bridge = self.instance.nics[0].bridge
3267       args['nics'] = [(ip, bridge)]
3268     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3269     nl = [self.sstore.GetMasterNode(),
3270           self.instance.primary_node] + list(self.instance.secondary_nodes)
3271     return env, nl, nl
3272
3273   def CheckPrereq(self):
3274     """Check prerequisites.
3275
3276     This only checks the instance list against the existing names.
3277
3278     """
3279     self.mem = getattr(self.op, "mem", None)
3280     self.vcpus = getattr(self.op, "vcpus", None)
3281     self.ip = getattr(self.op, "ip", None)
3282     self.bridge = getattr(self.op, "bridge", None)
3283     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3284       raise errors.OpPrereqError, ("No changes submitted")
3285     if self.mem is not None:
3286       try:
3287         self.mem = int(self.mem)
3288       except ValueError, err:
3289         raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
3290     if self.vcpus is not None:
3291       try:
3292         self.vcpus = int(self.vcpus)
3293       except ValueError, err:
3294         raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
3295     if self.ip is not None:
3296       self.do_ip = True
3297       if self.ip.lower() == "none":
3298         self.ip = None
3299       else:
3300         if not utils.IsValidIP(self.ip):
3301           raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
3302     else:
3303       self.do_ip = False
3304     self.do_bridge = (self.bridge is not None)
3305
3306     instance = self.cfg.GetInstanceInfo(
3307       self.cfg.ExpandInstanceName(self.op.instance_name))
3308     if instance is None:
3309       raise errors.OpPrereqError, ("No such instance name '%s'" %
3310                                    self.op.instance_name)
3311     self.op.instance_name = instance.name
3312     self.instance = instance
3313     return
3314
3315   def Exec(self, feedback_fn):
3316     """Modifies an instance.
3317
3318     All parameters take effect only at the next restart of the instance.
3319     """
3320     result = []
3321     instance = self.instance
3322     if self.mem:
3323       instance.memory = self.mem
3324       result.append(("mem", self.mem))
3325     if self.vcpus:
3326       instance.vcpus = self.vcpus
3327       result.append(("vcpus",  self.vcpus))
3328     if self.do_ip:
3329       instance.nics[0].ip = self.ip
3330       result.append(("ip", self.ip))
3331     if self.bridge:
3332       instance.nics[0].bridge = self.bridge
3333       result.append(("bridge", self.bridge))
3334
3335     self.cfg.AddInstance(instance)
3336
3337     return result
3338
3339
3340 class LUQueryExports(NoHooksLU):
3341   """Query the exports list
3342
3343   """
3344   _OP_REQP = []
3345
3346   def CheckPrereq(self):
3347     """Check that the nodelist contains only existing nodes.
3348
3349     """
3350     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3351
3352   def Exec(self, feedback_fn):
3353     """Compute the list of all the exported system images.
3354
3355     Returns:
3356       a dictionary with the structure node->(export-list)
3357       where export-list is a list of the instances exported on
3358       that node.
3359
3360     """
3361     return rpc.call_export_list([node.name for node in self.nodes])
3362
3363
3364 class LUExportInstance(LogicalUnit):
3365   """Export an instance to an image in the cluster.
3366
3367   """
3368   HPATH = "instance-export"
3369   HTYPE = constants.HTYPE_INSTANCE
3370   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3371
3372   def BuildHooksEnv(self):
3373     """Build hooks env.
3374
3375     This will run on the master, primary node and target node.
3376
3377     """
3378     env = {
3379       "EXPORT_NODE": self.op.target_node,
3380       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3381       }
3382     env.update(_BuildInstanceHookEnvByObject(self.instance))
3383     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3384           self.op.target_node]
3385     return env, nl, nl
3386
3387   def CheckPrereq(self):
3388     """Check prerequisites.
3389
3390     This checks that the instance name is a valid one.
3391
3392     """
3393     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3394     self.instance = self.cfg.GetInstanceInfo(instance_name)
3395     if self.instance is None:
3396       raise errors.OpPrereqError, ("Instance '%s' not found" %
3397                                    self.op.instance_name)
3398
3399     # node verification
3400     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3401     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3402
3403     if self.dst_node is None:
3404       raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
3405                                    self.op.target_node)
3406     self.op.target_node = self.dst_node.name
3407
3408   def Exec(self, feedback_fn):
3409     """Export an instance to an image in the cluster.
3410
3411     """
3412     instance = self.instance
3413     dst_node = self.dst_node
3414     src_node = instance.primary_node
3415     # shutdown the instance, unless requested not to do so
3416     if self.op.shutdown:
3417       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3418       self.processor.ChainOpCode(op, feedback_fn)
3419
3420     vgname = self.cfg.GetVGName()
3421
3422     snap_disks = []
3423
3424     try:
3425       for disk in instance.disks:
3426         if disk.iv_name == "sda":
3427           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3428           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3429
3430           if not new_dev_name:
3431             logger.Error("could not snapshot block device %s on node %s" %
3432                          (disk.logical_id[1], src_node))
3433           else:
3434             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3435                                       logical_id=(vgname, new_dev_name),
3436                                       physical_id=(vgname, new_dev_name),
3437                                       iv_name=disk.iv_name)
3438             snap_disks.append(new_dev)
3439
3440     finally:
3441       if self.op.shutdown:
3442         op = opcodes.OpStartupInstance(instance_name=instance.name,
3443                                        force=False)
3444         self.processor.ChainOpCode(op, feedback_fn)
3445
3446     # TODO: check for size
3447
3448     for dev in snap_disks:
3449       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3450                                            instance):
3451         logger.Error("could not export block device %s from node"
3452                      " %s to node %s" %
3453                      (dev.logical_id[1], src_node, dst_node.name))
3454       if not rpc.call_blockdev_remove(src_node, dev):
3455         logger.Error("could not remove snapshot block device %s from"
3456                      " node %s" % (dev.logical_id[1], src_node))
3457
3458     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3459       logger.Error("could not finalize export for instance %s on node %s" %
3460                    (instance.name, dst_node.name))
3461
3462     nodelist = self.cfg.GetNodeList()
3463     nodelist.remove(dst_node.name)
3464
3465     # on one-node clusters nodelist will be empty after the removal
3466     # if we proceed the backup would be removed because OpQueryExports
3467     # substitutes an empty list with the full cluster node list.
3468     if nodelist:
3469       op = opcodes.OpQueryExports(nodes=nodelist)
3470       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3471       for node in exportlist:
3472         if instance.name in exportlist[node]:
3473           if not rpc.call_export_remove(node, instance.name):
3474             logger.Error("could not remove older export for instance %s"
3475                          " on node %s" % (instance.name, node))