Unify environment variables for instance related hooks.
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError, ("Required parameter '%s' missing" %
81                                      attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError, ("Cluster not initialized yet,"
85                                      " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError, ("Commands must be run on the master"
90                                        " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded nodes.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if nodes is not None and not isinstance(nodes, list):
175     raise errors.OpPrereqError, "Invalid argument type 'nodes'"
176
177   if nodes:
178     wanted_nodes = []
179
180     for name in nodes:
181       node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182       if node is None:
183         raise errors.OpPrereqError, ("No such node name '%s'" % name)
184     wanted_nodes.append(node)
185
186     return wanted_nodes
187   else:
188     return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189
190
191 def _CheckOutputFields(static, dynamic, selected):
192   """Checks whether all selected fields are valid.
193
194   Args:
195     static: Static fields
196     dynamic: Dynamic fields
197
198   """
199   static_fields = frozenset(static)
200   dynamic_fields = frozenset(dynamic)
201
202   all_fields = static_fields | dynamic_fields
203
204   if not all_fields.issuperset(selected):
205     raise errors.OpPrereqError, ("Unknown output fields selected: %s"
206                                  % ",".join(frozenset(selected).
207                                             difference(all_fields)))
208
209
210 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os, status,
211                           memory, vcpus, nics):
212   """
213   """
214   env = {
215     "INSTANCE_NAME": name,
216     "INSTANCE_PRIMARY": primary_node,
217     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
218     "INSTANCE_OS": os,
219     "INSTANCE_STATUS": status,
220     "INSTANCE_MEMORY": memory,
221     "INSTANCE_VCPUS": vcpus,
222   }
223
224   if nics:
225     nic_count = len(nics)
226     for idx, (ip, bridge) in enumerate(nics):
227       if ip is None:
228         ip = ""
229       env["INSTANCE_NIC%d_IP" % idx] = ip
230       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
231   else:
232     nic_count = 0
233
234   env["INSTANCE_NIC_COUNT"] = nic_count
235
236   return env
237
238
239 def _BuildInstanceHookEnvByObject(instance, override=None):
240   args = {
241     'name': instance.name,
242     'primary_node': instance.primary_node,
243     'secondary_nodes': instance.secondary_nodes,
244     'os': instance.os,
245     'status': instance.os,
246     'memory': instance.memory,
247     'vcpus': instance.vcpus,
248     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
249   }
250   if override:
251     args.update(override)
252   return _BuildInstanceHookEnv(**args)
253
254
255 def _UpdateEtcHosts(fullnode, ip):
256   """Ensure a node has a correct entry in /etc/hosts.
257
258   Args:
259     fullnode - Fully qualified domain name of host. (str)
260     ip       - IPv4 address of host (str)
261
262   """
263   node = fullnode.split(".", 1)[0]
264
265   f = open('/etc/hosts', 'r+')
266
267   inthere = False
268
269   save_lines = []
270   add_lines = []
271   removed = False
272
273   while True:
274     rawline = f.readline()
275
276     if not rawline:
277       # End of file
278       break
279
280     line = rawline.split('\n')[0]
281
282     # Strip off comments
283     line = line.split('#')[0]
284
285     if not line:
286       # Entire line was comment, skip
287       save_lines.append(rawline)
288       continue
289
290     fields = line.split()
291
292     haveall = True
293     havesome = False
294     for spec in [ ip, fullnode, node ]:
295       if spec not in fields:
296         haveall = False
297       if spec in fields:
298         havesome = True
299
300     if haveall:
301       inthere = True
302       save_lines.append(rawline)
303       continue
304
305     if havesome and not haveall:
306       # Line (old, or manual?) which is missing some.  Remove.
307       removed = True
308       continue
309
310     save_lines.append(rawline)
311
312   if not inthere:
313     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
314
315   if removed:
316     if add_lines:
317       save_lines = save_lines + add_lines
318
319     # We removed a line, write a new file and replace old.
320     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
321     newfile = os.fdopen(fd, 'w')
322     newfile.write(''.join(save_lines))
323     newfile.close()
324     os.rename(tmpname, '/etc/hosts')
325
326   elif add_lines:
327     # Simply appending a new line will do the trick.
328     f.seek(0, 2)
329     for add in add_lines:
330       f.write(add)
331
332   f.close()
333
334
335 def _UpdateKnownHosts(fullnode, ip, pubkey):
336   """Ensure a node has a correct known_hosts entry.
337
338   Args:
339     fullnode - Fully qualified domain name of host. (str)
340     ip       - IPv4 address of host (str)
341     pubkey   - the public key of the cluster
342
343   """
344   if os.path.exists('/etc/ssh/ssh_known_hosts'):
345     f = open('/etc/ssh/ssh_known_hosts', 'r+')
346   else:
347     f = open('/etc/ssh/ssh_known_hosts', 'w+')
348
349   inthere = False
350
351   save_lines = []
352   add_lines = []
353   removed = False
354
355   while True:
356     rawline = f.readline()
357     logger.Debug('read %s' % (repr(rawline),))
358
359     if not rawline:
360       # End of file
361       break
362
363     line = rawline.split('\n')[0]
364
365     parts = line.split(' ')
366     fields = parts[0].split(',')
367     key = parts[2]
368
369     haveall = True
370     havesome = False
371     for spec in [ ip, fullnode ]:
372       if spec not in fields:
373         haveall = False
374       if spec in fields:
375         havesome = True
376
377     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
378     if haveall and key == pubkey:
379       inthere = True
380       save_lines.append(rawline)
381       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
382       continue
383
384     if havesome and (not haveall or key != pubkey):
385       removed = True
386       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
387       continue
388
389     save_lines.append(rawline)
390
391   if not inthere:
392     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
393     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
394
395   if removed:
396     save_lines = save_lines + add_lines
397
398     # Write a new file and replace old.
399     fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
400     newfile = os.fdopen(fd, 'w')
401     newfile.write(''.join(save_lines))
402     newfile.close()
403     logger.Debug("Wrote new known_hosts.")
404     os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
405
406   elif add_lines:
407     # Simply appending a new line will do the trick.
408     f.seek(0, 2)
409     for add in add_lines:
410       f.write(add)
411
412   f.close()
413
414
415 def _HasValidVG(vglist, vgname):
416   """Checks if the volume group list is valid.
417
418   A non-None return value means there's an error, and the return value
419   is the error message.
420
421   """
422   vgsize = vglist.get(vgname, None)
423   if vgsize is None:
424     return "volume group '%s' missing" % vgname
425   elif vgsize < 20480:
426     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
427             (vgname, vgsize))
428   return None
429
430
431 def _InitSSHSetup(node):
432   """Setup the SSH configuration for the cluster.
433
434
435   This generates a dsa keypair for root, adds the pub key to the
436   permitted hosts and adds the hostkey to its own known hosts.
437
438   Args:
439     node: the name of this host as a fqdn
440
441   """
442   utils.RemoveFile('/root/.ssh/known_hosts')
443
444   if os.path.exists('/root/.ssh/id_dsa'):
445     utils.CreateBackup('/root/.ssh/id_dsa')
446   if os.path.exists('/root/.ssh/id_dsa.pub'):
447     utils.CreateBackup('/root/.ssh/id_dsa.pub')
448
449   utils.RemoveFile('/root/.ssh/id_dsa')
450   utils.RemoveFile('/root/.ssh/id_dsa.pub')
451
452   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
453                          "-f", "/root/.ssh/id_dsa",
454                          "-q", "-N", ""])
455   if result.failed:
456     raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
457                                result.output)
458
459   f = open('/root/.ssh/id_dsa.pub', 'r')
460   try:
461     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
462   finally:
463     f.close()
464
465
466 def _InitGanetiServerSetup(ss):
467   """Setup the necessary configuration for the initial node daemon.
468
469   This creates the nodepass file containing the shared password for
470   the cluster and also generates the SSL certificate.
471
472   """
473   # Create pseudo random password
474   randpass = sha.new(os.urandom(64)).hexdigest()
475   # and write it into sstore
476   ss.SetKey(ss.SS_NODED_PASS, randpass)
477
478   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
479                          "-days", str(365*5), "-nodes", "-x509",
480                          "-keyout", constants.SSL_CERT_FILE,
481                          "-out", constants.SSL_CERT_FILE, "-batch"])
482   if result.failed:
483     raise errors.OpExecError, ("could not generate server ssl cert, command"
484                                " %s had exitcode %s and error message %s" %
485                                (result.cmd, result.exit_code, result.output))
486
487   os.chmod(constants.SSL_CERT_FILE, 0400)
488
489   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
490
491   if result.failed:
492     raise errors.OpExecError, ("could not start the node daemon, command %s"
493                                " had exitcode %s and error %s" %
494                                (result.cmd, result.exit_code, result.output))
495
496
497 class LUInitCluster(LogicalUnit):
498   """Initialise the cluster.
499
500   """
501   HPATH = "cluster-init"
502   HTYPE = constants.HTYPE_CLUSTER
503   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
504               "def_bridge", "master_netdev"]
505   REQ_CLUSTER = False
506
507   def BuildHooksEnv(self):
508     """Build hooks env.
509
510     Notes: Since we don't require a cluster, we must manually add
511     ourselves in the post-run node list.
512
513     """
514     env = {
515       "CLUSTER": self.op.cluster_name,
516       "MASTER": self.hostname['hostname_full'],
517       }
518     return env, [], [self.hostname['hostname_full']]
519
520   def CheckPrereq(self):
521     """Verify that the passed name is a valid one.
522
523     """
524     if config.ConfigWriter.IsCluster():
525       raise errors.OpPrereqError, ("Cluster is already initialised")
526
527     hostname_local = socket.gethostname()
528     self.hostname = hostname = utils.LookupHostname(hostname_local)
529     if not hostname:
530       raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
531                                    hostname_local)
532
533     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
534     if not clustername:
535       raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
536                                    % self.op.cluster_name)
537
538     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
539     if result.failed:
540       raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
541                                    " to %s,\nbut this ip address does not"
542                                    " belong to this host."
543                                    " Aborting." % hostname['ip'])
544
545     secondary_ip = getattr(self.op, "secondary_ip", None)
546     if secondary_ip and not utils.IsValidIP(secondary_ip):
547       raise errors.OpPrereqError, ("Invalid secondary ip given")
548     if secondary_ip and secondary_ip != hostname['ip']:
549       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
550       if result.failed:
551         raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
552                                      "but it does not belong to this host." %
553                                      secondary_ip)
554     self.secondary_ip = secondary_ip
555
556     # checks presence of the volume group given
557     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
558
559     if vgstatus:
560       raise errors.OpPrereqError, ("Error: %s" % vgstatus)
561
562     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
563                     self.op.mac_prefix):
564       raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
565                                    self.op.mac_prefix)
566
567     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
568       raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
569                                    self.op.hypervisor_type)
570
571     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
572     if result.failed:
573       raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
574                                    (self.op.master_netdev, result.output))
575
576   def Exec(self, feedback_fn):
577     """Initialize the cluster.
578
579     """
580     clustername = self.clustername
581     hostname = self.hostname
582
583     # set up the simple store
584     ss = ssconf.SimpleStore()
585     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
586     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
587     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
588     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
589     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
590
591     # set up the inter-node password and certificate
592     _InitGanetiServerSetup(ss)
593
594     # start the master ip
595     rpc.call_node_start_master(hostname['hostname_full'])
596
597     # set up ssh config and /etc/hosts
598     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
599     try:
600       sshline = f.read()
601     finally:
602       f.close()
603     sshkey = sshline.split(" ")[1]
604
605     _UpdateEtcHosts(hostname['hostname_full'],
606                     hostname['ip'],
607                     )
608
609     _UpdateKnownHosts(hostname['hostname_full'],
610                       hostname['ip'],
611                       sshkey,
612                       )
613
614     _InitSSHSetup(hostname['hostname'])
615
616     # init of cluster config file
617     cfgw = config.ConfigWriter()
618     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
619                     sshkey, self.op.mac_prefix,
620                     self.op.vg_name, self.op.def_bridge)
621
622
623 class LUDestroyCluster(NoHooksLU):
624   """Logical unit for destroying the cluster.
625
626   """
627   _OP_REQP = []
628
629   def CheckPrereq(self):
630     """Check prerequisites.
631
632     This checks whether the cluster is empty.
633
634     Any errors are signalled by raising errors.OpPrereqError.
635
636     """
637     master = self.sstore.GetMasterNode()
638
639     nodelist = self.cfg.GetNodeList()
640     if len(nodelist) != 1 or nodelist[0] != master:
641       raise errors.OpPrereqError, ("There are still %d node(s) in "
642                                    "this cluster." % (len(nodelist) - 1))
643     instancelist = self.cfg.GetInstanceList()
644     if instancelist:
645       raise errors.OpPrereqError, ("There are still %d instance(s) in "
646                                    "this cluster." % len(instancelist))
647
648   def Exec(self, feedback_fn):
649     """Destroys the cluster.
650
651     """
652     utils.CreateBackup('/root/.ssh/id_dsa')
653     utils.CreateBackup('/root/.ssh/id_dsa.pub')
654     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
655
656
657 class LUVerifyCluster(NoHooksLU):
658   """Verifies the cluster status.
659
660   """
661   _OP_REQP = []
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn(" - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     hyp_result = node_result.get('hypervisor', None)
729     if hyp_result is not None:
730       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
731     return bad
732
733   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
734     """Verify an instance.
735
736     This function checks to see if the required block devices are
737     available on the instance's node.
738
739     """
740     bad = False
741
742     instancelist = self.cfg.GetInstanceList()
743     if not instance in instancelist:
744       feedback_fn("  - ERROR: instance %s not in instance list %s" %
745                       (instance, instancelist))
746       bad = True
747
748     instanceconfig = self.cfg.GetInstanceInfo(instance)
749     node_current = instanceconfig.primary_node
750
751     node_vol_should = {}
752     instanceconfig.MapLVsByNode(node_vol_should)
753
754     for node in node_vol_should:
755       for volume in node_vol_should[node]:
756         if node not in node_vol_is or volume not in node_vol_is[node]:
757           feedback_fn("  - ERROR: volume %s missing on node %s" %
758                           (volume, node))
759           bad = True
760
761     if not instanceconfig.status == 'down':
762       if not instance in node_instance[node_current]:
763         feedback_fn("  - ERROR: instance %s not running on node %s" %
764                         (instance, node_current))
765         bad = True
766
767     for node in node_instance:
768       if (not node == node_current):
769         if instance in node_instance[node]:
770           feedback_fn("  - ERROR: instance %s should not run on node %s" %
771                           (instance, node))
772           bad = True
773
774     return not bad
775
776   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
777     """Verify if there are any unknown volumes in the cluster.
778
779     The .os, .swap and backup volumes are ignored. All other volumes are
780     reported as unknown.
781
782     """
783     bad = False
784
785     for node in node_vol_is:
786       for volume in node_vol_is[node]:
787         if node not in node_vol_should or volume not in node_vol_should[node]:
788           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
789                       (volume, node))
790           bad = True
791     return bad
792
793   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
794     """Verify the list of running instances.
795
796     This checks what instances are running but unknown to the cluster.
797
798     """
799     bad = False
800     for node in node_instance:
801       for runninginstance in node_instance[node]:
802         if runninginstance not in instancelist:
803           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
804                           (runninginstance, node))
805           bad = True
806     return bad
807
808   def CheckPrereq(self):
809     """Check prerequisites.
810
811     This has no prerequisites.
812
813     """
814     pass
815
816   def Exec(self, feedback_fn):
817     """Verify integrity of cluster, performing various test on nodes.
818
819     """
820     bad = False
821     feedback_fn("* Verifying global settings")
822     self.cfg.VerifyConfig()
823
824     master = self.sstore.GetMasterNode()
825     vg_name = self.cfg.GetVGName()
826     nodelist = utils.NiceSort(self.cfg.GetNodeList())
827     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
828     node_volume = {}
829     node_instance = {}
830
831     # FIXME: verify OS list
832     # do local checksums
833     file_names = list(self.sstore.GetFileList())
834     file_names.append(constants.SSL_CERT_FILE)
835     file_names.append(constants.CLUSTER_CONF_FILE)
836     local_checksums = utils.FingerprintFiles(file_names)
837
838     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
839     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
840     all_instanceinfo = rpc.call_instance_list(nodelist)
841     all_vglist = rpc.call_vg_list(nodelist)
842     node_verify_param = {
843       'filelist': file_names,
844       'nodelist': nodelist,
845       'hypervisor': None,
846       }
847     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
848     all_rversion = rpc.call_version(nodelist)
849
850     for node in nodelist:
851       feedback_fn("* Verifying node %s" % node)
852       result = self._VerifyNode(node, file_names, local_checksums,
853                                 all_vglist[node], all_nvinfo[node],
854                                 all_rversion[node], feedback_fn)
855       bad = bad or result
856
857       # node_volume
858       volumeinfo = all_volumeinfo[node]
859
860       if type(volumeinfo) != dict:
861         feedback_fn("  - ERROR: connection to %s failed" % (node,))
862         bad = True
863         continue
864
865       node_volume[node] = volumeinfo
866
867       # node_instance
868       nodeinstance = all_instanceinfo[node]
869       if type(nodeinstance) != list:
870         feedback_fn("  - ERROR: connection to %s failed" % (node,))
871         bad = True
872         continue
873
874       node_instance[node] = nodeinstance
875
876     node_vol_should = {}
877
878     for instance in instancelist:
879       feedback_fn("* Verifying instance %s" % instance)
880       result =  self._VerifyInstance(instance, node_volume, node_instance,
881                                      feedback_fn)
882       bad = bad or result
883
884       inst_config = self.cfg.GetInstanceInfo(instance)
885
886       inst_config.MapLVsByNode(node_vol_should)
887
888     feedback_fn("* Verifying orphan volumes")
889     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
890                                        feedback_fn)
891     bad = bad or result
892
893     feedback_fn("* Verifying remaining instances")
894     result = self._VerifyOrphanInstances(instancelist, node_instance,
895                                          feedback_fn)
896     bad = bad or result
897
898     return int(bad)
899
900
901 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
902   """Sleep and poll for an instance's disk to sync.
903
904   """
905   if not instance.disks:
906     return True
907
908   if not oneshot:
909     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
910
911   node = instance.primary_node
912
913   for dev in instance.disks:
914     cfgw.SetDiskID(dev, node)
915
916   retries = 0
917   while True:
918     max_time = 0
919     done = True
920     cumul_degraded = False
921     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
922     if not rstats:
923       logger.ToStderr("Can't get any data from node %s" % node)
924       retries += 1
925       if retries >= 10:
926         raise errors.RemoteError, ("Can't contact node %s for mirror data,"
927                                    " aborting." % node)
928       time.sleep(6)
929       continue
930     retries = 0
931     for i in range(len(rstats)):
932       mstat = rstats[i]
933       if mstat is None:
934         logger.ToStderr("Can't compute data for node %s/%s" %
935                         (node, instance.disks[i].iv_name))
936         continue
937       perc_done, est_time, is_degraded = mstat
938       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
939       if perc_done is not None:
940         done = False
941         if est_time is not None:
942           rem_time = "%d estimated seconds remaining" % est_time
943           max_time = est_time
944         else:
945           rem_time = "no time estimate"
946         logger.ToStdout("- device %s: %5.2f%% done, %s" %
947                         (instance.disks[i].iv_name, perc_done, rem_time))
948     if done or oneshot:
949       break
950
951     if unlock:
952       utils.Unlock('cmd')
953     try:
954       time.sleep(min(60, max_time))
955     finally:
956       if unlock:
957         utils.Lock('cmd')
958
959   if done:
960     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
961   return not cumul_degraded
962
963
964 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
965   """Check that mirrors are not degraded.
966
967   """
968   cfgw.SetDiskID(dev, node)
969
970   result = True
971   if on_primary or dev.AssembleOnSecondary():
972     rstats = rpc.call_blockdev_find(node, dev)
973     if not rstats:
974       logger.ToStderr("Can't get any data from node %s" % node)
975       result = False
976     else:
977       result = result and (not rstats[5])
978   if dev.children:
979     for child in dev.children:
980       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
981
982   return result
983
984
985 class LUDiagnoseOS(NoHooksLU):
986   """Logical unit for OS diagnose/query.
987
988   """
989   _OP_REQP = []
990
991   def CheckPrereq(self):
992     """Check prerequisites.
993
994     This always succeeds, since this is a pure query LU.
995
996     """
997     return
998
999   def Exec(self, feedback_fn):
1000     """Compute the list of OSes.
1001
1002     """
1003     node_list = self.cfg.GetNodeList()
1004     node_data = rpc.call_os_diagnose(node_list)
1005     if node_data == False:
1006       raise errors.OpExecError, "Can't gather the list of OSes"
1007     return node_data
1008
1009
1010 class LURemoveNode(LogicalUnit):
1011   """Logical unit for removing a node.
1012
1013   """
1014   HPATH = "node-remove"
1015   HTYPE = constants.HTYPE_NODE
1016   _OP_REQP = ["node_name"]
1017
1018   def BuildHooksEnv(self):
1019     """Build hooks env.
1020
1021     This doesn't run on the target node in the pre phase as a failed
1022     node would not allows itself to run.
1023
1024     """
1025     env = {
1026       "NODE_NAME": self.op.node_name,
1027       }
1028     all_nodes = self.cfg.GetNodeList()
1029     all_nodes.remove(self.op.node_name)
1030     return env, all_nodes, all_nodes
1031
1032   def CheckPrereq(self):
1033     """Check prerequisites.
1034
1035     This checks:
1036      - the node exists in the configuration
1037      - it does not have primary or secondary instances
1038      - it's not the master
1039
1040     Any errors are signalled by raising errors.OpPrereqError.
1041
1042     """
1043     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1044     if node is None:
1045       logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
1046       return 1
1047
1048     instance_list = self.cfg.GetInstanceList()
1049
1050     masternode = self.sstore.GetMasterNode()
1051     if node.name == masternode:
1052       raise errors.OpPrereqError, ("Node is the master node,"
1053                                    " you need to failover first.")
1054
1055     for instance_name in instance_list:
1056       instance = self.cfg.GetInstanceInfo(instance_name)
1057       if node.name == instance.primary_node:
1058         raise errors.OpPrereqError, ("Instance %s still running on the node,"
1059                                      " please remove first." % instance_name)
1060       if node.name in instance.secondary_nodes:
1061         raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
1062                                      " please remove first." % instance_name)
1063     self.op.node_name = node.name
1064     self.node = node
1065
1066   def Exec(self, feedback_fn):
1067     """Removes the node from the cluster.
1068
1069     """
1070     node = self.node
1071     logger.Info("stopping the node daemon and removing configs from node %s" %
1072                 node.name)
1073
1074     rpc.call_node_leave_cluster(node.name)
1075
1076     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1077
1078     logger.Info("Removing node %s from config" % node.name)
1079
1080     self.cfg.RemoveNode(node.name)
1081
1082
1083 class LUQueryNodes(NoHooksLU):
1084   """Logical unit for querying nodes.
1085
1086   """
1087   _OP_REQP = ["output_fields"]
1088
1089   def CheckPrereq(self):
1090     """Check prerequisites.
1091
1092     This checks that the fields required are valid output fields.
1093
1094     """
1095     self.dynamic_fields = frozenset(["dtotal", "dfree",
1096                                      "mtotal", "mnode", "mfree"])
1097
1098     _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1099                        dynamic=self.dynamic_fields,
1100                        selected=self.op.output_fields)
1101
1102
1103   def Exec(self, feedback_fn):
1104     """Computes the list of nodes and their attributes.
1105
1106     """
1107     nodenames = utils.NiceSort(self.cfg.GetNodeList())
1108     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1109
1110
1111     # begin data gathering
1112
1113     if self.dynamic_fields.intersection(self.op.output_fields):
1114       live_data = {}
1115       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1116       for name in nodenames:
1117         nodeinfo = node_data.get(name, None)
1118         if nodeinfo:
1119           live_data[name] = {
1120             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1121             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1122             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1123             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1124             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1125             }
1126         else:
1127           live_data[name] = {}
1128     else:
1129       live_data = dict.fromkeys(nodenames, {})
1130
1131     node_to_primary = dict.fromkeys(nodenames, 0)
1132     node_to_secondary = dict.fromkeys(nodenames, 0)
1133
1134     if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1135       instancelist = self.cfg.GetInstanceList()
1136
1137       for instance in instancelist:
1138         instanceinfo = self.cfg.GetInstanceInfo(instance)
1139         node_to_primary[instanceinfo.primary_node] += 1
1140         for secnode in instanceinfo.secondary_nodes:
1141           node_to_secondary[secnode] += 1
1142
1143     # end data gathering
1144
1145     output = []
1146     for node in nodelist:
1147       node_output = []
1148       for field in self.op.output_fields:
1149         if field == "name":
1150           val = node.name
1151         elif field == "pinst":
1152           val = node_to_primary[node.name]
1153         elif field == "sinst":
1154           val = node_to_secondary[node.name]
1155         elif field == "pip":
1156           val = node.primary_ip
1157         elif field == "sip":
1158           val = node.secondary_ip
1159         elif field in self.dynamic_fields:
1160           val = live_data[node.name].get(field, "?")
1161         else:
1162           raise errors.ParameterError, field
1163         val = str(val)
1164         node_output.append(val)
1165       output.append(node_output)
1166
1167     return output
1168
1169
1170 class LUQueryNodeVolumes(NoHooksLU):
1171   """Logical unit for getting volumes on node(s).
1172
1173   """
1174   _OP_REQP = ["nodes", "output_fields"]
1175
1176   def CheckPrereq(self):
1177     """Check prerequisites.
1178
1179     This checks that the fields required are valid output fields.
1180
1181     """
1182     self.nodes = _GetWantedNodes(self, self.op.nodes)
1183
1184     _CheckOutputFields(static=["node"],
1185                        dynamic=["phys", "vg", "name", "size", "instance"],
1186                        selected=self.op.output_fields)
1187
1188
1189   def Exec(self, feedback_fn):
1190     """Computes the list of nodes and their attributes.
1191
1192     """
1193     nodenames = utils.NiceSort([node.name for node in self.nodes])
1194     volumes = rpc.call_node_volumes(nodenames)
1195
1196     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1197              in self.cfg.GetInstanceList()]
1198
1199     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1200
1201     output = []
1202     for node in nodenames:
1203       node_vols = volumes[node][:]
1204       node_vols.sort(key=lambda vol: vol['dev'])
1205
1206       for vol in node_vols:
1207         node_output = []
1208         for field in self.op.output_fields:
1209           if field == "node":
1210             val = node
1211           elif field == "phys":
1212             val = vol['dev']
1213           elif field == "vg":
1214             val = vol['vg']
1215           elif field == "name":
1216             val = vol['name']
1217           elif field == "size":
1218             val = int(float(vol['size']))
1219           elif field == "instance":
1220             for inst in ilist:
1221               if node not in lv_by_node[inst]:
1222                 continue
1223               if vol['name'] in lv_by_node[inst][node]:
1224                 val = inst.name
1225                 break
1226             else:
1227               val = '-'
1228           else:
1229             raise errors.ParameterError, field
1230           node_output.append(str(val))
1231
1232         output.append(node_output)
1233
1234     return output
1235
1236
1237 class LUAddNode(LogicalUnit):
1238   """Logical unit for adding node to the cluster.
1239
1240   """
1241   HPATH = "node-add"
1242   HTYPE = constants.HTYPE_NODE
1243   _OP_REQP = ["node_name"]
1244
1245   def BuildHooksEnv(self):
1246     """Build hooks env.
1247
1248     This will run on all nodes before, and on all nodes + the new node after.
1249
1250     """
1251     env = {
1252       "NODE_NAME": self.op.node_name,
1253       "NODE_PIP": self.op.primary_ip,
1254       "NODE_SIP": self.op.secondary_ip,
1255       }
1256     nodes_0 = self.cfg.GetNodeList()
1257     nodes_1 = nodes_0 + [self.op.node_name, ]
1258     return env, nodes_0, nodes_1
1259
1260   def CheckPrereq(self):
1261     """Check prerequisites.
1262
1263     This checks:
1264      - the new node is not already in the config
1265      - it is resolvable
1266      - its parameters (single/dual homed) matches the cluster
1267
1268     Any errors are signalled by raising errors.OpPrereqError.
1269
1270     """
1271     node_name = self.op.node_name
1272     cfg = self.cfg
1273
1274     dns_data = utils.LookupHostname(node_name)
1275     if not dns_data:
1276       raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)
1277
1278     node = dns_data['hostname']
1279     primary_ip = self.op.primary_ip = dns_data['ip']
1280     secondary_ip = getattr(self.op, "secondary_ip", None)
1281     if secondary_ip is None:
1282       secondary_ip = primary_ip
1283     if not utils.IsValidIP(secondary_ip):
1284       raise errors.OpPrereqError, ("Invalid secondary IP given")
1285     self.op.secondary_ip = secondary_ip
1286     node_list = cfg.GetNodeList()
1287     if node in node_list:
1288       raise errors.OpPrereqError, ("Node %s is already in the configuration"
1289                                    % node)
1290
1291     for existing_node_name in node_list:
1292       existing_node = cfg.GetNodeInfo(existing_node_name)
1293       if (existing_node.primary_ip == primary_ip or
1294           existing_node.secondary_ip == primary_ip or
1295           existing_node.primary_ip == secondary_ip or
1296           existing_node.secondary_ip == secondary_ip):
1297         raise errors.OpPrereqError, ("New node ip address(es) conflict with"
1298                                      " existing node %s" % existing_node.name)
1299
1300     # check that the type of the node (single versus dual homed) is the
1301     # same as for the master
1302     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1303     master_singlehomed = myself.secondary_ip == myself.primary_ip
1304     newbie_singlehomed = secondary_ip == primary_ip
1305     if master_singlehomed != newbie_singlehomed:
1306       if master_singlehomed:
1307         raise errors.OpPrereqError, ("The master has no private ip but the"
1308                                      " new node has one")
1309       else:
1310         raise errors.OpPrereqError ("The master has a private ip but the"
1311                                     " new node doesn't have one")
1312
1313     # checks reachablity
1314     command = ["fping", "-q", primary_ip]
1315     result = utils.RunCmd(command)
1316     if result.failed:
1317       raise errors.OpPrereqError, ("Node not reachable by ping")
1318
1319     if not newbie_singlehomed:
1320       # check reachability from my secondary ip to newbie's secondary ip
1321       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1322       result = utils.RunCmd(command)
1323       if result.failed:
1324         raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")
1325
1326     self.new_node = objects.Node(name=node,
1327                                  primary_ip=primary_ip,
1328                                  secondary_ip=secondary_ip)
1329
1330   def Exec(self, feedback_fn):
1331     """Adds the new node to the cluster.
1332
1333     """
1334     new_node = self.new_node
1335     node = new_node.name
1336
1337     # set up inter-node password and certificate and restarts the node daemon
1338     gntpass = self.sstore.GetNodeDaemonPassword()
1339     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1340       raise errors.OpExecError, ("ganeti password corruption detected")
1341     f = open(constants.SSL_CERT_FILE)
1342     try:
1343       gntpem = f.read(8192)
1344     finally:
1345       f.close()
1346     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1347     # so we use this to detect an invalid certificate; as long as the
1348     # cert doesn't contain this, the here-document will be correctly
1349     # parsed by the shell sequence below
1350     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1351       raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
1352     if not gntpem.endswith("\n"):
1353       raise errors.OpExecError, ("PEM must end with newline")
1354     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1355
1356     # remove first the root's known_hosts file
1357     utils.RemoveFile("/root/.ssh/known_hosts")
1358     # and then connect with ssh to set password and start ganeti-noded
1359     # note that all the below variables are sanitized at this point,
1360     # either by being constants or by the checks above
1361     ss = self.sstore
1362     mycommand = ("umask 077 && "
1363                  "echo '%s' > '%s' && "
1364                  "cat > '%s' << '!EOF.' && \n"
1365                  "%s!EOF.\n%s restart" %
1366                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1367                   constants.SSL_CERT_FILE, gntpem,
1368                   constants.NODE_INITD_SCRIPT))
1369
1370     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1371     if result.failed:
1372       raise errors.OpExecError, ("Remote command on node %s, error: %s,"
1373                                  " output: %s" %
1374                                  (node, result.fail_reason, result.output))
1375
1376     # check connectivity
1377     time.sleep(4)
1378
1379     result = rpc.call_version([node])[node]
1380     if result:
1381       if constants.PROTOCOL_VERSION == result:
1382         logger.Info("communication to node %s fine, sw version %s match" %
1383                     (node, result))
1384       else:
1385         raise errors.OpExecError, ("Version mismatch master version %s,"
1386                                    " node version %s" %
1387                                    (constants.PROTOCOL_VERSION, result))
1388     else:
1389       raise errors.OpExecError, ("Cannot get version from the new node")
1390
1391     # setup ssh on node
1392     logger.Info("copy ssh key to node %s" % node)
1393     keyarray = []
1394     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1395                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1396                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1397
1398     for i in keyfiles:
1399       f = open(i, 'r')
1400       try:
1401         keyarray.append(f.read())
1402       finally:
1403         f.close()
1404
1405     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1406                                keyarray[3], keyarray[4], keyarray[5])
1407
1408     if not result:
1409       raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")
1410
1411     # Add node to our /etc/hosts, and add key to known_hosts
1412     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1413     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1414                       self.cfg.GetHostKey())
1415
1416     if new_node.secondary_ip != new_node.primary_ip:
1417       result = ssh.SSHCall(node, "root",
1418                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1419       if result.failed:
1420         raise errors.OpExecError, ("Node claims it doesn't have the"
1421                                    " secondary ip you gave (%s).\n"
1422                                    "Please fix and re-run this command." %
1423                                    new_node.secondary_ip)
1424
1425     # Distribute updated /etc/hosts and known_hosts to all nodes,
1426     # including the node just added
1427     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1428     dist_nodes = self.cfg.GetNodeList() + [node]
1429     if myself.name in dist_nodes:
1430       dist_nodes.remove(myself.name)
1431
1432     logger.Debug("Copying hosts and known_hosts to all nodes")
1433     for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1434       result = rpc.call_upload_file(dist_nodes, fname)
1435       for to_node in dist_nodes:
1436         if not result[to_node]:
1437           logger.Error("copy of file %s to node %s failed" %
1438                        (fname, to_node))
1439
1440     to_copy = ss.GetFileList()
1441     for fname in to_copy:
1442       if not ssh.CopyFileToNode(node, fname):
1443         logger.Error("could not copy file %s to node %s" % (fname, node))
1444
1445     logger.Info("adding node %s to cluster.conf" % node)
1446     self.cfg.AddNode(new_node)
1447
1448
1449 class LUMasterFailover(LogicalUnit):
1450   """Failover the master node to the current node.
1451
1452   This is a special LU in that it must run on a non-master node.
1453
1454   """
1455   HPATH = "master-failover"
1456   HTYPE = constants.HTYPE_CLUSTER
1457   REQ_MASTER = False
1458   _OP_REQP = []
1459
1460   def BuildHooksEnv(self):
1461     """Build hooks env.
1462
1463     This will run on the new master only in the pre phase, and on all
1464     the nodes in the post phase.
1465
1466     """
1467     env = {
1468       "NEW_MASTER": self.new_master,
1469       "OLD_MASTER": self.old_master,
1470       }
1471     return env, [self.new_master], self.cfg.GetNodeList()
1472
1473   def CheckPrereq(self):
1474     """Check prerequisites.
1475
1476     This checks that we are not already the master.
1477
1478     """
1479     self.new_master = socket.gethostname()
1480
1481     self.old_master = self.sstore.GetMasterNode()
1482
1483     if self.old_master == self.new_master:
1484       raise errors.OpPrereqError, ("This commands must be run on the node"
1485                                    " where you want the new master to be.\n"
1486                                    "%s is already the master" %
1487                                    self.old_master)
1488
1489   def Exec(self, feedback_fn):
1490     """Failover the master node.
1491
1492     This command, when run on a non-master node, will cause the current
1493     master to cease being master, and the non-master to become new
1494     master.
1495
1496     """
1497     #TODO: do not rely on gethostname returning the FQDN
1498     logger.Info("setting master to %s, old master: %s" %
1499                 (self.new_master, self.old_master))
1500
1501     if not rpc.call_node_stop_master(self.old_master):
1502       logger.Error("could disable the master role on the old master"
1503                    " %s, please disable manually" % self.old_master)
1504
1505     ss = self.sstore
1506     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1507     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1508                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1509       logger.Error("could not distribute the new simple store master file"
1510                    " to the other nodes, please check.")
1511
1512     if not rpc.call_node_start_master(self.new_master):
1513       logger.Error("could not start the master role on the new master"
1514                    " %s, please check" % self.new_master)
1515       feedback_fn("Error in activating the master IP on the new master,\n"
1516                   "please fix manually.")
1517
1518
1519
1520 class LUQueryClusterInfo(NoHooksLU):
1521   """Query cluster configuration.
1522
1523   """
1524   _OP_REQP = []
1525   REQ_MASTER = False
1526
1527   def CheckPrereq(self):
1528     """No prerequsites needed for this LU.
1529
1530     """
1531     pass
1532
1533   def Exec(self, feedback_fn):
1534     """Return cluster config.
1535
1536     """
1537     result = {
1538       "name": self.sstore.GetClusterName(),
1539       "software_version": constants.RELEASE_VERSION,
1540       "protocol_version": constants.PROTOCOL_VERSION,
1541       "config_version": constants.CONFIG_VERSION,
1542       "os_api_version": constants.OS_API_VERSION,
1543       "export_version": constants.EXPORT_VERSION,
1544       "master": self.sstore.GetMasterNode(),
1545       "architecture": (platform.architecture()[0], platform.machine()),
1546       }
1547
1548     return result
1549
1550
1551 class LUClusterCopyFile(NoHooksLU):
1552   """Copy file to cluster.
1553
1554   """
1555   _OP_REQP = ["nodes", "filename"]
1556
1557   def CheckPrereq(self):
1558     """Check prerequisites.
1559
1560     It should check that the named file exists and that the given list
1561     of nodes is valid.
1562
1563     """
1564     if not os.path.exists(self.op.filename):
1565       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1566
1567     self.nodes = _GetWantedNodes(self, self.op.nodes)
1568
1569   def Exec(self, feedback_fn):
1570     """Copy a file from master to some nodes.
1571
1572     Args:
1573       opts - class with options as members
1574       args - list containing a single element, the file name
1575     Opts used:
1576       nodes - list containing the name of target nodes; if empty, all nodes
1577
1578     """
1579     filename = self.op.filename
1580
1581     myname = socket.gethostname()
1582
1583     for node in self.nodes:
1584       if node == myname:
1585         continue
1586       if not ssh.CopyFileToNode(node, filename):
1587         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1588
1589
1590 class LUDumpClusterConfig(NoHooksLU):
1591   """Return a text-representation of the cluster-config.
1592
1593   """
1594   _OP_REQP = []
1595
1596   def CheckPrereq(self):
1597     """No prerequisites.
1598
1599     """
1600     pass
1601
1602   def Exec(self, feedback_fn):
1603     """Dump a representation of the cluster config to the standard output.
1604
1605     """
1606     return self.cfg.DumpConfig()
1607
1608
1609 class LURunClusterCommand(NoHooksLU):
1610   """Run a command on some nodes.
1611
1612   """
1613   _OP_REQP = ["command", "nodes"]
1614
1615   def CheckPrereq(self):
1616     """Check prerequisites.
1617
1618     It checks that the given list of nodes is valid.
1619
1620     """
1621     self.nodes = _GetWantedNodes(self, self.op.nodes)
1622
1623   def Exec(self, feedback_fn):
1624     """Run a command on some nodes.
1625
1626     """
1627     data = []
1628     for node in self.nodes:
1629       result = utils.RunCmd(["ssh", node.name, self.op.command])
1630       data.append((node.name, result.cmd, result.output, result.exit_code))
1631
1632     return data
1633
1634
1635 class LUActivateInstanceDisks(NoHooksLU):
1636   """Bring up an instance's disks.
1637
1638   """
1639   _OP_REQP = ["instance_name"]
1640
1641   def CheckPrereq(self):
1642     """Check prerequisites.
1643
1644     This checks that the instance is in the cluster.
1645
1646     """
1647     instance = self.cfg.GetInstanceInfo(
1648       self.cfg.ExpandInstanceName(self.op.instance_name))
1649     if instance is None:
1650       raise errors.OpPrereqError, ("Instance '%s' not known" %
1651                                    self.op.instance_name)
1652     self.instance = instance
1653
1654
1655   def Exec(self, feedback_fn):
1656     """Activate the disks.
1657
1658     """
1659     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1660     if not disks_ok:
1661       raise errors.OpExecError, ("Cannot activate block devices")
1662
1663     return disks_info
1664
1665
1666 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1667   """Prepare the block devices for an instance.
1668
1669   This sets up the block devices on all nodes.
1670
1671   Args:
1672     instance: a ganeti.objects.Instance object
1673     ignore_secondaries: if true, errors on secondary nodes won't result
1674                         in an error return from the function
1675
1676   Returns:
1677     false if the operation failed
1678     list of (host, instance_visible_name, node_visible_name) if the operation
1679          suceeded with the mapping from node devices to instance devices
1680   """
1681   device_info = []
1682   disks_ok = True
1683   for inst_disk in instance.disks:
1684     master_result = None
1685     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1686       cfg.SetDiskID(node_disk, node)
1687       is_primary = node == instance.primary_node
1688       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1689       if not result:
1690         logger.Error("could not prepare block device %s on node %s (is_pri"
1691                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1692         if is_primary or not ignore_secondaries:
1693           disks_ok = False
1694       if is_primary:
1695         master_result = result
1696     device_info.append((instance.primary_node, inst_disk.iv_name,
1697                         master_result))
1698
1699   return disks_ok, device_info
1700
1701
1702 def _StartInstanceDisks(cfg, instance, force):
1703   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1704                                            ignore_secondaries=force)
1705   if not disks_ok:
1706     _ShutdownInstanceDisks(instance, cfg)
1707     if force is not None and not force:
1708       logger.Error("If the message above refers to a secondary node,"
1709                    " you can retry the operation using '--force'.")
1710     raise errors.OpExecError, ("Disk consistency error")
1711
1712
1713 class LUDeactivateInstanceDisks(NoHooksLU):
1714   """Shutdown an instance's disks.
1715
1716   """
1717   _OP_REQP = ["instance_name"]
1718
1719   def CheckPrereq(self):
1720     """Check prerequisites.
1721
1722     This checks that the instance is in the cluster.
1723
1724     """
1725     instance = self.cfg.GetInstanceInfo(
1726       self.cfg.ExpandInstanceName(self.op.instance_name))
1727     if instance is None:
1728       raise errors.OpPrereqError, ("Instance '%s' not known" %
1729                                    self.op.instance_name)
1730     self.instance = instance
1731
1732   def Exec(self, feedback_fn):
1733     """Deactivate the disks
1734
1735     """
1736     instance = self.instance
1737     ins_l = rpc.call_instance_list([instance.primary_node])
1738     ins_l = ins_l[instance.primary_node]
1739     if not type(ins_l) is list:
1740       raise errors.OpExecError, ("Can't contact node '%s'" %
1741                                  instance.primary_node)
1742
1743     if self.instance.name in ins_l:
1744       raise errors.OpExecError, ("Instance is running, can't shutdown"
1745                                  " block devices.")
1746
1747     _ShutdownInstanceDisks(instance, self.cfg)
1748
1749
1750 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1751   """Shutdown block devices of an instance.
1752
1753   This does the shutdown on all nodes of the instance.
1754
1755   If the ignore_primary is false, errors on the primary node are
1756   ignored.
1757
1758   """
1759   result = True
1760   for disk in instance.disks:
1761     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1762       cfg.SetDiskID(top_disk, node)
1763       if not rpc.call_blockdev_shutdown(node, top_disk):
1764         logger.Error("could not shutdown block device %s on node %s" %
1765                      (disk.iv_name, node))
1766         if not ignore_primary or node != instance.primary_node:
1767           result = False
1768   return result
1769
1770
1771 class LUStartupInstance(LogicalUnit):
1772   """Starts an instance.
1773
1774   """
1775   HPATH = "instance-start"
1776   HTYPE = constants.HTYPE_INSTANCE
1777   _OP_REQP = ["instance_name", "force"]
1778
1779   def BuildHooksEnv(self):
1780     """Build hooks env.
1781
1782     This runs on master, primary and secondary nodes of the instance.
1783
1784     """
1785     env = {
1786       "FORCE": self.op.force,
1787       }
1788     env.update(_BuildInstanceHookEnvByObject(self.instance))
1789     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1790           list(self.instance.secondary_nodes))
1791     return env, nl, nl
1792
1793   def CheckPrereq(self):
1794     """Check prerequisites.
1795
1796     This checks that the instance is in the cluster.
1797
1798     """
1799     instance = self.cfg.GetInstanceInfo(
1800       self.cfg.ExpandInstanceName(self.op.instance_name))
1801     if instance is None:
1802       raise errors.OpPrereqError, ("Instance '%s' not known" %
1803                                    self.op.instance_name)
1804
1805     # check bridges existance
1806     brlist = [nic.bridge for nic in instance.nics]
1807     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1808       raise errors.OpPrereqError, ("one or more target bridges %s does not"
1809                                    " exist on destination node '%s'" %
1810                                    (brlist, instance.primary_node))
1811
1812     self.instance = instance
1813     self.op.instance_name = instance.name
1814
1815   def Exec(self, feedback_fn):
1816     """Start the instance.
1817
1818     """
1819     instance = self.instance
1820     force = self.op.force
1821     extra_args = getattr(self.op, "extra_args", "")
1822
1823     node_current = instance.primary_node
1824
1825     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1826     if not nodeinfo:
1827       raise errors.OpExecError, ("Could not contact node %s for infos" %
1828                                  (node_current))
1829
1830     freememory = nodeinfo[node_current]['memory_free']
1831     memory = instance.memory
1832     if memory > freememory:
1833       raise errors.OpExecError, ("Not enough memory to start instance"
1834                                  " %s on node %s"
1835                                  " needed %s MiB, available %s MiB" %
1836                                  (instance.name, node_current, memory,
1837                                   freememory))
1838
1839     _StartInstanceDisks(self.cfg, instance, force)
1840
1841     if not rpc.call_instance_start(node_current, instance, extra_args):
1842       _ShutdownInstanceDisks(instance, self.cfg)
1843       raise errors.OpExecError, ("Could not start instance")
1844
1845     self.cfg.MarkInstanceUp(instance.name)
1846
1847
1848 class LUShutdownInstance(LogicalUnit):
1849   """Shutdown an instance.
1850
1851   """
1852   HPATH = "instance-stop"
1853   HTYPE = constants.HTYPE_INSTANCE
1854   _OP_REQP = ["instance_name"]
1855
1856   def BuildHooksEnv(self):
1857     """Build hooks env.
1858
1859     This runs on master, primary and secondary nodes of the instance.
1860
1861     """
1862     env = _BuildInstanceHookEnvByObject(self.instance)
1863     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1864           list(self.instance.secondary_nodes))
1865     return env, nl, nl
1866
1867   def CheckPrereq(self):
1868     """Check prerequisites.
1869
1870     This checks that the instance is in the cluster.
1871
1872     """
1873     instance = self.cfg.GetInstanceInfo(
1874       self.cfg.ExpandInstanceName(self.op.instance_name))
1875     if instance is None:
1876       raise errors.OpPrereqError, ("Instance '%s' not known" %
1877                                    self.op.instance_name)
1878     self.instance = instance
1879
1880   def Exec(self, feedback_fn):
1881     """Shutdown the instance.
1882
1883     """
1884     instance = self.instance
1885     node_current = instance.primary_node
1886     if not rpc.call_instance_shutdown(node_current, instance):
1887       logger.Error("could not shutdown instance")
1888
1889     self.cfg.MarkInstanceDown(instance.name)
1890     _ShutdownInstanceDisks(instance, self.cfg)
1891
1892
1893 class LUReinstallInstance(LogicalUnit):
1894   """Reinstall an instance.
1895
1896   """
1897   HPATH = "instance-reinstall"
1898   HTYPE = constants.HTYPE_INSTANCE
1899   _OP_REQP = ["instance_name"]
1900
1901   def BuildHooksEnv(self):
1902     """Build hooks env.
1903
1904     This runs on master, primary and secondary nodes of the instance.
1905
1906     """
1907     env = _BuildInstanceHookEnvByObject(self.instance)
1908     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1909           list(self.instance.secondary_nodes))
1910     return env, nl, nl
1911
1912   def CheckPrereq(self):
1913     """Check prerequisites.
1914
1915     This checks that the instance is in the cluster and is not running.
1916
1917     """
1918     instance = self.cfg.GetInstanceInfo(
1919       self.cfg.ExpandInstanceName(self.op.instance_name))
1920     if instance is None:
1921       raise errors.OpPrereqError, ("Instance '%s' not known" %
1922                                    self.op.instance_name)
1923     if instance.disk_template == constants.DT_DISKLESS:
1924       raise errors.OpPrereqError, ("Instance '%s' has no disks" %
1925                                    self.op.instance_name)
1926     if instance.status != "down":
1927       raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
1928                                    self.op.instance_name)
1929     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1930     if remote_info:
1931       raise errors.OpPrereqError, ("Instance '%s' is running on the node %s" %
1932                                    (self.op.instance_name,
1933                                     instance.primary_node))
1934
1935     self.op.os_type = getattr(self.op, "os_type", None)
1936     if self.op.os_type is not None:
1937       # OS verification
1938       pnode = self.cfg.GetNodeInfo(
1939         self.cfg.ExpandNodeName(instance.primary_node))
1940       if pnode is None:
1941         raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
1942                                      self.op.pnode)
1943       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
1944       if not isinstance(os_obj, objects.OS):
1945         raise errors.OpPrereqError, ("OS '%s' not in supported OS list for"
1946                                      " primary node"  % self.op.os_type)
1947
1948     self.instance = instance
1949
1950   def Exec(self, feedback_fn):
1951     """Reinstall the instance.
1952
1953     """
1954     inst = self.instance
1955
1956     if self.op.os_type is not None:
1957       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
1958       inst.os = self.op.os_type
1959       self.cfg.AddInstance(inst)
1960
1961     _StartInstanceDisks(self.cfg, inst, None)
1962     try:
1963       feedback_fn("Running the instance OS create scripts...")
1964       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
1965         raise errors.OpExecError, ("Could not install OS for instance %s "
1966                                    "on node %s" %
1967                                    (inst.name, inst.primary_node))
1968     finally:
1969       _ShutdownInstanceDisks(inst, self.cfg)
1970
1971
1972 class LURemoveInstance(LogicalUnit):
1973   """Remove an instance.
1974
1975   """
1976   HPATH = "instance-remove"
1977   HTYPE = constants.HTYPE_INSTANCE
1978   _OP_REQP = ["instance_name"]
1979
1980   def BuildHooksEnv(self):
1981     """Build hooks env.
1982
1983     This runs on master, primary and secondary nodes of the instance.
1984
1985     """
1986     env = _BuildInstanceHookEnvByObject(self.instance)
1987     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1988           list(self.instance.secondary_nodes))
1989     return env, nl, nl
1990
1991   def CheckPrereq(self):
1992     """Check prerequisites.
1993
1994     This checks that the instance is in the cluster.
1995
1996     """
1997     instance = self.cfg.GetInstanceInfo(
1998       self.cfg.ExpandInstanceName(self.op.instance_name))
1999     if instance is None:
2000       raise errors.OpPrereqError, ("Instance '%s' not known" %
2001                                    self.op.instance_name)
2002     self.instance = instance
2003
2004   def Exec(self, feedback_fn):
2005     """Remove the instance.
2006
2007     """
2008     instance = self.instance
2009     logger.Info("shutting down instance %s on node %s" %
2010                 (instance.name, instance.primary_node))
2011
2012     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2013       raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
2014                                  (instance.name, instance.primary_node))
2015
2016     logger.Info("removing block devices for instance %s" % instance.name)
2017
2018     _RemoveDisks(instance, self.cfg)
2019
2020     logger.Info("removing instance %s out of cluster config" % instance.name)
2021
2022     self.cfg.RemoveInstance(instance.name)
2023
2024
2025 class LUQueryInstances(NoHooksLU):
2026   """Logical unit for querying instances.
2027
2028   """
2029   _OP_REQP = ["output_fields"]
2030
2031   def CheckPrereq(self):
2032     """Check prerequisites.
2033
2034     This checks that the fields required are valid output fields.
2035
2036     """
2037     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2038     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2039                                "admin_state", "admin_ram",
2040                                "disk_template", "ip", "mac", "bridge"],
2041                        dynamic=self.dynamic_fields,
2042                        selected=self.op.output_fields)
2043
2044   def Exec(self, feedback_fn):
2045     """Computes the list of nodes and their attributes.
2046
2047     """
2048     instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2049     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2050                      in instance_names]
2051
2052     # begin data gathering
2053
2054     nodes = frozenset([inst.primary_node for inst in instance_list])
2055
2056     bad_nodes = []
2057     if self.dynamic_fields.intersection(self.op.output_fields):
2058       live_data = {}
2059       node_data = rpc.call_all_instances_info(nodes)
2060       for name in nodes:
2061         result = node_data[name]
2062         if result:
2063           live_data.update(result)
2064         elif result == False:
2065           bad_nodes.append(name)
2066         # else no instance is alive
2067     else:
2068       live_data = dict([(name, {}) for name in instance_names])
2069
2070     # end data gathering
2071
2072     output = []
2073     for instance in instance_list:
2074       iout = []
2075       for field in self.op.output_fields:
2076         if field == "name":
2077           val = instance.name
2078         elif field == "os":
2079           val = instance.os
2080         elif field == "pnode":
2081           val = instance.primary_node
2082         elif field == "snodes":
2083           val = ",".join(instance.secondary_nodes) or "-"
2084         elif field == "admin_state":
2085           if instance.status == "down":
2086             val = "no"
2087           else:
2088             val = "yes"
2089         elif field == "oper_state":
2090           if instance.primary_node in bad_nodes:
2091             val = "(node down)"
2092           else:
2093             if live_data.get(instance.name):
2094               val = "running"
2095             else:
2096               val = "stopped"
2097         elif field == "admin_ram":
2098           val = instance.memory
2099         elif field == "oper_ram":
2100           if instance.primary_node in bad_nodes:
2101             val = "(node down)"
2102           elif instance.name in live_data:
2103             val = live_data[instance.name].get("memory", "?")
2104           else:
2105             val = "-"
2106         elif field == "disk_template":
2107           val = instance.disk_template
2108         elif field == "ip":
2109           val = instance.nics[0].ip
2110         elif field == "bridge":
2111           val = instance.nics[0].bridge
2112         elif field == "mac":
2113           val = instance.nics[0].mac
2114         else:
2115           raise errors.ParameterError, field
2116         val = str(val)
2117         iout.append(val)
2118       output.append(iout)
2119
2120     return output
2121
2122
2123 class LUFailoverInstance(LogicalUnit):
2124   """Failover an instance.
2125
2126   """
2127   HPATH = "instance-failover"
2128   HTYPE = constants.HTYPE_INSTANCE
2129   _OP_REQP = ["instance_name", "ignore_consistency"]
2130
2131   def BuildHooksEnv(self):
2132     """Build hooks env.
2133
2134     This runs on master, primary and secondary nodes of the instance.
2135
2136     """
2137     env = {
2138       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2139       }
2140     env.update(_BuildInstanceHookEnvByObject(self.instance))
2141     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2142     return env, nl, nl
2143
2144   def CheckPrereq(self):
2145     """Check prerequisites.
2146
2147     This checks that the instance is in the cluster.
2148
2149     """
2150     instance = self.cfg.GetInstanceInfo(
2151       self.cfg.ExpandInstanceName(self.op.instance_name))
2152     if instance is None:
2153       raise errors.OpPrereqError, ("Instance '%s' not known" %
2154                                    self.op.instance_name)
2155
2156     # check memory requirements on the secondary node
2157     target_node = instance.secondary_nodes[0]
2158     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2159     info = nodeinfo.get(target_node, None)
2160     if not info:
2161       raise errors.OpPrereqError, ("Cannot get current information"
2162                                    " from node '%s'" % nodeinfo)
2163     if instance.memory > info['memory_free']:
2164       raise errors.OpPrereqError, ("Not enough memory on target node %s."
2165                                    " %d MB available, %d MB required" %
2166                                    (target_node, info['memory_free'],
2167                                     instance.memory))
2168
2169     # check bridge existance
2170     brlist = [nic.bridge for nic in instance.nics]
2171     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2172       raise errors.OpPrereqError, ("one or more target bridges %s does not"
2173                                    " exist on destination node '%s'" %
2174                                    (brlist, instance.primary_node))
2175
2176     self.instance = instance
2177
2178   def Exec(self, feedback_fn):
2179     """Failover an instance.
2180
2181     The failover is done by shutting it down on its present node and
2182     starting it on the secondary.
2183
2184     """
2185     instance = self.instance
2186
2187     source_node = instance.primary_node
2188     target_node = instance.secondary_nodes[0]
2189
2190     feedback_fn("* checking disk consistency between source and target")
2191     for dev in instance.disks:
2192       # for remote_raid1, these are md over drbd
2193       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2194         if not self.op.ignore_consistency:
2195           raise errors.OpExecError, ("Disk %s is degraded on target node,"
2196                                      " aborting failover." % dev.iv_name)
2197
2198     feedback_fn("* checking target node resource availability")
2199     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2200
2201     if not nodeinfo:
2202       raise errors.OpExecError, ("Could not contact target node %s." %
2203                                  target_node)
2204
2205     free_memory = int(nodeinfo[target_node]['memory_free'])
2206     memory = instance.memory
2207     if memory > free_memory:
2208       raise errors.OpExecError, ("Not enough memory to create instance %s on"
2209                                  " node %s. needed %s MiB, available %s MiB" %
2210                                  (instance.name, target_node, memory,
2211                                   free_memory))
2212
2213     feedback_fn("* shutting down instance on source node")
2214     logger.Info("Shutting down instance %s on node %s" %
2215                 (instance.name, source_node))
2216
2217     if not rpc.call_instance_shutdown(source_node, instance):
2218       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2219                    " anyway. Please make sure node %s is down"  %
2220                    (instance.name, source_node, source_node))
2221
2222     feedback_fn("* deactivating the instance's disks on source node")
2223     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2224       raise errors.OpExecError, ("Can't shut down the instance's disks.")
2225
2226     instance.primary_node = target_node
2227     # distribute new instance config to the other nodes
2228     self.cfg.AddInstance(instance)
2229
2230     feedback_fn("* activating the instance's disks on target node")
2231     logger.Info("Starting instance %s on node %s" %
2232                 (instance.name, target_node))
2233
2234     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2235                                              ignore_secondaries=True)
2236     if not disks_ok:
2237       _ShutdownInstanceDisks(instance, self.cfg)
2238       raise errors.OpExecError, ("Can't activate the instance's disks")
2239
2240     feedback_fn("* starting the instance on the target node")
2241     if not rpc.call_instance_start(target_node, instance, None):
2242       _ShutdownInstanceDisks(instance, self.cfg)
2243       raise errors.OpExecError("Could not start instance %s on node %s." %
2244                                (instance.name, target_node))
2245
2246
2247 def _CreateBlockDevOnPrimary(cfg, node, device):
2248   """Create a tree of block devices on the primary node.
2249
2250   This always creates all devices.
2251
2252   """
2253   if device.children:
2254     for child in device.children:
2255       if not _CreateBlockDevOnPrimary(cfg, node, child):
2256         return False
2257
2258   cfg.SetDiskID(device, node)
2259   new_id = rpc.call_blockdev_create(node, device, device.size, True)
2260   if not new_id:
2261     return False
2262   if device.physical_id is None:
2263     device.physical_id = new_id
2264   return True
2265
2266
2267 def _CreateBlockDevOnSecondary(cfg, node, device, force):
2268   """Create a tree of block devices on a secondary node.
2269
2270   If this device type has to be created on secondaries, create it and
2271   all its children.
2272
2273   If not, just recurse to children keeping the same 'force' value.
2274
2275   """
2276   if device.CreateOnSecondary():
2277     force = True
2278   if device.children:
2279     for child in device.children:
2280       if not _CreateBlockDevOnSecondary(cfg, node, child, force):
2281         return False
2282
2283   if not force:
2284     return True
2285   cfg.SetDiskID(device, node)
2286   new_id = rpc.call_blockdev_create(node, device, device.size, False)
2287   if not new_id:
2288     return False
2289   if device.physical_id is None:
2290     device.physical_id = new_id
2291   return True
2292
2293
2294 def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
2295   """Generate a drbd device complete with its children.
2296
2297   """
2298   port = cfg.AllocatePort()
2299   base = "%s_%s" % (base, port)
2300   dev_data = objects.Disk(dev_type="lvm", size=size,
2301                           logical_id=(vgname, "%s.data" % base))
2302   dev_meta = objects.Disk(dev_type="lvm", size=128,
2303                           logical_id=(vgname, "%s.meta" % base))
2304   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2305                           logical_id = (primary, secondary, port),
2306                           children = [dev_data, dev_meta])
2307   return drbd_dev
2308
2309
2310 def _GenerateDiskTemplate(cfg, vgname, template_name,
2311                           instance_name, primary_node,
2312                           secondary_nodes, disk_sz, swap_sz):
2313   """Generate the entire disk layout for a given template type.
2314
2315   """
2316   #TODO: compute space requirements
2317
2318   if template_name == "diskless":
2319     disks = []
2320   elif template_name == "plain":
2321     if len(secondary_nodes) != 0:
2322       raise errors.ProgrammerError("Wrong template configuration")
2323     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2324                            logical_id=(vgname, "%s.os" % instance_name),
2325                            iv_name = "sda")
2326     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2327                            logical_id=(vgname, "%s.swap" % instance_name),
2328                            iv_name = "sdb")
2329     disks = [sda_dev, sdb_dev]
2330   elif template_name == "local_raid1":
2331     if len(secondary_nodes) != 0:
2332       raise errors.ProgrammerError("Wrong template configuration")
2333     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2334                               logical_id=(vgname, "%s.os_m1" % instance_name))
2335     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2336                               logical_id=(vgname, "%s.os_m2" % instance_name))
2337     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2338                               size=disk_sz,
2339                               children = [sda_dev_m1, sda_dev_m2])
2340     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2341                               logical_id=(vgname, "%s.swap_m1" %
2342                                           instance_name))
2343     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2344                               logical_id=(vgname, "%s.swap_m2" %
2345                                           instance_name))
2346     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2347                               size=swap_sz,
2348                               children = [sdb_dev_m1, sdb_dev_m2])
2349     disks = [md_sda_dev, md_sdb_dev]
2350   elif template_name == "remote_raid1":
2351     if len(secondary_nodes) != 1:
2352       raise errors.ProgrammerError("Wrong template configuration")
2353     remote_node = secondary_nodes[0]
2354     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
2355                                          primary_node, remote_node, disk_sz,
2356                                          "%s-sda" % instance_name)
2357     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2358                               children = [drbd_sda_dev], size=disk_sz)
2359     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
2360                                          primary_node, remote_node, swap_sz,
2361                                          "%s-sdb" % instance_name)
2362     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2363                               children = [drbd_sdb_dev], size=swap_sz)
2364     disks = [md_sda_dev, md_sdb_dev]
2365   else:
2366     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2367   return disks
2368
2369
2370 def _CreateDisks(cfg, instance):
2371   """Create all disks for an instance.
2372
2373   This abstracts away some work from AddInstance.
2374
2375   Args:
2376     instance: the instance object
2377
2378   Returns:
2379     True or False showing the success of the creation process
2380
2381   """
2382   for device in instance.disks:
2383     logger.Info("creating volume %s for instance %s" %
2384               (device.iv_name, instance.name))
2385     #HARDCODE
2386     for secondary_node in instance.secondary_nodes:
2387       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
2388         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2389                      (device.iv_name, device, secondary_node))
2390         return False
2391     #HARDCODE
2392     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
2393       logger.Error("failed to create volume %s on primary!" %
2394                    device.iv_name)
2395       return False
2396   return True
2397
2398
2399 def _RemoveDisks(instance, cfg):
2400   """Remove all disks for an instance.
2401
2402   This abstracts away some work from `AddInstance()` and
2403   `RemoveInstance()`. Note that in case some of the devices couldn't
2404   be remove, the removal will continue with the other ones (compare
2405   with `_CreateDisks()`).
2406
2407   Args:
2408     instance: the instance object
2409
2410   Returns:
2411     True or False showing the success of the removal proces
2412
2413   """
2414   logger.Info("removing block devices for instance %s" % instance.name)
2415
2416   result = True
2417   for device in instance.disks:
2418     for node, disk in device.ComputeNodeTree(instance.primary_node):
2419       cfg.SetDiskID(disk, node)
2420       if not rpc.call_blockdev_remove(node, disk):
2421         logger.Error("could not remove block device %s on node %s,"
2422                      " continuing anyway" %
2423                      (device.iv_name, node))
2424         result = False
2425   return result
2426
2427
2428 class LUCreateInstance(LogicalUnit):
2429   """Create an instance.
2430
2431   """
2432   HPATH = "instance-add"
2433   HTYPE = constants.HTYPE_INSTANCE
2434   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2435               "disk_template", "swap_size", "mode", "start", "vcpus",
2436               "wait_for_sync"]
2437
2438   def BuildHooksEnv(self):
2439     """Build hooks env.
2440
2441     This runs on master, primary and secondary nodes of the instance.
2442
2443     """
2444     env = {
2445       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2446       "INSTANCE_DISK_SIZE": self.op.disk_size,
2447       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2448       "INSTANCE_ADD_MODE": self.op.mode,
2449       }
2450     if self.op.mode == constants.INSTANCE_IMPORT:
2451       env["INSTANCE_SRC_NODE"] = self.op.src_node
2452       env["INSTANCE_SRC_PATH"] = self.op.src_path
2453       env["INSTANCE_SRC_IMAGE"] = self.src_image
2454
2455     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2456       primary_node=self.op.pnode,
2457       secondary_nodes=self.secondaries,
2458       status=self.instance_status,
2459       os=self.op.os_type,
2460       memory=self.op.mem_size,
2461       vcpus=self.op.vcpus,
2462       nics=[(self.inst_ip, self.op.bridge)],
2463     ))
2464
2465     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2466           self.secondaries)
2467     return env, nl, nl
2468
2469
2470   def CheckPrereq(self):
2471     """Check prerequisites.
2472
2473     """
2474     if self.op.mode not in (constants.INSTANCE_CREATE,
2475                             constants.INSTANCE_IMPORT):
2476       raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
2477                                    self.op.mode)
2478
2479     if self.op.mode == constants.INSTANCE_IMPORT:
2480       src_node = getattr(self.op, "src_node", None)
2481       src_path = getattr(self.op, "src_path", None)
2482       if src_node is None or src_path is None:
2483         raise errors.OpPrereqError, ("Importing an instance requires source"
2484                                      " node and path options")
2485       src_node_full = self.cfg.ExpandNodeName(src_node)
2486       if src_node_full is None:
2487         raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
2488       self.op.src_node = src_node = src_node_full
2489
2490       if not os.path.isabs(src_path):
2491         raise errors.OpPrereqError, ("The source path must be absolute")
2492
2493       export_info = rpc.call_export_info(src_node, src_path)
2494
2495       if not export_info:
2496         raise errors.OpPrereqError, ("No export found in dir %s" % src_path)
2497
2498       if not export_info.has_section(constants.INISECT_EXP):
2499         raise errors.ProgrammerError, ("Corrupted export config")
2500
2501       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2502       if (int(ei_version) != constants.EXPORT_VERSION):
2503         raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
2504                                      (ei_version, constants.EXPORT_VERSION))
2505
2506       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2507         raise errors.OpPrereqError, ("Can't import instance with more than"
2508                                      " one data disk")
2509
2510       # FIXME: are the old os-es, disk sizes, etc. useful?
2511       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2512       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2513                                                          'disk0_dump'))
2514       self.src_image = diskimage
2515     else: # INSTANCE_CREATE
2516       if getattr(self.op, "os_type", None) is None:
2517         raise errors.OpPrereqError, ("No guest OS specified")
2518
2519     # check primary node
2520     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2521     if pnode is None:
2522       raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
2523                                    self.op.pnode)
2524     self.op.pnode = pnode.name
2525     self.pnode = pnode
2526     self.secondaries = []
2527     # disk template and mirror node verification
2528     if self.op.disk_template not in constants.DISK_TEMPLATES:
2529       raise errors.OpPrereqError, ("Invalid disk template name")
2530
2531     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2532       if getattr(self.op, "snode", None) is None:
2533         raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
2534                                      " a mirror node")
2535
2536       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2537       if snode_name is None:
2538         raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
2539                                      self.op.snode)
2540       elif snode_name == pnode.name:
2541         raise errors.OpPrereqError, ("The secondary node cannot be"
2542                                      " the primary node.")
2543       self.secondaries.append(snode_name)
2544
2545     # Check lv size requirements
2546     nodenames = [pnode.name] + self.secondaries
2547     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2548
2549     # Required free disk space as a function of disk and swap space
2550     req_size_dict = {
2551       constants.DT_DISKLESS: 0,
2552       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2553       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2554       # 256 MB are added for drbd metadata, 128MB for each drbd device
2555       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2556     }
2557
2558     if self.op.disk_template not in req_size_dict:
2559       raise errors.ProgrammerError, ("Disk template '%s' size requirement"
2560                                      " is unknown" %  self.op.disk_template)
2561
2562     req_size = req_size_dict[self.op.disk_template]
2563
2564     for node in nodenames:
2565       info = nodeinfo.get(node, None)
2566       if not info:
2567         raise errors.OpPrereqError, ("Cannot get current information"
2568                                      " from node '%s'" % nodeinfo)
2569       if req_size > info['vg_free']:
2570         raise errors.OpPrereqError, ("Not enough disk space on target node %s."
2571                                      " %d MB available, %d MB required" %
2572                                      (node, info['vg_free'], req_size))
2573
2574     # os verification
2575     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2576     if not isinstance(os_obj, objects.OS):
2577       raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
2578                                    " primary node"  % self.op.os_type)
2579
2580     # instance verification
2581     hostname1 = utils.LookupHostname(self.op.instance_name)
2582     if not hostname1:
2583       raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
2584                                    self.op.instance_name)
2585
2586     self.op.instance_name = instance_name = hostname1['hostname']
2587     instance_list = self.cfg.GetInstanceList()
2588     if instance_name in instance_list:
2589       raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
2590                                    instance_name)
2591
2592     ip = getattr(self.op, "ip", None)
2593     if ip is None or ip.lower() == "none":
2594       inst_ip = None
2595     elif ip.lower() == "auto":
2596       inst_ip = hostname1['ip']
2597     else:
2598       if not utils.IsValidIP(ip):
2599         raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
2600                                      " like a valid IP" % ip)
2601       inst_ip = ip
2602     self.inst_ip = inst_ip
2603
2604     command = ["fping", "-q", hostname1['ip']]
2605     result = utils.RunCmd(command)
2606     if not result.failed:
2607       raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2608                                    (hostname1['ip'], instance_name))
2609
2610     # bridge verification
2611     bridge = getattr(self.op, "bridge", None)
2612     if bridge is None:
2613       self.op.bridge = self.cfg.GetDefBridge()
2614     else:
2615       self.op.bridge = bridge
2616
2617     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2618       raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2619                                    " destination node '%s'" %
2620                                    (self.op.bridge, pnode.name))
2621
2622     if self.op.start:
2623       self.instance_status = 'up'
2624     else:
2625       self.instance_status = 'down'
2626
2627   def Exec(self, feedback_fn):
2628     """Create and add the instance to the cluster.
2629
2630     """
2631     instance = self.op.instance_name
2632     pnode_name = self.pnode.name
2633
2634     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2635     if self.inst_ip is not None:
2636       nic.ip = self.inst_ip
2637
2638     disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
2639                                   self.op.disk_template,
2640                                   instance, pnode_name,
2641                                   self.secondaries, self.op.disk_size,
2642                                   self.op.swap_size)
2643
2644     iobj = objects.Instance(name=instance, os=self.op.os_type,
2645                             primary_node=pnode_name,
2646                             memory=self.op.mem_size,
2647                             vcpus=self.op.vcpus,
2648                             nics=[nic], disks=disks,
2649                             disk_template=self.op.disk_template,
2650                             status=self.instance_status,
2651                             )
2652
2653     feedback_fn("* creating instance disks...")
2654     if not _CreateDisks(self.cfg, iobj):
2655       _RemoveDisks(iobj, self.cfg)
2656       raise errors.OpExecError, ("Device creation failed, reverting...")
2657
2658     feedback_fn("adding instance %s to cluster config" % instance)
2659
2660     self.cfg.AddInstance(iobj)
2661
2662     if self.op.wait_for_sync:
2663       disk_abort = not _WaitForSync(self.cfg, iobj)
2664     elif iobj.disk_template == "remote_raid1":
2665       # make sure the disks are not degraded (still sync-ing is ok)
2666       time.sleep(15)
2667       feedback_fn("* checking mirrors status")
2668       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2669     else:
2670       disk_abort = False
2671
2672     if disk_abort:
2673       _RemoveDisks(iobj, self.cfg)
2674       self.cfg.RemoveInstance(iobj.name)
2675       raise errors.OpExecError, ("There are some degraded disks for"
2676                                       " this instance")
2677
2678     feedback_fn("creating os for instance %s on node %s" %
2679                 (instance, pnode_name))
2680
2681     if iobj.disk_template != constants.DT_DISKLESS:
2682       if self.op.mode == constants.INSTANCE_CREATE:
2683         feedback_fn("* running the instance OS create scripts...")
2684         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2685           raise errors.OpExecError, ("could not add os for instance %s"
2686                                           " on node %s" %
2687                                           (instance, pnode_name))
2688
2689       elif self.op.mode == constants.INSTANCE_IMPORT:
2690         feedback_fn("* running the instance OS import scripts...")
2691         src_node = self.op.src_node
2692         src_image = self.src_image
2693         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2694                                                 src_node, src_image):
2695           raise errors.OpExecError, ("Could not import os for instance"
2696                                           " %s on node %s" %
2697                                           (instance, pnode_name))
2698       else:
2699         # also checked in the prereq part
2700         raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
2701                                        % self.op.mode)
2702
2703     if self.op.start:
2704       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2705       feedback_fn("* starting instance...")
2706       if not rpc.call_instance_start(pnode_name, iobj, None):
2707         raise errors.OpExecError, ("Could not start instance")
2708
2709
2710 class LUConnectConsole(NoHooksLU):
2711   """Connect to an instance's console.
2712
2713   This is somewhat special in that it returns the command line that
2714   you need to run on the master node in order to connect to the
2715   console.
2716
2717   """
2718   _OP_REQP = ["instance_name"]
2719
2720   def CheckPrereq(self):
2721     """Check prerequisites.
2722
2723     This checks that the instance is in the cluster.
2724
2725     """
2726     instance = self.cfg.GetInstanceInfo(
2727       self.cfg.ExpandInstanceName(self.op.instance_name))
2728     if instance is None:
2729       raise errors.OpPrereqError, ("Instance '%s' not known" %
2730                                    self.op.instance_name)
2731     self.instance = instance
2732
2733   def Exec(self, feedback_fn):
2734     """Connect to the console of an instance
2735
2736     """
2737     instance = self.instance
2738     node = instance.primary_node
2739
2740     node_insts = rpc.call_instance_list([node])[node]
2741     if node_insts is False:
2742       raise errors.OpExecError, ("Can't connect to node %s." % node)
2743
2744     if instance.name not in node_insts:
2745       raise errors.OpExecError, ("Instance %s is not running." % instance.name)
2746
2747     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2748
2749     hyper = hypervisor.GetHypervisor()
2750     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2751     return node, console_cmd
2752
2753
2754 class LUAddMDDRBDComponent(LogicalUnit):
2755   """Adda new mirror member to an instance's disk.
2756
2757   """
2758   HPATH = "mirror-add"
2759   HTYPE = constants.HTYPE_INSTANCE
2760   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2761
2762   def BuildHooksEnv(self):
2763     """Build hooks env.
2764
2765     This runs on the master, the primary and all the secondaries.
2766
2767     """
2768     env = {
2769       "NEW_SECONDARY": self.op.remote_node,
2770       "DISK_NAME": self.op.disk_name,
2771       }
2772     env.update(_BuildInstanceHookEnvByObject(self.instance))
2773     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2774           self.op.remote_node,] + list(self.instance.secondary_nodes)
2775     return env, nl, nl
2776
2777   def CheckPrereq(self):
2778     """Check prerequisites.
2779
2780     This checks that the instance is in the cluster.
2781
2782     """
2783     instance = self.cfg.GetInstanceInfo(
2784       self.cfg.ExpandInstanceName(self.op.instance_name))
2785     if instance is None:
2786       raise errors.OpPrereqError, ("Instance '%s' not known" %
2787                                    self.op.instance_name)
2788     self.instance = instance
2789
2790     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2791     if remote_node is None:
2792       raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
2793     self.remote_node = remote_node
2794
2795     if remote_node == instance.primary_node:
2796       raise errors.OpPrereqError, ("The specified node is the primary node of"
2797                                    " the instance.")
2798
2799     if instance.disk_template != constants.DT_REMOTE_RAID1:
2800       raise errors.OpPrereqError, ("Instance's disk layout is not"
2801                                    " remote_raid1.")
2802     for disk in instance.disks:
2803       if disk.iv_name == self.op.disk_name:
2804         break
2805     else:
2806       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2807                                    " instance." % self.op.disk_name)
2808     if len(disk.children) > 1:
2809       raise errors.OpPrereqError, ("The device already has two slave"
2810                                    " devices.\n"
2811                                    "This would create a 3-disk raid1"
2812                                    " which we don't allow.")
2813     self.disk = disk
2814
2815   def Exec(self, feedback_fn):
2816     """Add the mirror component
2817
2818     """
2819     disk = self.disk
2820     instance = self.instance
2821
2822     remote_node = self.remote_node
2823     new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
2824                                      instance.primary_node, remote_node,
2825                                      disk.size, "%s-%s" %
2826                                      (instance.name, self.op.disk_name))
2827
2828     logger.Info("adding new mirror component on secondary")
2829     #HARDCODE
2830     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
2831       raise errors.OpExecError, ("Failed to create new component on secondary"
2832                                  " node %s" % remote_node)
2833
2834     logger.Info("adding new mirror component on primary")
2835     #HARDCODE
2836     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
2837       # remove secondary dev
2838       self.cfg.SetDiskID(new_drbd, remote_node)
2839       rpc.call_blockdev_remove(remote_node, new_drbd)
2840       raise errors.OpExecError, ("Failed to create volume on primary")
2841
2842     # the device exists now
2843     # call the primary node to add the mirror to md
2844     logger.Info("adding new mirror component to md")
2845     if not rpc.call_blockdev_addchild(instance.primary_node,
2846                                            disk, new_drbd):
2847       logger.Error("Can't add mirror compoment to md!")
2848       self.cfg.SetDiskID(new_drbd, remote_node)
2849       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2850         logger.Error("Can't rollback on secondary")
2851       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2852       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2853         logger.Error("Can't rollback on primary")
2854       raise errors.OpExecError, "Can't add mirror component to md array"
2855
2856     disk.children.append(new_drbd)
2857
2858     self.cfg.AddInstance(instance)
2859
2860     _WaitForSync(self.cfg, instance)
2861
2862     return 0
2863
2864
2865 class LURemoveMDDRBDComponent(LogicalUnit):
2866   """Remove a component from a remote_raid1 disk.
2867
2868   """
2869   HPATH = "mirror-remove"
2870   HTYPE = constants.HTYPE_INSTANCE
2871   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2872
2873   def BuildHooksEnv(self):
2874     """Build hooks env.
2875
2876     This runs on the master, the primary and all the secondaries.
2877
2878     """
2879     env = {
2880       "DISK_NAME": self.op.disk_name,
2881       "DISK_ID": self.op.disk_id,
2882       "OLD_SECONDARY": self.old_secondary,
2883       }
2884     env.update(_BuildInstanceHookEnvByObject(self.instance))
2885     nl = [self.sstore.GetMasterNode(),
2886           self.instance.primary_node] + list(self.instance.secondary_nodes)
2887     return env, nl, nl
2888
2889   def CheckPrereq(self):
2890     """Check prerequisites.
2891
2892     This checks that the instance is in the cluster.
2893
2894     """
2895     instance = self.cfg.GetInstanceInfo(
2896       self.cfg.ExpandInstanceName(self.op.instance_name))
2897     if instance is None:
2898       raise errors.OpPrereqError, ("Instance '%s' not known" %
2899                                    self.op.instance_name)
2900     self.instance = instance
2901
2902     if instance.disk_template != constants.DT_REMOTE_RAID1:
2903       raise errors.OpPrereqError, ("Instance's disk layout is not"
2904                                    " remote_raid1.")
2905     for disk in instance.disks:
2906       if disk.iv_name == self.op.disk_name:
2907         break
2908     else:
2909       raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2910                                    " instance." % self.op.disk_name)
2911     for child in disk.children:
2912       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2913         break
2914     else:
2915       raise errors.OpPrereqError, ("Can't find the device with this port.")
2916
2917     if len(disk.children) < 2:
2918       raise errors.OpPrereqError, ("Cannot remove the last component from"
2919                                    " a mirror.")
2920     self.disk = disk
2921     self.child = child
2922     if self.child.logical_id[0] == instance.primary_node:
2923       oid = 1
2924     else:
2925       oid = 0
2926     self.old_secondary = self.child.logical_id[oid]
2927
2928   def Exec(self, feedback_fn):
2929     """Remove the mirror component
2930
2931     """
2932     instance = self.instance
2933     disk = self.disk
2934     child = self.child
2935     logger.Info("remove mirror component")
2936     self.cfg.SetDiskID(disk, instance.primary_node)
2937     if not rpc.call_blockdev_removechild(instance.primary_node,
2938                                               disk, child):
2939       raise errors.OpExecError, ("Can't remove child from mirror.")
2940
2941     for node in child.logical_id[:2]:
2942       self.cfg.SetDiskID(child, node)
2943       if not rpc.call_blockdev_remove(node, child):
2944         logger.Error("Warning: failed to remove device from node %s,"
2945                      " continuing operation." % node)
2946
2947     disk.children.remove(child)
2948     self.cfg.AddInstance(instance)
2949
2950
2951 class LUReplaceDisks(LogicalUnit):
2952   """Replace the disks of an instance.
2953
2954   """
2955   HPATH = "mirrors-replace"
2956   HTYPE = constants.HTYPE_INSTANCE
2957   _OP_REQP = ["instance_name"]
2958
2959   def BuildHooksEnv(self):
2960     """Build hooks env.
2961
2962     This runs on the master, the primary and all the secondaries.
2963
2964     """
2965     env = {
2966       "NEW_SECONDARY": self.op.remote_node,
2967       "OLD_SECONDARY": self.instance.secondary_nodes[0],
2968       }
2969     env.update(_BuildInstanceHookEnvByObject(self.instance))
2970     nl = [self.sstore.GetMasterNode(),
2971           self.instance.primary_node] + list(self.instance.secondary_nodes)
2972     return env, nl, nl
2973
2974   def CheckPrereq(self):
2975     """Check prerequisites.
2976
2977     This checks that the instance is in the cluster.
2978
2979     """
2980     instance = self.cfg.GetInstanceInfo(
2981       self.cfg.ExpandInstanceName(self.op.instance_name))
2982     if instance is None:
2983       raise errors.OpPrereqError, ("Instance '%s' not known" %
2984                                    self.op.instance_name)
2985     self.instance = instance
2986
2987     if instance.disk_template != constants.DT_REMOTE_RAID1:
2988       raise errors.OpPrereqError, ("Instance's disk layout is not"
2989                                    " remote_raid1.")
2990
2991     if len(instance.secondary_nodes) != 1:
2992       raise errors.OpPrereqError, ("The instance has a strange layout,"
2993                                    " expected one secondary but found %d" %
2994                                    len(instance.secondary_nodes))
2995
2996     remote_node = getattr(self.op, "remote_node", None)
2997     if remote_node is None:
2998       remote_node = instance.secondary_nodes[0]
2999     else:
3000       remote_node = self.cfg.ExpandNodeName(remote_node)
3001       if remote_node is None:
3002         raise errors.OpPrereqError, ("Node '%s' not known" %
3003                                      self.op.remote_node)
3004     if remote_node == instance.primary_node:
3005       raise errors.OpPrereqError, ("The specified node is the primary node of"
3006                                    " the instance.")
3007     self.op.remote_node = remote_node
3008
3009   def Exec(self, feedback_fn):
3010     """Replace the disks of an instance.
3011
3012     """
3013     instance = self.instance
3014     iv_names = {}
3015     # start of work
3016     remote_node = self.op.remote_node
3017     cfg = self.cfg
3018     vgname = cfg.GetVGName()
3019     for dev in instance.disks:
3020       size = dev.size
3021       new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
3022                                        remote_node, size,
3023                                        "%s-%s" % (instance.name, dev.iv_name))
3024       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3025       logger.Info("adding new mirror component on secondary for %s" %
3026                   dev.iv_name)
3027       #HARDCODE
3028       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
3029         raise errors.OpExecError, ("Failed to create new component on"
3030                                    " secondary node %s\n"
3031                                    "Full abort, cleanup manually!" %
3032                                    remote_node)
3033
3034       logger.Info("adding new mirror component on primary")
3035       #HARDCODE
3036       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
3037         # remove secondary dev
3038         cfg.SetDiskID(new_drbd, remote_node)
3039         rpc.call_blockdev_remove(remote_node, new_drbd)
3040         raise errors.OpExecError("Failed to create volume on primary!\n"
3041                                  "Full abort, cleanup manually!!")
3042
3043       # the device exists now
3044       # call the primary node to add the mirror to md
3045       logger.Info("adding new mirror component to md")
3046       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3047                                         new_drbd):
3048         logger.Error("Can't add mirror compoment to md!")
3049         cfg.SetDiskID(new_drbd, remote_node)
3050         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3051           logger.Error("Can't rollback on secondary")
3052         cfg.SetDiskID(new_drbd, instance.primary_node)
3053         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3054           logger.Error("Can't rollback on primary")
3055         raise errors.OpExecError, ("Full abort, cleanup manually!!")
3056
3057       dev.children.append(new_drbd)
3058       cfg.AddInstance(instance)
3059
3060     # this can fail as the old devices are degraded and _WaitForSync
3061     # does a combined result over all disks, so we don't check its
3062     # return value
3063     _WaitForSync(cfg, instance, unlock=True)
3064
3065     # so check manually all the devices
3066     for name in iv_names:
3067       dev, child, new_drbd = iv_names[name]
3068       cfg.SetDiskID(dev, instance.primary_node)
3069       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3070       if is_degr:
3071         raise errors.OpExecError, ("MD device %s is degraded!" % name)
3072       cfg.SetDiskID(new_drbd, instance.primary_node)
3073       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3074       if is_degr:
3075         raise errors.OpExecError, ("New drbd device %s is degraded!" % name)
3076
3077     for name in iv_names:
3078       dev, child, new_drbd = iv_names[name]
3079       logger.Info("remove mirror %s component" % name)
3080       cfg.SetDiskID(dev, instance.primary_node)
3081       if not rpc.call_blockdev_removechild(instance.primary_node,
3082                                                 dev, child):
3083         logger.Error("Can't remove child from mirror, aborting"
3084                      " *this device cleanup*.\nYou need to cleanup manually!!")
3085         continue
3086
3087       for node in child.logical_id[:2]:
3088         logger.Info("remove child device on %s" % node)
3089         cfg.SetDiskID(child, node)
3090         if not rpc.call_blockdev_remove(node, child):
3091           logger.Error("Warning: failed to remove device from node %s,"
3092                        " continuing operation." % node)
3093
3094       dev.children.remove(child)
3095
3096       cfg.AddInstance(instance)
3097
3098
3099 class LUQueryInstanceData(NoHooksLU):
3100   """Query runtime instance data.
3101
3102   """
3103   _OP_REQP = ["instances"]
3104
3105   def CheckPrereq(self):
3106     """Check prerequisites.
3107
3108     This only checks the optional instance list against the existing names.
3109
3110     """
3111     if not isinstance(self.op.instances, list):
3112       raise errors.OpPrereqError, "Invalid argument type 'instances'"
3113     if self.op.instances:
3114       self.wanted_instances = []
3115       names = self.op.instances
3116       for name in names:
3117         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3118         if instance is None:
3119           raise errors.OpPrereqError, ("No such instance name '%s'" % name)
3120       self.wanted_instances.append(instance)
3121     else:
3122       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3123                                in self.cfg.GetInstanceList()]
3124     return
3125
3126
3127   def _ComputeDiskStatus(self, instance, snode, dev):
3128     """Compute block device status.
3129
3130     """
3131     self.cfg.SetDiskID(dev, instance.primary_node)
3132     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3133     if dev.dev_type == "drbd":
3134       # we change the snode then (otherwise we use the one passed in)
3135       if dev.logical_id[0] == instance.primary_node:
3136         snode = dev.logical_id[1]
3137       else:
3138         snode = dev.logical_id[0]
3139
3140     if snode:
3141       self.cfg.SetDiskID(dev, snode)
3142       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3143     else:
3144       dev_sstatus = None
3145
3146     if dev.children:
3147       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3148                       for child in dev.children]
3149     else:
3150       dev_children = []
3151
3152     data = {
3153       "iv_name": dev.iv_name,
3154       "dev_type": dev.dev_type,
3155       "logical_id": dev.logical_id,
3156       "physical_id": dev.physical_id,
3157       "pstatus": dev_pstatus,
3158       "sstatus": dev_sstatus,
3159       "children": dev_children,
3160       }
3161
3162     return data
3163
3164   def Exec(self, feedback_fn):
3165     """Gather and return data"""
3166     result = {}
3167     for instance in self.wanted_instances:
3168       remote_info = rpc.call_instance_info(instance.primary_node,
3169                                                 instance.name)
3170       if remote_info and "state" in remote_info:
3171         remote_state = "up"
3172       else:
3173         remote_state = "down"
3174       if instance.status == "down":
3175         config_state = "down"
3176       else:
3177         config_state = "up"
3178
3179       disks = [self._ComputeDiskStatus(instance, None, device)
3180                for device in instance.disks]
3181
3182       idict = {
3183         "name": instance.name,
3184         "config_state": config_state,
3185         "run_state": remote_state,
3186         "pnode": instance.primary_node,
3187         "snodes": instance.secondary_nodes,
3188         "os": instance.os,
3189         "memory": instance.memory,
3190         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3191         "disks": disks,
3192         }
3193
3194       result[instance.name] = idict
3195
3196     return result
3197
3198
3199 class LUQueryNodeData(NoHooksLU):
3200   """Logical unit for querying node data.
3201
3202   """
3203   _OP_REQP = ["nodes"]
3204
3205   def CheckPrereq(self):
3206     """Check prerequisites.
3207
3208     This only checks the optional node list against the existing names.
3209
3210     """
3211     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3212
3213   def Exec(self, feedback_fn):
3214     """Compute and return the list of nodes.
3215
3216     """
3217     ilist = [self.cfg.GetInstanceInfo(iname) for iname
3218              in self.cfg.GetInstanceList()]
3219     result = []
3220     for node in self.wanted_nodes:
3221       result.append((node.name, node.primary_ip, node.secondary_ip,
3222                      [inst.name for inst in ilist
3223                       if inst.primary_node == node.name],
3224                      [inst.name for inst in ilist
3225                       if node.name in inst.secondary_nodes],
3226                      ))
3227     return result
3228
3229
3230 class LUSetInstanceParms(LogicalUnit):
3231   """Modifies an instances's parameters.
3232
3233   """
3234   HPATH = "instance-modify"
3235   HTYPE = constants.HTYPE_INSTANCE
3236   _OP_REQP = ["instance_name"]
3237
3238   def BuildHooksEnv(self):
3239     """Build hooks env.
3240
3241     This runs on the master, primary and secondaries.
3242
3243     """
3244     args = dict()
3245     if self.mem:
3246       args['memory'] = self.mem
3247     if self.vcpus:
3248       args['vcpus'] = self.vcpus
3249     if self.do_ip or self.do_bridge:
3250       if self.do_ip:
3251         ip = self.ip
3252       else:
3253         ip = self.instance.nics[0].ip
3254       if self.bridge:
3255         bridge = self.bridge
3256       else:
3257         bridge = self.instance.nics[0].bridge
3258       args['nics'] = [(ip, bridge)]
3259     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3260     nl = [self.sstore.GetMasterNode(),
3261           self.instance.primary_node] + list(self.instance.secondary_nodes)
3262     return env, nl, nl
3263
3264   def CheckPrereq(self):
3265     """Check prerequisites.
3266
3267     This only checks the instance list against the existing names.
3268
3269     """
3270     self.mem = getattr(self.op, "mem", None)
3271     self.vcpus = getattr(self.op, "vcpus", None)
3272     self.ip = getattr(self.op, "ip", None)
3273     self.bridge = getattr(self.op, "bridge", None)
3274     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3275       raise errors.OpPrereqError, ("No changes submitted")
3276     if self.mem is not None:
3277       try:
3278         self.mem = int(self.mem)
3279       except ValueError, err:
3280         raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
3281     if self.vcpus is not None:
3282       try:
3283         self.vcpus = int(self.vcpus)
3284       except ValueError, err:
3285         raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
3286     if self.ip is not None:
3287       self.do_ip = True
3288       if self.ip.lower() == "none":
3289         self.ip = None
3290       else:
3291         if not utils.IsValidIP(self.ip):
3292           raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
3293     else:
3294       self.do_ip = False
3295
3296     instance = self.cfg.GetInstanceInfo(
3297       self.cfg.ExpandInstanceName(self.op.instance_name))
3298     if instance is None:
3299       raise errors.OpPrereqError, ("No such instance name '%s'" %
3300                                    self.op.instance_name)
3301     self.op.instance_name = instance.name
3302     self.instance = instance
3303     return
3304
3305   def Exec(self, feedback_fn):
3306     """Modifies an instance.
3307
3308     All parameters take effect only at the next restart of the instance.
3309     """
3310     result = []
3311     instance = self.instance
3312     if self.mem:
3313       instance.memory = self.mem
3314       result.append(("mem", self.mem))
3315     if self.vcpus:
3316       instance.vcpus = self.vcpus
3317       result.append(("vcpus",  self.vcpus))
3318     if self.do_ip:
3319       instance.nics[0].ip = self.ip
3320       result.append(("ip", self.ip))
3321     if self.bridge:
3322       instance.nics[0].bridge = self.bridge
3323       result.append(("bridge", self.bridge))
3324
3325     self.cfg.AddInstance(instance)
3326
3327     return result
3328
3329
3330 class LUQueryExports(NoHooksLU):
3331   """Query the exports list
3332
3333   """
3334   _OP_REQP = []
3335
3336   def CheckPrereq(self):
3337     """Check that the nodelist contains only existing nodes.
3338
3339     """
3340     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3341
3342   def Exec(self, feedback_fn):
3343     """Compute the list of all the exported system images.
3344
3345     Returns:
3346       a dictionary with the structure node->(export-list)
3347       where export-list is a list of the instances exported on
3348       that node.
3349
3350     """
3351     return rpc.call_export_list([node.name for node in self.nodes])
3352
3353
3354 class LUExportInstance(LogicalUnit):
3355   """Export an instance to an image in the cluster.
3356
3357   """
3358   HPATH = "instance-export"
3359   HTYPE = constants.HTYPE_INSTANCE
3360   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3361
3362   def BuildHooksEnv(self):
3363     """Build hooks env.
3364
3365     This will run on the master, primary node and target node.
3366
3367     """
3368     env = {
3369       "EXPORT_NODE": self.op.target_node,
3370       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3371       }
3372     env.update(_BuildInstanceHookEnvByObject(self.instance))
3373     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3374           self.op.target_node]
3375     return env, nl, nl
3376
3377   def CheckPrereq(self):
3378     """Check prerequisites.
3379
3380     This checks that the instance name is a valid one.
3381
3382     """
3383     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3384     self.instance = self.cfg.GetInstanceInfo(instance_name)
3385     if self.instance is None:
3386       raise errors.OpPrereqError, ("Instance '%s' not found" %
3387                                    self.op.instance_name)
3388
3389     # node verification
3390     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3391     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3392
3393     if self.dst_node is None:
3394       raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
3395                                    self.op.target_node)
3396     self.op.target_node = self.dst_node.name
3397
3398   def Exec(self, feedback_fn):
3399     """Export an instance to an image in the cluster.
3400
3401     """
3402     instance = self.instance
3403     dst_node = self.dst_node
3404     src_node = instance.primary_node
3405     # shutdown the instance, unless requested not to do so
3406     if self.op.shutdown:
3407       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3408       self.processor.ChainOpCode(op, feedback_fn)
3409
3410     vgname = self.cfg.GetVGName()
3411
3412     snap_disks = []
3413
3414     try:
3415       for disk in instance.disks:
3416         if disk.iv_name == "sda":
3417           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3418           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3419
3420           if not new_dev_name:
3421             logger.Error("could not snapshot block device %s on node %s" %
3422                          (disk.logical_id[1], src_node))
3423           else:
3424             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3425                                       logical_id=(vgname, new_dev_name),
3426                                       physical_id=(vgname, new_dev_name),
3427                                       iv_name=disk.iv_name)
3428             snap_disks.append(new_dev)
3429
3430     finally:
3431       if self.op.shutdown:
3432         op = opcodes.OpStartupInstance(instance_name=instance.name,
3433                                        force=False)
3434         self.processor.ChainOpCode(op, feedback_fn)
3435
3436     # TODO: check for size
3437
3438     for dev in snap_disks:
3439       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3440                                            instance):
3441         logger.Error("could not export block device %s from node"
3442                      " %s to node %s" %
3443                      (dev.logical_id[1], src_node, dst_node.name))
3444       if not rpc.call_blockdev_remove(src_node, dev):
3445         logger.Error("could not remove snapshot block device %s from"
3446                      " node %s" % (dev.logical_id[1], src_node))
3447
3448     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3449       logger.Error("could not finalize export for instance %s on node %s" %
3450                    (instance.name, dst_node.name))
3451
3452     nodelist = self.cfg.GetNodeList()
3453     nodelist.remove(dst_node.name)
3454
3455     # on one-node clusters nodelist will be empty after the removal
3456     # if we proceed the backup would be removed because OpQueryExports
3457     # substitutes an empty list with the full cluster node list.
3458     if nodelist:
3459       op = opcodes.OpQueryExports(nodes=nodelist)
3460       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3461       for node in exportlist:
3462         if instance.name in exportlist[node]:
3463           if not rpc.call_export_remove(node, instance.name):
3464             logger.Error("could not remove older export for instance %s"
3465                          " on node %s" % (instance.name, node))