Some small fixes.
[ganeti-local] / lib / cmdlib.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != socket.gethostname():
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded nodes.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if nodes is not None and not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted_nodes = []
179
180     for name in nodes:
181       node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184     wanted_nodes.append(node)
185
186     return wanted_nodes
187   else:
188     return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189
190
191 def _CheckOutputFields(static, dynamic, selected):
192   """Checks whether all selected fields are valid.
193
194   Args:
195     static: Static fields
196     dynamic: Dynamic fields
197
198   """
199   static_fields = frozenset(static)
200   dynamic_fields = frozenset(dynamic)
201
202   all_fields = static_fields | dynamic_fields
203
204   if not all_fields.issuperset(selected):
205     raise errors.OpPrereqError("Unknown output fields selected: %s"
206                                % ",".join(frozenset(selected).
207                                           difference(all_fields)))
208
209
210 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
211                           memory, vcpus, nics):
212   """Builds instance related env variables for hooks from single variables.
213
214   Args:
215     secondary_nodes: List of secondary nodes as strings
216   """
217   env = {
218     "INSTANCE_NAME": name,
219     "INSTANCE_PRIMARY": primary_node,
220     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
221     "INSTANCE_OS_TYPE": os_type,
222     "INSTANCE_STATUS": status,
223     "INSTANCE_MEMORY": memory,
224     "INSTANCE_VCPUS": vcpus,
225   }
226
227   if nics:
228     nic_count = len(nics)
229     for idx, (ip, bridge) in enumerate(nics):
230       if ip is None:
231         ip = ""
232       env["INSTANCE_NIC%d_IP" % idx] = ip
233       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
234   else:
235     nic_count = 0
236
237   env["INSTANCE_NIC_COUNT"] = nic_count
238
239   return env
240
241
242 def _BuildInstanceHookEnvByObject(instance, override=None):
243   """Builds instance related env variables for hooks from an object.
244
245   Args:
246     instance: objects.Instance object of instance
247     override: dict of values to override
248   """
249   args = {
250     'name': instance.name,
251     'primary_node': instance.primary_node,
252     'secondary_nodes': instance.secondary_nodes,
253     'os_type': instance.os,
254     'status': instance.os,
255     'memory': instance.memory,
256     'vcpus': instance.vcpus,
257     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
258   }
259   if override:
260     args.update(override)
261   return _BuildInstanceHookEnv(**args)
262
263
264 def _UpdateEtcHosts(fullnode, ip):
265   """Ensure a node has a correct entry in /etc/hosts.
266
267   Args:
268     fullnode - Fully qualified domain name of host. (str)
269     ip       - IPv4 address of host (str)
270
271   """
272   node = fullnode.split(".", 1)[0]
273
274   f = open('/etc/hosts', 'r+')
275
276   inthere = False
277
278   save_lines = []
279   add_lines = []
280   removed = False
281
282   while True:
283     rawline = f.readline()
284
285     if not rawline:
286       # End of file
287       break
288
289     line = rawline.split('\n')[0]
290
291     # Strip off comments
292     line = line.split('#')[0]
293
294     if not line:
295       # Entire line was comment, skip
296       save_lines.append(rawline)
297       continue
298
299     fields = line.split()
300
301     haveall = True
302     havesome = False
303     for spec in [ ip, fullnode, node ]:
304       if spec not in fields:
305         haveall = False
306       if spec in fields:
307         havesome = True
308
309     if haveall:
310       inthere = True
311       save_lines.append(rawline)
312       continue
313
314     if havesome and not haveall:
315       # Line (old, or manual?) which is missing some.  Remove.
316       removed = True
317       continue
318
319     save_lines.append(rawline)
320
321   if not inthere:
322     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
323
324   if removed:
325     if add_lines:
326       save_lines = save_lines + add_lines
327
328     # We removed a line, write a new file and replace old.
329     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
330     newfile = os.fdopen(fd, 'w')
331     newfile.write(''.join(save_lines))
332     newfile.close()
333     os.rename(tmpname, '/etc/hosts')
334
335   elif add_lines:
336     # Simply appending a new line will do the trick.
337     f.seek(0, 2)
338     for add in add_lines:
339       f.write(add)
340
341   f.close()
342
343
344 def _UpdateKnownHosts(fullnode, ip, pubkey):
345   """Ensure a node has a correct known_hosts entry.
346
347   Args:
348     fullnode - Fully qualified domain name of host. (str)
349     ip       - IPv4 address of host (str)
350     pubkey   - the public key of the cluster
351
352   """
353   if os.path.exists('/etc/ssh/ssh_known_hosts'):
354     f = open('/etc/ssh/ssh_known_hosts', 'r+')
355   else:
356     f = open('/etc/ssh/ssh_known_hosts', 'w+')
357
358   inthere = False
359
360   save_lines = []
361   add_lines = []
362   removed = False
363
364   while True:
365     rawline = f.readline()
366     logger.Debug('read %s' % (repr(rawline),))
367
368     if not rawline:
369       # End of file
370       break
371
372     line = rawline.split('\n')[0]
373
374     parts = line.split(' ')
375     fields = parts[0].split(',')
376     key = parts[2]
377
378     haveall = True
379     havesome = False
380     for spec in [ ip, fullnode ]:
381       if spec not in fields:
382         haveall = False
383       if spec in fields:
384         havesome = True
385
386     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
387     if haveall and key == pubkey:
388       inthere = True
389       save_lines.append(rawline)
390       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
391       continue
392
393     if havesome and (not haveall or key != pubkey):
394       removed = True
395       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
396       continue
397
398     save_lines.append(rawline)
399
400   if not inthere:
401     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
402     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
403
404   if removed:
405     save_lines = save_lines + add_lines
406
407     # Write a new file and replace old.
408     fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
409     newfile = os.fdopen(fd, 'w')
410     newfile.write(''.join(save_lines))
411     newfile.close()
412     logger.Debug("Wrote new known_hosts.")
413     os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
414
415   elif add_lines:
416     # Simply appending a new line will do the trick.
417     f.seek(0, 2)
418     for add in add_lines:
419       f.write(add)
420
421   f.close()
422
423
424 def _HasValidVG(vglist, vgname):
425   """Checks if the volume group list is valid.
426
427   A non-None return value means there's an error, and the return value
428   is the error message.
429
430   """
431   vgsize = vglist.get(vgname, None)
432   if vgsize is None:
433     return "volume group '%s' missing" % vgname
434   elif vgsize < 20480:
435     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
436             (vgname, vgsize))
437   return None
438
439
440 def _InitSSHSetup(node):
441   """Setup the SSH configuration for the cluster.
442
443
444   This generates a dsa keypair for root, adds the pub key to the
445   permitted hosts and adds the hostkey to its own known hosts.
446
447   Args:
448     node: the name of this host as a fqdn
449
450   """
451   utils.RemoveFile('/root/.ssh/known_hosts')
452
453   if os.path.exists('/root/.ssh/id_dsa'):
454     utils.CreateBackup('/root/.ssh/id_dsa')
455   if os.path.exists('/root/.ssh/id_dsa.pub'):
456     utils.CreateBackup('/root/.ssh/id_dsa.pub')
457
458   utils.RemoveFile('/root/.ssh/id_dsa')
459   utils.RemoveFile('/root/.ssh/id_dsa.pub')
460
461   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
462                          "-f", "/root/.ssh/id_dsa",
463                          "-q", "-N", ""])
464   if result.failed:
465     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
466                              result.output)
467
468   f = open('/root/.ssh/id_dsa.pub', 'r')
469   try:
470     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
471   finally:
472     f.close()
473
474
475 def _InitGanetiServerSetup(ss):
476   """Setup the necessary configuration for the initial node daemon.
477
478   This creates the nodepass file containing the shared password for
479   the cluster and also generates the SSL certificate.
480
481   """
482   # Create pseudo random password
483   randpass = sha.new(os.urandom(64)).hexdigest()
484   # and write it into sstore
485   ss.SetKey(ss.SS_NODED_PASS, randpass)
486
487   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
488                          "-days", str(365*5), "-nodes", "-x509",
489                          "-keyout", constants.SSL_CERT_FILE,
490                          "-out", constants.SSL_CERT_FILE, "-batch"])
491   if result.failed:
492     raise errors.OpExecError("could not generate server ssl cert, command"
493                              " %s had exitcode %s and error message %s" %
494                              (result.cmd, result.exit_code, result.output))
495
496   os.chmod(constants.SSL_CERT_FILE, 0400)
497
498   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
499
500   if result.failed:
501     raise errors.OpExecError("Could not start the node daemon, command %s"
502                              " had exitcode %s and error %s" %
503                              (result.cmd, result.exit_code, result.output))
504
505
506 class LUInitCluster(LogicalUnit):
507   """Initialise the cluster.
508
509   """
510   HPATH = "cluster-init"
511   HTYPE = constants.HTYPE_CLUSTER
512   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
513               "def_bridge", "master_netdev"]
514   REQ_CLUSTER = False
515
516   def BuildHooksEnv(self):
517     """Build hooks env.
518
519     Notes: Since we don't require a cluster, we must manually add
520     ourselves in the post-run node list.
521
522     """
523     env = {
524       "CLUSTER": self.op.cluster_name,
525       "MASTER": self.hostname['hostname_full'],
526       }
527     return env, [], [self.hostname['hostname_full']]
528
529   def CheckPrereq(self):
530     """Verify that the passed name is a valid one.
531
532     """
533     if config.ConfigWriter.IsCluster():
534       raise errors.OpPrereqError("Cluster is already initialised")
535
536     hostname_local = socket.gethostname()
537     self.hostname = hostname = utils.LookupHostname(hostname_local)
538     if not hostname:
539       raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
540                                  hostname_local)
541
542     self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
543     if not clustername:
544       raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
545                                  % self.op.cluster_name)
546
547     result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
548     if result.failed:
549       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
550                                  " to %s,\nbut this ip address does not"
551                                  " belong to this host."
552                                  " Aborting." % hostname['ip'])
553
554     secondary_ip = getattr(self.op, "secondary_ip", None)
555     if secondary_ip and not utils.IsValidIP(secondary_ip):
556       raise errors.OpPrereqError("Invalid secondary ip given")
557     if secondary_ip and secondary_ip != hostname['ip']:
558       result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
559       if result.failed:
560         raise errors.OpPrereqError("You gave %s as secondary IP,\n"
561                                    "but it does not belong to this host." %
562                                    secondary_ip)
563     self.secondary_ip = secondary_ip
564
565     # checks presence of the volume group given
566     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
567
568     if vgstatus:
569       raise errors.OpPrereqError("Error: %s" % vgstatus)
570
571     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
572                     self.op.mac_prefix):
573       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
574                                  self.op.mac_prefix)
575
576     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
577       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
578                                  self.op.hypervisor_type)
579
580     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
581     if result.failed:
582       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
583                                  (self.op.master_netdev,
584                                   result.output.strip()))
585
586   def Exec(self, feedback_fn):
587     """Initialize the cluster.
588
589     """
590     clustername = self.clustername
591     hostname = self.hostname
592
593     # set up the simple store
594     ss = ssconf.SimpleStore()
595     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
596     ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
597     ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
598     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
599     ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
600
601     # set up the inter-node password and certificate
602     _InitGanetiServerSetup(ss)
603
604     # start the master ip
605     rpc.call_node_start_master(hostname['hostname_full'])
606
607     # set up ssh config and /etc/hosts
608     f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
609     try:
610       sshline = f.read()
611     finally:
612       f.close()
613     sshkey = sshline.split(" ")[1]
614
615     _UpdateEtcHosts(hostname['hostname_full'],
616                     hostname['ip'],
617                     )
618
619     _UpdateKnownHosts(hostname['hostname_full'],
620                       hostname['ip'],
621                       sshkey,
622                       )
623
624     _InitSSHSetup(hostname['hostname'])
625
626     # init of cluster config file
627     cfgw = config.ConfigWriter()
628     cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
629                     sshkey, self.op.mac_prefix,
630                     self.op.vg_name, self.op.def_bridge)
631
632
633 class LUDestroyCluster(NoHooksLU):
634   """Logical unit for destroying the cluster.
635
636   """
637   _OP_REQP = []
638
639   def CheckPrereq(self):
640     """Check prerequisites.
641
642     This checks whether the cluster is empty.
643
644     Any errors are signalled by raising errors.OpPrereqError.
645
646     """
647     master = self.sstore.GetMasterNode()
648
649     nodelist = self.cfg.GetNodeList()
650     if len(nodelist) != 1 or nodelist[0] != master:
651       raise errors.OpPrereqError("There are still %d node(s) in"
652                                  " this cluster." % (len(nodelist) - 1))
653     instancelist = self.cfg.GetInstanceList()
654     if instancelist:
655       raise errors.OpPrereqError("There are still %d instance(s) in"
656                                  " this cluster." % len(instancelist))
657
658   def Exec(self, feedback_fn):
659     """Destroys the cluster.
660
661     """
662     utils.CreateBackup('/root/.ssh/id_dsa')
663     utils.CreateBackup('/root/.ssh/id_dsa.pub')
664     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
665
666
667 class LUVerifyCluster(NoHooksLU):
668   """Verifies the cluster status.
669
670   """
671   _OP_REQP = []
672
673   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
674                   remote_version, feedback_fn):
675     """Run multiple tests against a node.
676
677     Test list:
678       - compares ganeti version
679       - checks vg existance and size > 20G
680       - checks config file checksum
681       - checks ssh to other nodes
682
683     Args:
684       node: name of the node to check
685       file_list: required list of files
686       local_cksum: dictionary of local files and their checksums
687
688     """
689     # compares ganeti version
690     local_version = constants.PROTOCOL_VERSION
691     if not remote_version:
692       feedback_fn(" - ERROR: connection to %s failed" % (node))
693       return True
694
695     if local_version != remote_version:
696       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
697                       (local_version, node, remote_version))
698       return True
699
700     # checks vg existance and size > 20G
701
702     bad = False
703     if not vglist:
704       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
705                       (node,))
706       bad = True
707     else:
708       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
709       if vgstatus:
710         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
711         bad = True
712
713     # checks config file checksum
714     # checks ssh to any
715
716     if 'filelist' not in node_result:
717       bad = True
718       feedback_fn("  - ERROR: node hasn't returned file checksum data")
719     else:
720       remote_cksum = node_result['filelist']
721       for file_name in file_list:
722         if file_name not in remote_cksum:
723           bad = True
724           feedback_fn("  - ERROR: file '%s' missing" % file_name)
725         elif remote_cksum[file_name] != local_cksum[file_name]:
726           bad = True
727           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
728
729     if 'nodelist' not in node_result:
730       bad = True
731       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
732     else:
733       if node_result['nodelist']:
734         bad = True
735         for node in node_result['nodelist']:
736           feedback_fn("  - ERROR: communication with node '%s': %s" %
737                           (node, node_result['nodelist'][node]))
738     hyp_result = node_result.get('hypervisor', None)
739     if hyp_result is not None:
740       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
741     return bad
742
743   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
744     """Verify an instance.
745
746     This function checks to see if the required block devices are
747     available on the instance's node.
748
749     """
750     bad = False
751
752     instancelist = self.cfg.GetInstanceList()
753     if not instance in instancelist:
754       feedback_fn("  - ERROR: instance %s not in instance list %s" %
755                       (instance, instancelist))
756       bad = True
757
758     instanceconfig = self.cfg.GetInstanceInfo(instance)
759     node_current = instanceconfig.primary_node
760
761     node_vol_should = {}
762     instanceconfig.MapLVsByNode(node_vol_should)
763
764     for node in node_vol_should:
765       for volume in node_vol_should[node]:
766         if node not in node_vol_is or volume not in node_vol_is[node]:
767           feedback_fn("  - ERROR: volume %s missing on node %s" %
768                           (volume, node))
769           bad = True
770
771     if not instanceconfig.status == 'down':
772       if not instance in node_instance[node_current]:
773         feedback_fn("  - ERROR: instance %s not running on node %s" %
774                         (instance, node_current))
775         bad = True
776
777     for node in node_instance:
778       if (not node == node_current):
779         if instance in node_instance[node]:
780           feedback_fn("  - ERROR: instance %s should not run on node %s" %
781                           (instance, node))
782           bad = True
783
784     return not bad
785
786   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
787     """Verify if there are any unknown volumes in the cluster.
788
789     The .os, .swap and backup volumes are ignored. All other volumes are
790     reported as unknown.
791
792     """
793     bad = False
794
795     for node in node_vol_is:
796       for volume in node_vol_is[node]:
797         if node not in node_vol_should or volume not in node_vol_should[node]:
798           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
799                       (volume, node))
800           bad = True
801     return bad
802
803   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
804     """Verify the list of running instances.
805
806     This checks what instances are running but unknown to the cluster.
807
808     """
809     bad = False
810     for node in node_instance:
811       for runninginstance in node_instance[node]:
812         if runninginstance not in instancelist:
813           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
814                           (runninginstance, node))
815           bad = True
816     return bad
817
818   def CheckPrereq(self):
819     """Check prerequisites.
820
821     This has no prerequisites.
822
823     """
824     pass
825
826   def Exec(self, feedback_fn):
827     """Verify integrity of cluster, performing various test on nodes.
828
829     """
830     bad = False
831     feedback_fn("* Verifying global settings")
832     self.cfg.VerifyConfig()
833
834     master = self.sstore.GetMasterNode()
835     vg_name = self.cfg.GetVGName()
836     nodelist = utils.NiceSort(self.cfg.GetNodeList())
837     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
838     node_volume = {}
839     node_instance = {}
840
841     # FIXME: verify OS list
842     # do local checksums
843     file_names = list(self.sstore.GetFileList())
844     file_names.append(constants.SSL_CERT_FILE)
845     file_names.append(constants.CLUSTER_CONF_FILE)
846     local_checksums = utils.FingerprintFiles(file_names)
847
848     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
849     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
850     all_instanceinfo = rpc.call_instance_list(nodelist)
851     all_vglist = rpc.call_vg_list(nodelist)
852     node_verify_param = {
853       'filelist': file_names,
854       'nodelist': nodelist,
855       'hypervisor': None,
856       }
857     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
858     all_rversion = rpc.call_version(nodelist)
859
860     for node in nodelist:
861       feedback_fn("* Verifying node %s" % node)
862       result = self._VerifyNode(node, file_names, local_checksums,
863                                 all_vglist[node], all_nvinfo[node],
864                                 all_rversion[node], feedback_fn)
865       bad = bad or result
866
867       # node_volume
868       volumeinfo = all_volumeinfo[node]
869
870       if type(volumeinfo) != dict:
871         feedback_fn("  - ERROR: connection to %s failed" % (node,))
872         bad = True
873         continue
874
875       node_volume[node] = volumeinfo
876
877       # node_instance
878       nodeinstance = all_instanceinfo[node]
879       if type(nodeinstance) != list:
880         feedback_fn("  - ERROR: connection to %s failed" % (node,))
881         bad = True
882         continue
883
884       node_instance[node] = nodeinstance
885
886     node_vol_should = {}
887
888     for instance in instancelist:
889       feedback_fn("* Verifying instance %s" % instance)
890       result =  self._VerifyInstance(instance, node_volume, node_instance,
891                                      feedback_fn)
892       bad = bad or result
893
894       inst_config = self.cfg.GetInstanceInfo(instance)
895
896       inst_config.MapLVsByNode(node_vol_should)
897
898     feedback_fn("* Verifying orphan volumes")
899     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
900                                        feedback_fn)
901     bad = bad or result
902
903     feedback_fn("* Verifying remaining instances")
904     result = self._VerifyOrphanInstances(instancelist, node_instance,
905                                          feedback_fn)
906     bad = bad or result
907
908     return int(bad)
909
910
911 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
912   """Sleep and poll for an instance's disk to sync.
913
914   """
915   if not instance.disks:
916     return True
917
918   if not oneshot:
919     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
920
921   node = instance.primary_node
922
923   for dev in instance.disks:
924     cfgw.SetDiskID(dev, node)
925
926   retries = 0
927   while True:
928     max_time = 0
929     done = True
930     cumul_degraded = False
931     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
932     if not rstats:
933       logger.ToStderr("Can't get any data from node %s" % node)
934       retries += 1
935       if retries >= 10:
936         raise errors.RemoteError("Can't contact node %s for mirror data,"
937                                  " aborting." % node)
938       time.sleep(6)
939       continue
940     retries = 0
941     for i in range(len(rstats)):
942       mstat = rstats[i]
943       if mstat is None:
944         logger.ToStderr("Can't compute data for node %s/%s" %
945                         (node, instance.disks[i].iv_name))
946         continue
947       perc_done, est_time, is_degraded = mstat
948       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
949       if perc_done is not None:
950         done = False
951         if est_time is not None:
952           rem_time = "%d estimated seconds remaining" % est_time
953           max_time = est_time
954         else:
955           rem_time = "no time estimate"
956         logger.ToStdout("- device %s: %5.2f%% done, %s" %
957                         (instance.disks[i].iv_name, perc_done, rem_time))
958     if done or oneshot:
959       break
960
961     if unlock:
962       utils.Unlock('cmd')
963     try:
964       time.sleep(min(60, max_time))
965     finally:
966       if unlock:
967         utils.Lock('cmd')
968
969   if done:
970     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
971   return not cumul_degraded
972
973
974 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
975   """Check that mirrors are not degraded.
976
977   """
978   cfgw.SetDiskID(dev, node)
979
980   result = True
981   if on_primary or dev.AssembleOnSecondary():
982     rstats = rpc.call_blockdev_find(node, dev)
983     if not rstats:
984       logger.ToStderr("Can't get any data from node %s" % node)
985       result = False
986     else:
987       result = result and (not rstats[5])
988   if dev.children:
989     for child in dev.children:
990       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
991
992   return result
993
994
995 class LUDiagnoseOS(NoHooksLU):
996   """Logical unit for OS diagnose/query.
997
998   """
999   _OP_REQP = []
1000
1001   def CheckPrereq(self):
1002     """Check prerequisites.
1003
1004     This always succeeds, since this is a pure query LU.
1005
1006     """
1007     return
1008
1009   def Exec(self, feedback_fn):
1010     """Compute the list of OSes.
1011
1012     """
1013     node_list = self.cfg.GetNodeList()
1014     node_data = rpc.call_os_diagnose(node_list)
1015     if node_data == False:
1016       raise errors.OpExecError("Can't gather the list of OSes")
1017     return node_data
1018
1019
1020 class LURemoveNode(LogicalUnit):
1021   """Logical unit for removing a node.
1022
1023   """
1024   HPATH = "node-remove"
1025   HTYPE = constants.HTYPE_NODE
1026   _OP_REQP = ["node_name"]
1027
1028   def BuildHooksEnv(self):
1029     """Build hooks env.
1030
1031     This doesn't run on the target node in the pre phase as a failed
1032     node would not allows itself to run.
1033
1034     """
1035     env = {
1036       "NODE_NAME": self.op.node_name,
1037       }
1038     all_nodes = self.cfg.GetNodeList()
1039     all_nodes.remove(self.op.node_name)
1040     return env, all_nodes, all_nodes
1041
1042   def CheckPrereq(self):
1043     """Check prerequisites.
1044
1045     This checks:
1046      - the node exists in the configuration
1047      - it does not have primary or secondary instances
1048      - it's not the master
1049
1050     Any errors are signalled by raising errors.OpPrereqError.
1051
1052     """
1053     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1054     if node is None:
1055       logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
1056       return 1
1057
1058     instance_list = self.cfg.GetInstanceList()
1059
1060     masternode = self.sstore.GetMasterNode()
1061     if node.name == masternode:
1062       raise errors.OpPrereqError("Node is the master node,"
1063                                  " you need to failover first.")
1064
1065     for instance_name in instance_list:
1066       instance = self.cfg.GetInstanceInfo(instance_name)
1067       if node.name == instance.primary_node:
1068         raise errors.OpPrereqError("Instance %s still running on the node,"
1069                                    " please remove first." % instance_name)
1070       if node.name in instance.secondary_nodes:
1071         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1072                                    " please remove first." % instance_name)
1073     self.op.node_name = node.name
1074     self.node = node
1075
1076   def Exec(self, feedback_fn):
1077     """Removes the node from the cluster.
1078
1079     """
1080     node = self.node
1081     logger.Info("stopping the node daemon and removing configs from node %s" %
1082                 node.name)
1083
1084     rpc.call_node_leave_cluster(node.name)
1085
1086     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1087
1088     logger.Info("Removing node %s from config" % node.name)
1089
1090     self.cfg.RemoveNode(node.name)
1091
1092
1093 class LUQueryNodes(NoHooksLU):
1094   """Logical unit for querying nodes.
1095
1096   """
1097   _OP_REQP = ["output_fields"]
1098
1099   def CheckPrereq(self):
1100     """Check prerequisites.
1101
1102     This checks that the fields required are valid output fields.
1103
1104     """
1105     self.dynamic_fields = frozenset(["dtotal", "dfree",
1106                                      "mtotal", "mnode", "mfree"])
1107
1108     _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1109                        dynamic=self.dynamic_fields,
1110                        selected=self.op.output_fields)
1111
1112
1113   def Exec(self, feedback_fn):
1114     """Computes the list of nodes and their attributes.
1115
1116     """
1117     nodenames = utils.NiceSort(self.cfg.GetNodeList())
1118     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1119
1120
1121     # begin data gathering
1122
1123     if self.dynamic_fields.intersection(self.op.output_fields):
1124       live_data = {}
1125       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1126       for name in nodenames:
1127         nodeinfo = node_data.get(name, None)
1128         if nodeinfo:
1129           live_data[name] = {
1130             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1131             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1132             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1133             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1134             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1135             }
1136         else:
1137           live_data[name] = {}
1138     else:
1139       live_data = dict.fromkeys(nodenames, {})
1140
1141     node_to_primary = dict.fromkeys(nodenames, 0)
1142     node_to_secondary = dict.fromkeys(nodenames, 0)
1143
1144     if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1145       instancelist = self.cfg.GetInstanceList()
1146
1147       for instance in instancelist:
1148         instanceinfo = self.cfg.GetInstanceInfo(instance)
1149         node_to_primary[instanceinfo.primary_node] += 1
1150         for secnode in instanceinfo.secondary_nodes:
1151           node_to_secondary[secnode] += 1
1152
1153     # end data gathering
1154
1155     output = []
1156     for node in nodelist:
1157       node_output = []
1158       for field in self.op.output_fields:
1159         if field == "name":
1160           val = node.name
1161         elif field == "pinst":
1162           val = node_to_primary[node.name]
1163         elif field == "sinst":
1164           val = node_to_secondary[node.name]
1165         elif field == "pip":
1166           val = node.primary_ip
1167         elif field == "sip":
1168           val = node.secondary_ip
1169         elif field in self.dynamic_fields:
1170           val = live_data[node.name].get(field, "?")
1171         else:
1172           raise errors.ParameterError(field)
1173         val = str(val)
1174         node_output.append(val)
1175       output.append(node_output)
1176
1177     return output
1178
1179
1180 class LUQueryNodeVolumes(NoHooksLU):
1181   """Logical unit for getting volumes on node(s).
1182
1183   """
1184   _OP_REQP = ["nodes", "output_fields"]
1185
1186   def CheckPrereq(self):
1187     """Check prerequisites.
1188
1189     This checks that the fields required are valid output fields.
1190
1191     """
1192     self.nodes = _GetWantedNodes(self, self.op.nodes)
1193
1194     _CheckOutputFields(static=["node"],
1195                        dynamic=["phys", "vg", "name", "size", "instance"],
1196                        selected=self.op.output_fields)
1197
1198
1199   def Exec(self, feedback_fn):
1200     """Computes the list of nodes and their attributes.
1201
1202     """
1203     nodenames = utils.NiceSort([node.name for node in self.nodes])
1204     volumes = rpc.call_node_volumes(nodenames)
1205
1206     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1207              in self.cfg.GetInstanceList()]
1208
1209     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1210
1211     output = []
1212     for node in nodenames:
1213       if node not in volumes or not volumes[node]:
1214         continue
1215
1216       node_vols = volumes[node][:]
1217       node_vols.sort(key=lambda vol: vol['dev'])
1218
1219       for vol in node_vols:
1220         node_output = []
1221         for field in self.op.output_fields:
1222           if field == "node":
1223             val = node
1224           elif field == "phys":
1225             val = vol['dev']
1226           elif field == "vg":
1227             val = vol['vg']
1228           elif field == "name":
1229             val = vol['name']
1230           elif field == "size":
1231             val = int(float(vol['size']))
1232           elif field == "instance":
1233             for inst in ilist:
1234               if node not in lv_by_node[inst]:
1235                 continue
1236               if vol['name'] in lv_by_node[inst][node]:
1237                 val = inst.name
1238                 break
1239             else:
1240               val = '-'
1241           else:
1242             raise errors.ParameterError(field)
1243           node_output.append(str(val))
1244
1245         output.append(node_output)
1246
1247     return output
1248
1249
1250 class LUAddNode(LogicalUnit):
1251   """Logical unit for adding node to the cluster.
1252
1253   """
1254   HPATH = "node-add"
1255   HTYPE = constants.HTYPE_NODE
1256   _OP_REQP = ["node_name"]
1257
1258   def BuildHooksEnv(self):
1259     """Build hooks env.
1260
1261     This will run on all nodes before, and on all nodes + the new node after.
1262
1263     """
1264     env = {
1265       "NODE_NAME": self.op.node_name,
1266       "NODE_PIP": self.op.primary_ip,
1267       "NODE_SIP": self.op.secondary_ip,
1268       }
1269     nodes_0 = self.cfg.GetNodeList()
1270     nodes_1 = nodes_0 + [self.op.node_name, ]
1271     return env, nodes_0, nodes_1
1272
1273   def CheckPrereq(self):
1274     """Check prerequisites.
1275
1276     This checks:
1277      - the new node is not already in the config
1278      - it is resolvable
1279      - its parameters (single/dual homed) matches the cluster
1280
1281     Any errors are signalled by raising errors.OpPrereqError.
1282
1283     """
1284     node_name = self.op.node_name
1285     cfg = self.cfg
1286
1287     dns_data = utils.LookupHostname(node_name)
1288     if not dns_data:
1289       raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1290
1291     node = dns_data['hostname']
1292     primary_ip = self.op.primary_ip = dns_data['ip']
1293     secondary_ip = getattr(self.op, "secondary_ip", None)
1294     if secondary_ip is None:
1295       secondary_ip = primary_ip
1296     if not utils.IsValidIP(secondary_ip):
1297       raise errors.OpPrereqError("Invalid secondary IP given")
1298     self.op.secondary_ip = secondary_ip
1299     node_list = cfg.GetNodeList()
1300     if node in node_list:
1301       raise errors.OpPrereqError("Node %s is already in the configuration"
1302                                  % node)
1303
1304     for existing_node_name in node_list:
1305       existing_node = cfg.GetNodeInfo(existing_node_name)
1306       if (existing_node.primary_ip == primary_ip or
1307           existing_node.secondary_ip == primary_ip or
1308           existing_node.primary_ip == secondary_ip or
1309           existing_node.secondary_ip == secondary_ip):
1310         raise errors.OpPrereqError("New node ip address(es) conflict with"
1311                                    " existing node %s" % existing_node.name)
1312
1313     # check that the type of the node (single versus dual homed) is the
1314     # same as for the master
1315     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1316     master_singlehomed = myself.secondary_ip == myself.primary_ip
1317     newbie_singlehomed = secondary_ip == primary_ip
1318     if master_singlehomed != newbie_singlehomed:
1319       if master_singlehomed:
1320         raise errors.OpPrereqError("The master has no private ip but the"
1321                                    " new node has one")
1322       else:
1323         raise errors.OpPrereqError("The master has a private ip but the"
1324                                    " new node doesn't have one")
1325
1326     # checks reachablity
1327     command = ["fping", "-q", primary_ip]
1328     result = utils.RunCmd(command)
1329     if result.failed:
1330       raise errors.OpPrereqError("Node not reachable by ping")
1331
1332     if not newbie_singlehomed:
1333       # check reachability from my secondary ip to newbie's secondary ip
1334       command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1335       result = utils.RunCmd(command)
1336       if result.failed:
1337         raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1338
1339     self.new_node = objects.Node(name=node,
1340                                  primary_ip=primary_ip,
1341                                  secondary_ip=secondary_ip)
1342
1343   def Exec(self, feedback_fn):
1344     """Adds the new node to the cluster.
1345
1346     """
1347     new_node = self.new_node
1348     node = new_node.name
1349
1350     # set up inter-node password and certificate and restarts the node daemon
1351     gntpass = self.sstore.GetNodeDaemonPassword()
1352     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1353       raise errors.OpExecError("ganeti password corruption detected")
1354     f = open(constants.SSL_CERT_FILE)
1355     try:
1356       gntpem = f.read(8192)
1357     finally:
1358       f.close()
1359     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1360     # so we use this to detect an invalid certificate; as long as the
1361     # cert doesn't contain this, the here-document will be correctly
1362     # parsed by the shell sequence below
1363     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1364       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1365     if not gntpem.endswith("\n"):
1366       raise errors.OpExecError("PEM must end with newline")
1367     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1368
1369     # remove first the root's known_hosts file
1370     utils.RemoveFile("/root/.ssh/known_hosts")
1371     # and then connect with ssh to set password and start ganeti-noded
1372     # note that all the below variables are sanitized at this point,
1373     # either by being constants or by the checks above
1374     ss = self.sstore
1375     mycommand = ("umask 077 && "
1376                  "echo '%s' > '%s' && "
1377                  "cat > '%s' << '!EOF.' && \n"
1378                  "%s!EOF.\n%s restart" %
1379                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1380                   constants.SSL_CERT_FILE, gntpem,
1381                   constants.NODE_INITD_SCRIPT))
1382
1383     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1384     if result.failed:
1385       raise errors.OpExecError("Remote command on node %s, error: %s,"
1386                                " output: %s" %
1387                                (node, result.fail_reason, result.output))
1388
1389     # check connectivity
1390     time.sleep(4)
1391
1392     result = rpc.call_version([node])[node]
1393     if result:
1394       if constants.PROTOCOL_VERSION == result:
1395         logger.Info("communication to node %s fine, sw version %s match" %
1396                     (node, result))
1397       else:
1398         raise errors.OpExecError("Version mismatch master version %s,"
1399                                  " node version %s" %
1400                                  (constants.PROTOCOL_VERSION, result))
1401     else:
1402       raise errors.OpExecError("Cannot get version from the new node")
1403
1404     # setup ssh on node
1405     logger.Info("copy ssh key to node %s" % node)
1406     keyarray = []
1407     keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1408                 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1409                 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1410
1411     for i in keyfiles:
1412       f = open(i, 'r')
1413       try:
1414         keyarray.append(f.read())
1415       finally:
1416         f.close()
1417
1418     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1419                                keyarray[3], keyarray[4], keyarray[5])
1420
1421     if not result:
1422       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1423
1424     # Add node to our /etc/hosts, and add key to known_hosts
1425     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1426     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1427                       self.cfg.GetHostKey())
1428
1429     if new_node.secondary_ip != new_node.primary_ip:
1430       result = ssh.SSHCall(node, "root",
1431                            "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1432       if result.failed:
1433         raise errors.OpExecError("Node claims it doesn't have the"
1434                                  " secondary ip you gave (%s).\n"
1435                                  "Please fix and re-run this command." %
1436                                  new_node.secondary_ip)
1437
1438     # Distribute updated /etc/hosts and known_hosts to all nodes,
1439     # including the node just added
1440     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1441     dist_nodes = self.cfg.GetNodeList() + [node]
1442     if myself.name in dist_nodes:
1443       dist_nodes.remove(myself.name)
1444
1445     logger.Debug("Copying hosts and known_hosts to all nodes")
1446     for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1447       result = rpc.call_upload_file(dist_nodes, fname)
1448       for to_node in dist_nodes:
1449         if not result[to_node]:
1450           logger.Error("copy of file %s to node %s failed" %
1451                        (fname, to_node))
1452
1453     to_copy = ss.GetFileList()
1454     for fname in to_copy:
1455       if not ssh.CopyFileToNode(node, fname):
1456         logger.Error("could not copy file %s to node %s" % (fname, node))
1457
1458     logger.Info("adding node %s to cluster.conf" % node)
1459     self.cfg.AddNode(new_node)
1460
1461
1462 class LUMasterFailover(LogicalUnit):
1463   """Failover the master node to the current node.
1464
1465   This is a special LU in that it must run on a non-master node.
1466
1467   """
1468   HPATH = "master-failover"
1469   HTYPE = constants.HTYPE_CLUSTER
1470   REQ_MASTER = False
1471   _OP_REQP = []
1472
1473   def BuildHooksEnv(self):
1474     """Build hooks env.
1475
1476     This will run on the new master only in the pre phase, and on all
1477     the nodes in the post phase.
1478
1479     """
1480     env = {
1481       "NEW_MASTER": self.new_master,
1482       "OLD_MASTER": self.old_master,
1483       }
1484     return env, [self.new_master], self.cfg.GetNodeList()
1485
1486   def CheckPrereq(self):
1487     """Check prerequisites.
1488
1489     This checks that we are not already the master.
1490
1491     """
1492     self.new_master = socket.gethostname()
1493
1494     self.old_master = self.sstore.GetMasterNode()
1495
1496     if self.old_master == self.new_master:
1497       raise errors.OpPrereqError("This commands must be run on the node"
1498                                  " where you want the new master to be.\n"
1499                                  "%s is already the master" %
1500                                  self.old_master)
1501
1502   def Exec(self, feedback_fn):
1503     """Failover the master node.
1504
1505     This command, when run on a non-master node, will cause the current
1506     master to cease being master, and the non-master to become new
1507     master.
1508
1509     """
1510     #TODO: do not rely on gethostname returning the FQDN
1511     logger.Info("setting master to %s, old master: %s" %
1512                 (self.new_master, self.old_master))
1513
1514     if not rpc.call_node_stop_master(self.old_master):
1515       logger.Error("could disable the master role on the old master"
1516                    " %s, please disable manually" % self.old_master)
1517
1518     ss = self.sstore
1519     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1520     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1521                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1522       logger.Error("could not distribute the new simple store master file"
1523                    " to the other nodes, please check.")
1524
1525     if not rpc.call_node_start_master(self.new_master):
1526       logger.Error("could not start the master role on the new master"
1527                    " %s, please check" % self.new_master)
1528       feedback_fn("Error in activating the master IP on the new master,\n"
1529                   "please fix manually.")
1530
1531
1532
1533 class LUQueryClusterInfo(NoHooksLU):
1534   """Query cluster configuration.
1535
1536   """
1537   _OP_REQP = []
1538   REQ_MASTER = False
1539
1540   def CheckPrereq(self):
1541     """No prerequsites needed for this LU.
1542
1543     """
1544     pass
1545
1546   def Exec(self, feedback_fn):
1547     """Return cluster config.
1548
1549     """
1550     result = {
1551       "name": self.sstore.GetClusterName(),
1552       "software_version": constants.RELEASE_VERSION,
1553       "protocol_version": constants.PROTOCOL_VERSION,
1554       "config_version": constants.CONFIG_VERSION,
1555       "os_api_version": constants.OS_API_VERSION,
1556       "export_version": constants.EXPORT_VERSION,
1557       "master": self.sstore.GetMasterNode(),
1558       "architecture": (platform.architecture()[0], platform.machine()),
1559       }
1560
1561     return result
1562
1563
1564 class LUClusterCopyFile(NoHooksLU):
1565   """Copy file to cluster.
1566
1567   """
1568   _OP_REQP = ["nodes", "filename"]
1569
1570   def CheckPrereq(self):
1571     """Check prerequisites.
1572
1573     It should check that the named file exists and that the given list
1574     of nodes is valid.
1575
1576     """
1577     if not os.path.exists(self.op.filename):
1578       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1579
1580     self.nodes = _GetWantedNodes(self, self.op.nodes)
1581
1582   def Exec(self, feedback_fn):
1583     """Copy a file from master to some nodes.
1584
1585     Args:
1586       opts - class with options as members
1587       args - list containing a single element, the file name
1588     Opts used:
1589       nodes - list containing the name of target nodes; if empty, all nodes
1590
1591     """
1592     filename = self.op.filename
1593
1594     myname = socket.gethostname()
1595
1596     for node in self.nodes:
1597       if node == myname:
1598         continue
1599       if not ssh.CopyFileToNode(node, filename):
1600         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1601
1602
1603 class LUDumpClusterConfig(NoHooksLU):
1604   """Return a text-representation of the cluster-config.
1605
1606   """
1607   _OP_REQP = []
1608
1609   def CheckPrereq(self):
1610     """No prerequisites.
1611
1612     """
1613     pass
1614
1615   def Exec(self, feedback_fn):
1616     """Dump a representation of the cluster config to the standard output.
1617
1618     """
1619     return self.cfg.DumpConfig()
1620
1621
1622 class LURunClusterCommand(NoHooksLU):
1623   """Run a command on some nodes.
1624
1625   """
1626   _OP_REQP = ["command", "nodes"]
1627
1628   def CheckPrereq(self):
1629     """Check prerequisites.
1630
1631     It checks that the given list of nodes is valid.
1632
1633     """
1634     self.nodes = _GetWantedNodes(self, self.op.nodes)
1635
1636   def Exec(self, feedback_fn):
1637     """Run a command on some nodes.
1638
1639     """
1640     data = []
1641     for node in self.nodes:
1642       result = utils.RunCmd(["ssh", node.name, self.op.command])
1643       data.append((node.name, result.cmd, result.output, result.exit_code))
1644
1645     return data
1646
1647
1648 class LUActivateInstanceDisks(NoHooksLU):
1649   """Bring up an instance's disks.
1650
1651   """
1652   _OP_REQP = ["instance_name"]
1653
1654   def CheckPrereq(self):
1655     """Check prerequisites.
1656
1657     This checks that the instance is in the cluster.
1658
1659     """
1660     instance = self.cfg.GetInstanceInfo(
1661       self.cfg.ExpandInstanceName(self.op.instance_name))
1662     if instance is None:
1663       raise errors.OpPrereqError("Instance '%s' not known" %
1664                                  self.op.instance_name)
1665     self.instance = instance
1666
1667
1668   def Exec(self, feedback_fn):
1669     """Activate the disks.
1670
1671     """
1672     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1673     if not disks_ok:
1674       raise errors.OpExecError("Cannot activate block devices")
1675
1676     return disks_info
1677
1678
1679 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1680   """Prepare the block devices for an instance.
1681
1682   This sets up the block devices on all nodes.
1683
1684   Args:
1685     instance: a ganeti.objects.Instance object
1686     ignore_secondaries: if true, errors on secondary nodes won't result
1687                         in an error return from the function
1688
1689   Returns:
1690     false if the operation failed
1691     list of (host, instance_visible_name, node_visible_name) if the operation
1692          suceeded with the mapping from node devices to instance devices
1693   """
1694   device_info = []
1695   disks_ok = True
1696   for inst_disk in instance.disks:
1697     master_result = None
1698     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1699       cfg.SetDiskID(node_disk, node)
1700       is_primary = node == instance.primary_node
1701       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1702       if not result:
1703         logger.Error("could not prepare block device %s on node %s (is_pri"
1704                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1705         if is_primary or not ignore_secondaries:
1706           disks_ok = False
1707       if is_primary:
1708         master_result = result
1709     device_info.append((instance.primary_node, inst_disk.iv_name,
1710                         master_result))
1711
1712   return disks_ok, device_info
1713
1714
1715 def _StartInstanceDisks(cfg, instance, force):
1716   """Start the disks of an instance.
1717
1718   """
1719   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1720                                            ignore_secondaries=force)
1721   if not disks_ok:
1722     _ShutdownInstanceDisks(instance, cfg)
1723     if force is not None and not force:
1724       logger.Error("If the message above refers to a secondary node,"
1725                    " you can retry the operation using '--force'.")
1726     raise errors.OpExecError("Disk consistency error")
1727
1728
1729 class LUDeactivateInstanceDisks(NoHooksLU):
1730   """Shutdown an instance's disks.
1731
1732   """
1733   _OP_REQP = ["instance_name"]
1734
1735   def CheckPrereq(self):
1736     """Check prerequisites.
1737
1738     This checks that the instance is in the cluster.
1739
1740     """
1741     instance = self.cfg.GetInstanceInfo(
1742       self.cfg.ExpandInstanceName(self.op.instance_name))
1743     if instance is None:
1744       raise errors.OpPrereqError("Instance '%s' not known" %
1745                                  self.op.instance_name)
1746     self.instance = instance
1747
1748   def Exec(self, feedback_fn):
1749     """Deactivate the disks
1750
1751     """
1752     instance = self.instance
1753     ins_l = rpc.call_instance_list([instance.primary_node])
1754     ins_l = ins_l[instance.primary_node]
1755     if not type(ins_l) is list:
1756       raise errors.OpExecError("Can't contact node '%s'" %
1757                                instance.primary_node)
1758
1759     if self.instance.name in ins_l:
1760       raise errors.OpExecError("Instance is running, can't shutdown"
1761                                " block devices.")
1762
1763     _ShutdownInstanceDisks(instance, self.cfg)
1764
1765
1766 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1767   """Shutdown block devices of an instance.
1768
1769   This does the shutdown on all nodes of the instance.
1770
1771   If the ignore_primary is false, errors on the primary node are
1772   ignored.
1773
1774   """
1775   result = True
1776   for disk in instance.disks:
1777     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1778       cfg.SetDiskID(top_disk, node)
1779       if not rpc.call_blockdev_shutdown(node, top_disk):
1780         logger.Error("could not shutdown block device %s on node %s" %
1781                      (disk.iv_name, node))
1782         if not ignore_primary or node != instance.primary_node:
1783           result = False
1784   return result
1785
1786
1787 class LUStartupInstance(LogicalUnit):
1788   """Starts an instance.
1789
1790   """
1791   HPATH = "instance-start"
1792   HTYPE = constants.HTYPE_INSTANCE
1793   _OP_REQP = ["instance_name", "force"]
1794
1795   def BuildHooksEnv(self):
1796     """Build hooks env.
1797
1798     This runs on master, primary and secondary nodes of the instance.
1799
1800     """
1801     env = {
1802       "FORCE": self.op.force,
1803       }
1804     env.update(_BuildInstanceHookEnvByObject(self.instance))
1805     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1806           list(self.instance.secondary_nodes))
1807     return env, nl, nl
1808
1809   def CheckPrereq(self):
1810     """Check prerequisites.
1811
1812     This checks that the instance is in the cluster.
1813
1814     """
1815     instance = self.cfg.GetInstanceInfo(
1816       self.cfg.ExpandInstanceName(self.op.instance_name))
1817     if instance is None:
1818       raise errors.OpPrereqError("Instance '%s' not known" %
1819                                  self.op.instance_name)
1820
1821     # check bridges existance
1822     brlist = [nic.bridge for nic in instance.nics]
1823     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1824       raise errors.OpPrereqError("one or more target bridges %s does not"
1825                                  " exist on destination node '%s'" %
1826                                  (brlist, instance.primary_node))
1827
1828     self.instance = instance
1829     self.op.instance_name = instance.name
1830
1831   def Exec(self, feedback_fn):
1832     """Start the instance.
1833
1834     """
1835     instance = self.instance
1836     force = self.op.force
1837     extra_args = getattr(self.op, "extra_args", "")
1838
1839     node_current = instance.primary_node
1840
1841     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1842     if not nodeinfo:
1843       raise errors.OpExecError("Could not contact node %s for infos" %
1844                                (node_current))
1845
1846     freememory = nodeinfo[node_current]['memory_free']
1847     memory = instance.memory
1848     if memory > freememory:
1849       raise errors.OpExecError("Not enough memory to start instance"
1850                                " %s on node %s"
1851                                " needed %s MiB, available %s MiB" %
1852                                (instance.name, node_current, memory,
1853                                 freememory))
1854
1855     _StartInstanceDisks(self.cfg, instance, force)
1856
1857     if not rpc.call_instance_start(node_current, instance, extra_args):
1858       _ShutdownInstanceDisks(instance, self.cfg)
1859       raise errors.OpExecError("Could not start instance")
1860
1861     self.cfg.MarkInstanceUp(instance.name)
1862
1863
1864 class LUShutdownInstance(LogicalUnit):
1865   """Shutdown an instance.
1866
1867   """
1868   HPATH = "instance-stop"
1869   HTYPE = constants.HTYPE_INSTANCE
1870   _OP_REQP = ["instance_name"]
1871
1872   def BuildHooksEnv(self):
1873     """Build hooks env.
1874
1875     This runs on master, primary and secondary nodes of the instance.
1876
1877     """
1878     env = _BuildInstanceHookEnvByObject(self.instance)
1879     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1880           list(self.instance.secondary_nodes))
1881     return env, nl, nl
1882
1883   def CheckPrereq(self):
1884     """Check prerequisites.
1885
1886     This checks that the instance is in the cluster.
1887
1888     """
1889     instance = self.cfg.GetInstanceInfo(
1890       self.cfg.ExpandInstanceName(self.op.instance_name))
1891     if instance is None:
1892       raise errors.OpPrereqError("Instance '%s' not known" %
1893                                  self.op.instance_name)
1894     self.instance = instance
1895
1896   def Exec(self, feedback_fn):
1897     """Shutdown the instance.
1898
1899     """
1900     instance = self.instance
1901     node_current = instance.primary_node
1902     if not rpc.call_instance_shutdown(node_current, instance):
1903       logger.Error("could not shutdown instance")
1904
1905     self.cfg.MarkInstanceDown(instance.name)
1906     _ShutdownInstanceDisks(instance, self.cfg)
1907
1908
1909 class LUReinstallInstance(LogicalUnit):
1910   """Reinstall an instance.
1911
1912   """
1913   HPATH = "instance-reinstall"
1914   HTYPE = constants.HTYPE_INSTANCE
1915   _OP_REQP = ["instance_name"]
1916
1917   def BuildHooksEnv(self):
1918     """Build hooks env.
1919
1920     This runs on master, primary and secondary nodes of the instance.
1921
1922     """
1923     env = _BuildInstanceHookEnvByObject(self.instance)
1924     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1925           list(self.instance.secondary_nodes))
1926     return env, nl, nl
1927
1928   def CheckPrereq(self):
1929     """Check prerequisites.
1930
1931     This checks that the instance is in the cluster and is not running.
1932
1933     """
1934     instance = self.cfg.GetInstanceInfo(
1935       self.cfg.ExpandInstanceName(self.op.instance_name))
1936     if instance is None:
1937       raise errors.OpPrereqError("Instance '%s' not known" %
1938                                  self.op.instance_name)
1939     if instance.disk_template == constants.DT_DISKLESS:
1940       raise errors.OpPrereqError("Instance '%s' has no disks" %
1941                                  self.op.instance_name)
1942     if instance.status != "down":
1943       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1944                                  self.op.instance_name)
1945     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1946     if remote_info:
1947       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1948                                  (self.op.instance_name,
1949                                   instance.primary_node))
1950
1951     self.op.os_type = getattr(self.op, "os_type", None)
1952     if self.op.os_type is not None:
1953       # OS verification
1954       pnode = self.cfg.GetNodeInfo(
1955         self.cfg.ExpandNodeName(instance.primary_node))
1956       if pnode is None:
1957         raise errors.OpPrereqError("Primary node '%s' is unknown" %
1958                                    self.op.pnode)
1959       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
1960       if not isinstance(os_obj, objects.OS):
1961         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
1962                                    " primary node"  % self.op.os_type)
1963
1964     self.instance = instance
1965
1966   def Exec(self, feedback_fn):
1967     """Reinstall the instance.
1968
1969     """
1970     inst = self.instance
1971
1972     if self.op.os_type is not None:
1973       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
1974       inst.os = self.op.os_type
1975       self.cfg.AddInstance(inst)
1976
1977     _StartInstanceDisks(self.cfg, inst, None)
1978     try:
1979       feedback_fn("Running the instance OS create scripts...")
1980       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
1981         raise errors.OpExecError("Could not install OS for instance %s "
1982                                  "on node %s" %
1983                                  (inst.name, inst.primary_node))
1984     finally:
1985       _ShutdownInstanceDisks(inst, self.cfg)
1986
1987
1988 class LURemoveInstance(LogicalUnit):
1989   """Remove an instance.
1990
1991   """
1992   HPATH = "instance-remove"
1993   HTYPE = constants.HTYPE_INSTANCE
1994   _OP_REQP = ["instance_name"]
1995
1996   def BuildHooksEnv(self):
1997     """Build hooks env.
1998
1999     This runs on master, primary and secondary nodes of the instance.
2000
2001     """
2002     env = _BuildInstanceHookEnvByObject(self.instance)
2003     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2004           list(self.instance.secondary_nodes))
2005     return env, nl, nl
2006
2007   def CheckPrereq(self):
2008     """Check prerequisites.
2009
2010     This checks that the instance is in the cluster.
2011
2012     """
2013     instance = self.cfg.GetInstanceInfo(
2014       self.cfg.ExpandInstanceName(self.op.instance_name))
2015     if instance is None:
2016       raise errors.OpPrereqError("Instance '%s' not known" %
2017                                  self.op.instance_name)
2018     self.instance = instance
2019
2020   def Exec(self, feedback_fn):
2021     """Remove the instance.
2022
2023     """
2024     instance = self.instance
2025     logger.Info("shutting down instance %s on node %s" %
2026                 (instance.name, instance.primary_node))
2027
2028     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2029       raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2030                                (instance.name, instance.primary_node))
2031
2032     logger.Info("removing block devices for instance %s" % instance.name)
2033
2034     _RemoveDisks(instance, self.cfg)
2035
2036     logger.Info("removing instance %s out of cluster config" % instance.name)
2037
2038     self.cfg.RemoveInstance(instance.name)
2039
2040
2041 class LUQueryInstances(NoHooksLU):
2042   """Logical unit for querying instances.
2043
2044   """
2045   _OP_REQP = ["output_fields"]
2046
2047   def CheckPrereq(self):
2048     """Check prerequisites.
2049
2050     This checks that the fields required are valid output fields.
2051
2052     """
2053     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2054     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2055                                "admin_state", "admin_ram",
2056                                "disk_template", "ip", "mac", "bridge"],
2057                        dynamic=self.dynamic_fields,
2058                        selected=self.op.output_fields)
2059
2060   def Exec(self, feedback_fn):
2061     """Computes the list of nodes and their attributes.
2062
2063     """
2064     instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2065     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2066                      in instance_names]
2067
2068     # begin data gathering
2069
2070     nodes = frozenset([inst.primary_node for inst in instance_list])
2071
2072     bad_nodes = []
2073     if self.dynamic_fields.intersection(self.op.output_fields):
2074       live_data = {}
2075       node_data = rpc.call_all_instances_info(nodes)
2076       for name in nodes:
2077         result = node_data[name]
2078         if result:
2079           live_data.update(result)
2080         elif result == False:
2081           bad_nodes.append(name)
2082         # else no instance is alive
2083     else:
2084       live_data = dict([(name, {}) for name in instance_names])
2085
2086     # end data gathering
2087
2088     output = []
2089     for instance in instance_list:
2090       iout = []
2091       for field in self.op.output_fields:
2092         if field == "name":
2093           val = instance.name
2094         elif field == "os":
2095           val = instance.os
2096         elif field == "pnode":
2097           val = instance.primary_node
2098         elif field == "snodes":
2099           val = ",".join(instance.secondary_nodes) or "-"
2100         elif field == "admin_state":
2101           if instance.status == "down":
2102             val = "no"
2103           else:
2104             val = "yes"
2105         elif field == "oper_state":
2106           if instance.primary_node in bad_nodes:
2107             val = "(node down)"
2108           else:
2109             if live_data.get(instance.name):
2110               val = "running"
2111             else:
2112               val = "stopped"
2113         elif field == "admin_ram":
2114           val = instance.memory
2115         elif field == "oper_ram":
2116           if instance.primary_node in bad_nodes:
2117             val = "(node down)"
2118           elif instance.name in live_data:
2119             val = live_data[instance.name].get("memory", "?")
2120           else:
2121             val = "-"
2122         elif field == "disk_template":
2123           val = instance.disk_template
2124         elif field == "ip":
2125           val = instance.nics[0].ip
2126         elif field == "bridge":
2127           val = instance.nics[0].bridge
2128         elif field == "mac":
2129           val = instance.nics[0].mac
2130         else:
2131           raise errors.ParameterError(field)
2132         val = str(val)
2133         iout.append(val)
2134       output.append(iout)
2135
2136     return output
2137
2138
2139 class LUFailoverInstance(LogicalUnit):
2140   """Failover an instance.
2141
2142   """
2143   HPATH = "instance-failover"
2144   HTYPE = constants.HTYPE_INSTANCE
2145   _OP_REQP = ["instance_name", "ignore_consistency"]
2146
2147   def BuildHooksEnv(self):
2148     """Build hooks env.
2149
2150     This runs on master, primary and secondary nodes of the instance.
2151
2152     """
2153     env = {
2154       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2155       }
2156     env.update(_BuildInstanceHookEnvByObject(self.instance))
2157     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2158     return env, nl, nl
2159
2160   def CheckPrereq(self):
2161     """Check prerequisites.
2162
2163     This checks that the instance is in the cluster.
2164
2165     """
2166     instance = self.cfg.GetInstanceInfo(
2167       self.cfg.ExpandInstanceName(self.op.instance_name))
2168     if instance is None:
2169       raise errors.OpPrereqError("Instance '%s' not known" %
2170                                  self.op.instance_name)
2171
2172     # check memory requirements on the secondary node
2173     target_node = instance.secondary_nodes[0]
2174     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2175     info = nodeinfo.get(target_node, None)
2176     if not info:
2177       raise errors.OpPrereqError("Cannot get current information"
2178                                  " from node '%s'" % nodeinfo)
2179     if instance.memory > info['memory_free']:
2180       raise errors.OpPrereqError("Not enough memory on target node %s."
2181                                  " %d MB available, %d MB required" %
2182                                  (target_node, info['memory_free'],
2183                                   instance.memory))
2184
2185     # check bridge existance
2186     brlist = [nic.bridge for nic in instance.nics]
2187     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2188       raise errors.OpPrereqError("One or more target bridges %s does not"
2189                                  " exist on destination node '%s'" %
2190                                  (brlist, instance.primary_node))
2191
2192     self.instance = instance
2193
2194   def Exec(self, feedback_fn):
2195     """Failover an instance.
2196
2197     The failover is done by shutting it down on its present node and
2198     starting it on the secondary.
2199
2200     """
2201     instance = self.instance
2202
2203     source_node = instance.primary_node
2204     target_node = instance.secondary_nodes[0]
2205
2206     feedback_fn("* checking disk consistency between source and target")
2207     for dev in instance.disks:
2208       # for remote_raid1, these are md over drbd
2209       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2210         if not self.op.ignore_consistency:
2211           raise errors.OpExecError("Disk %s is degraded on target node,"
2212                                    " aborting failover." % dev.iv_name)
2213
2214     feedback_fn("* checking target node resource availability")
2215     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2216
2217     if not nodeinfo:
2218       raise errors.OpExecError("Could not contact target node %s." %
2219                                target_node)
2220
2221     free_memory = int(nodeinfo[target_node]['memory_free'])
2222     memory = instance.memory
2223     if memory > free_memory:
2224       raise errors.OpExecError("Not enough memory to create instance %s on"
2225                                " node %s. needed %s MiB, available %s MiB" %
2226                                (instance.name, target_node, memory,
2227                                 free_memory))
2228
2229     feedback_fn("* shutting down instance on source node")
2230     logger.Info("Shutting down instance %s on node %s" %
2231                 (instance.name, source_node))
2232
2233     if not rpc.call_instance_shutdown(source_node, instance):
2234       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2235                    " anyway. Please make sure node %s is down"  %
2236                    (instance.name, source_node, source_node))
2237
2238     feedback_fn("* deactivating the instance's disks on source node")
2239     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2240       raise errors.OpExecError("Can't shut down the instance's disks.")
2241
2242     instance.primary_node = target_node
2243     # distribute new instance config to the other nodes
2244     self.cfg.AddInstance(instance)
2245
2246     feedback_fn("* activating the instance's disks on target node")
2247     logger.Info("Starting instance %s on node %s" %
2248                 (instance.name, target_node))
2249
2250     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2251                                              ignore_secondaries=True)
2252     if not disks_ok:
2253       _ShutdownInstanceDisks(instance, self.cfg)
2254       raise errors.OpExecError("Can't activate the instance's disks")
2255
2256     feedback_fn("* starting the instance on the target node")
2257     if not rpc.call_instance_start(target_node, instance, None):
2258       _ShutdownInstanceDisks(instance, self.cfg)
2259       raise errors.OpExecError("Could not start instance %s on node %s." %
2260                                (instance.name, target_node))
2261
2262
2263 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2264   """Create a tree of block devices on the primary node.
2265
2266   This always creates all devices.
2267
2268   """
2269   if device.children:
2270     for child in device.children:
2271       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2272         return False
2273
2274   cfg.SetDiskID(device, node)
2275   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2276   if not new_id:
2277     return False
2278   if device.physical_id is None:
2279     device.physical_id = new_id
2280   return True
2281
2282
2283 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2284   """Create a tree of block devices on a secondary node.
2285
2286   If this device type has to be created on secondaries, create it and
2287   all its children.
2288
2289   If not, just recurse to children keeping the same 'force' value.
2290
2291   """
2292   if device.CreateOnSecondary():
2293     force = True
2294   if device.children:
2295     for child in device.children:
2296       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2297         return False
2298
2299   if not force:
2300     return True
2301   cfg.SetDiskID(device, node)
2302   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2303   if not new_id:
2304     return False
2305   if device.physical_id is None:
2306     device.physical_id = new_id
2307   return True
2308
2309
2310 def _GenerateUniqueNames(cfg, exts):
2311   """Generate a suitable LV name.
2312
2313   This will generate a logical volume name for the given instance.
2314
2315   """
2316   results = []
2317   for val in exts:
2318     new_id = cfg.GenerateUniqueID()
2319     results.append("%s%s" % (new_id, val))
2320   return results
2321
2322
2323 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2324   """Generate a drbd device complete with its children.
2325
2326   """
2327   port = cfg.AllocatePort()
2328   vgname = cfg.GetVGName()
2329   dev_data = objects.Disk(dev_type="lvm", size=size,
2330                           logical_id=(vgname, names[0]))
2331   dev_meta = objects.Disk(dev_type="lvm", size=128,
2332                           logical_id=(vgname, names[1]))
2333   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2334                           logical_id = (primary, secondary, port),
2335                           children = [dev_data, dev_meta])
2336   return drbd_dev
2337
2338
2339 def _GenerateDiskTemplate(cfg, template_name,
2340                           instance_name, primary_node,
2341                           secondary_nodes, disk_sz, swap_sz):
2342   """Generate the entire disk layout for a given template type.
2343
2344   """
2345   #TODO: compute space requirements
2346
2347   vgname = cfg.GetVGName()
2348   if template_name == "diskless":
2349     disks = []
2350   elif template_name == "plain":
2351     if len(secondary_nodes) != 0:
2352       raise errors.ProgrammerError("Wrong template configuration")
2353
2354     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2355     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2356                            logical_id=(vgname, names[0]),
2357                            iv_name = "sda")
2358     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2359                            logical_id=(vgname, names[1]),
2360                            iv_name = "sdb")
2361     disks = [sda_dev, sdb_dev]
2362   elif template_name == "local_raid1":
2363     if len(secondary_nodes) != 0:
2364       raise errors.ProgrammerError("Wrong template configuration")
2365
2366
2367     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2368                                        ".sdb_m1", ".sdb_m2"])
2369     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2370                               logical_id=(vgname, names[0]))
2371     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2372                               logical_id=(vgname, names[1]))
2373     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2374                               size=disk_sz,
2375                               children = [sda_dev_m1, sda_dev_m2])
2376     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2377                               logical_id=(vgname, names[2]))
2378     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2379                               logical_id=(vgname, names[3]))
2380     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2381                               size=swap_sz,
2382                               children = [sdb_dev_m1, sdb_dev_m2])
2383     disks = [md_sda_dev, md_sdb_dev]
2384   elif template_name == "remote_raid1":
2385     if len(secondary_nodes) != 1:
2386       raise errors.ProgrammerError("Wrong template configuration")
2387     remote_node = secondary_nodes[0]
2388     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2389                                        ".sdb_data", ".sdb_meta"])
2390     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2391                                          disk_sz, names[0:2])
2392     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2393                               children = [drbd_sda_dev], size=disk_sz)
2394     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2395                                          swap_sz, names[2:4])
2396     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2397                               children = [drbd_sdb_dev], size=swap_sz)
2398     disks = [md_sda_dev, md_sdb_dev]
2399   else:
2400     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2401   return disks
2402
2403
2404 def _GetInstanceInfoText(instance):
2405   """Compute that text that should be added to the disk's metadata.
2406
2407   """
2408   return "originstname+%s" % instance.name
2409
2410
2411 def _CreateDisks(cfg, instance):
2412   """Create all disks for an instance.
2413
2414   This abstracts away some work from AddInstance.
2415
2416   Args:
2417     instance: the instance object
2418
2419   Returns:
2420     True or False showing the success of the creation process
2421
2422   """
2423   info = _GetInstanceInfoText(instance)
2424
2425   for device in instance.disks:
2426     logger.Info("creating volume %s for instance %s" %
2427               (device.iv_name, instance.name))
2428     #HARDCODE
2429     for secondary_node in instance.secondary_nodes:
2430       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2431                                         info):
2432         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2433                      (device.iv_name, device, secondary_node))
2434         return False
2435     #HARDCODE
2436     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2437       logger.Error("failed to create volume %s on primary!" %
2438                    device.iv_name)
2439       return False
2440   return True
2441
2442
2443 def _RemoveDisks(instance, cfg):
2444   """Remove all disks for an instance.
2445
2446   This abstracts away some work from `AddInstance()` and
2447   `RemoveInstance()`. Note that in case some of the devices couldn't
2448   be remove, the removal will continue with the other ones (compare
2449   with `_CreateDisks()`).
2450
2451   Args:
2452     instance: the instance object
2453
2454   Returns:
2455     True or False showing the success of the removal proces
2456
2457   """
2458   logger.Info("removing block devices for instance %s" % instance.name)
2459
2460   result = True
2461   for device in instance.disks:
2462     for node, disk in device.ComputeNodeTree(instance.primary_node):
2463       cfg.SetDiskID(disk, node)
2464       if not rpc.call_blockdev_remove(node, disk):
2465         logger.Error("could not remove block device %s on node %s,"
2466                      " continuing anyway" %
2467                      (device.iv_name, node))
2468         result = False
2469   return result
2470
2471
2472 class LUCreateInstance(LogicalUnit):
2473   """Create an instance.
2474
2475   """
2476   HPATH = "instance-add"
2477   HTYPE = constants.HTYPE_INSTANCE
2478   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2479               "disk_template", "swap_size", "mode", "start", "vcpus",
2480               "wait_for_sync"]
2481
2482   def BuildHooksEnv(self):
2483     """Build hooks env.
2484
2485     This runs on master, primary and secondary nodes of the instance.
2486
2487     """
2488     env = {
2489       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2490       "INSTANCE_DISK_SIZE": self.op.disk_size,
2491       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2492       "INSTANCE_ADD_MODE": self.op.mode,
2493       }
2494     if self.op.mode == constants.INSTANCE_IMPORT:
2495       env["INSTANCE_SRC_NODE"] = self.op.src_node
2496       env["INSTANCE_SRC_PATH"] = self.op.src_path
2497       env["INSTANCE_SRC_IMAGE"] = self.src_image
2498
2499     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2500       primary_node=self.op.pnode,
2501       secondary_nodes=self.secondaries,
2502       status=self.instance_status,
2503       os_type=self.op.os_type,
2504       memory=self.op.mem_size,
2505       vcpus=self.op.vcpus,
2506       nics=[(self.inst_ip, self.op.bridge)],
2507     ))
2508
2509     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2510           self.secondaries)
2511     return env, nl, nl
2512
2513
2514   def CheckPrereq(self):
2515     """Check prerequisites.
2516
2517     """
2518     if self.op.mode not in (constants.INSTANCE_CREATE,
2519                             constants.INSTANCE_IMPORT):
2520       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2521                                  self.op.mode)
2522
2523     if self.op.mode == constants.INSTANCE_IMPORT:
2524       src_node = getattr(self.op, "src_node", None)
2525       src_path = getattr(self.op, "src_path", None)
2526       if src_node is None or src_path is None:
2527         raise errors.OpPrereqError("Importing an instance requires source"
2528                                    " node and path options")
2529       src_node_full = self.cfg.ExpandNodeName(src_node)
2530       if src_node_full is None:
2531         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2532       self.op.src_node = src_node = src_node_full
2533
2534       if not os.path.isabs(src_path):
2535         raise errors.OpPrereqError("The source path must be absolute")
2536
2537       export_info = rpc.call_export_info(src_node, src_path)
2538
2539       if not export_info:
2540         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2541
2542       if not export_info.has_section(constants.INISECT_EXP):
2543         raise errors.ProgrammerError("Corrupted export config")
2544
2545       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2546       if (int(ei_version) != constants.EXPORT_VERSION):
2547         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2548                                    (ei_version, constants.EXPORT_VERSION))
2549
2550       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2551         raise errors.OpPrereqError("Can't import instance with more than"
2552                                    " one data disk")
2553
2554       # FIXME: are the old os-es, disk sizes, etc. useful?
2555       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2556       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2557                                                          'disk0_dump'))
2558       self.src_image = diskimage
2559     else: # INSTANCE_CREATE
2560       if getattr(self.op, "os_type", None) is None:
2561         raise errors.OpPrereqError("No guest OS specified")
2562
2563     # check primary node
2564     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2565     if pnode is None:
2566       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2567                                  self.op.pnode)
2568     self.op.pnode = pnode.name
2569     self.pnode = pnode
2570     self.secondaries = []
2571     # disk template and mirror node verification
2572     if self.op.disk_template not in constants.DISK_TEMPLATES:
2573       raise errors.OpPrereqError("Invalid disk template name")
2574
2575     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2576       if getattr(self.op, "snode", None) is None:
2577         raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2578                                    " a mirror node")
2579
2580       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2581       if snode_name is None:
2582         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2583                                    self.op.snode)
2584       elif snode_name == pnode.name:
2585         raise errors.OpPrereqError("The secondary node cannot be"
2586                                    " the primary node.")
2587       self.secondaries.append(snode_name)
2588
2589     # Check lv size requirements
2590     nodenames = [pnode.name] + self.secondaries
2591     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2592
2593     # Required free disk space as a function of disk and swap space
2594     req_size_dict = {
2595       constants.DT_DISKLESS: 0,
2596       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2597       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2598       # 256 MB are added for drbd metadata, 128MB for each drbd device
2599       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2600     }
2601
2602     if self.op.disk_template not in req_size_dict:
2603       raise errors.ProgrammerError("Disk template '%s' size requirement"
2604                                    " is unknown" %  self.op.disk_template)
2605
2606     req_size = req_size_dict[self.op.disk_template]
2607
2608     for node in nodenames:
2609       info = nodeinfo.get(node, None)
2610       if not info:
2611         raise errors.OpPrereqError("Cannot get current information"
2612                                    " from node '%s'" % nodeinfo)
2613       if req_size > info['vg_free']:
2614         raise errors.OpPrereqError("Not enough disk space on target node %s."
2615                                    " %d MB available, %d MB required" %
2616                                    (node, info['vg_free'], req_size))
2617
2618     # os verification
2619     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2620     if not isinstance(os_obj, objects.OS):
2621       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2622                                  " primary node"  % self.op.os_type)
2623
2624     # instance verification
2625     hostname1 = utils.LookupHostname(self.op.instance_name)
2626     if not hostname1:
2627       raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2628                                  self.op.instance_name)
2629
2630     self.op.instance_name = instance_name = hostname1['hostname']
2631     instance_list = self.cfg.GetInstanceList()
2632     if instance_name in instance_list:
2633       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2634                                  instance_name)
2635
2636     ip = getattr(self.op, "ip", None)
2637     if ip is None or ip.lower() == "none":
2638       inst_ip = None
2639     elif ip.lower() == "auto":
2640       inst_ip = hostname1['ip']
2641     else:
2642       if not utils.IsValidIP(ip):
2643         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2644                                    " like a valid IP" % ip)
2645       inst_ip = ip
2646     self.inst_ip = inst_ip
2647
2648     command = ["fping", "-q", hostname1['ip']]
2649     result = utils.RunCmd(command)
2650     if not result.failed:
2651       raise errors.OpPrereqError("IP %s of instance %s already in use" %
2652                                  (hostname1['ip'], instance_name))
2653
2654     # bridge verification
2655     bridge = getattr(self.op, "bridge", None)
2656     if bridge is None:
2657       self.op.bridge = self.cfg.GetDefBridge()
2658     else:
2659       self.op.bridge = bridge
2660
2661     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2662       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2663                                  " destination node '%s'" %
2664                                  (self.op.bridge, pnode.name))
2665
2666     if self.op.start:
2667       self.instance_status = 'up'
2668     else:
2669       self.instance_status = 'down'
2670
2671   def Exec(self, feedback_fn):
2672     """Create and add the instance to the cluster.
2673
2674     """
2675     instance = self.op.instance_name
2676     pnode_name = self.pnode.name
2677
2678     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2679     if self.inst_ip is not None:
2680       nic.ip = self.inst_ip
2681
2682     disks = _GenerateDiskTemplate(self.cfg,
2683                                   self.op.disk_template,
2684                                   instance, pnode_name,
2685                                   self.secondaries, self.op.disk_size,
2686                                   self.op.swap_size)
2687
2688     iobj = objects.Instance(name=instance, os=self.op.os_type,
2689                             primary_node=pnode_name,
2690                             memory=self.op.mem_size,
2691                             vcpus=self.op.vcpus,
2692                             nics=[nic], disks=disks,
2693                             disk_template=self.op.disk_template,
2694                             status=self.instance_status,
2695                             )
2696
2697     feedback_fn("* creating instance disks...")
2698     if not _CreateDisks(self.cfg, iobj):
2699       _RemoveDisks(iobj, self.cfg)
2700       raise errors.OpExecError("Device creation failed, reverting...")
2701
2702     feedback_fn("adding instance %s to cluster config" % instance)
2703
2704     self.cfg.AddInstance(iobj)
2705
2706     if self.op.wait_for_sync:
2707       disk_abort = not _WaitForSync(self.cfg, iobj)
2708     elif iobj.disk_template == "remote_raid1":
2709       # make sure the disks are not degraded (still sync-ing is ok)
2710       time.sleep(15)
2711       feedback_fn("* checking mirrors status")
2712       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2713     else:
2714       disk_abort = False
2715
2716     if disk_abort:
2717       _RemoveDisks(iobj, self.cfg)
2718       self.cfg.RemoveInstance(iobj.name)
2719       raise errors.OpExecError("There are some degraded disks for"
2720                                " this instance")
2721
2722     feedback_fn("creating os for instance %s on node %s" %
2723                 (instance, pnode_name))
2724
2725     if iobj.disk_template != constants.DT_DISKLESS:
2726       if self.op.mode == constants.INSTANCE_CREATE:
2727         feedback_fn("* running the instance OS create scripts...")
2728         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2729           raise errors.OpExecError("could not add os for instance %s"
2730                                    " on node %s" %
2731                                    (instance, pnode_name))
2732
2733       elif self.op.mode == constants.INSTANCE_IMPORT:
2734         feedback_fn("* running the instance OS import scripts...")
2735         src_node = self.op.src_node
2736         src_image = self.src_image
2737         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2738                                                 src_node, src_image):
2739           raise errors.OpExecError("Could not import os for instance"
2740                                    " %s on node %s" %
2741                                    (instance, pnode_name))
2742       else:
2743         # also checked in the prereq part
2744         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2745                                      % self.op.mode)
2746
2747     if self.op.start:
2748       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2749       feedback_fn("* starting instance...")
2750       if not rpc.call_instance_start(pnode_name, iobj, None):
2751         raise errors.OpExecError("Could not start instance")
2752
2753
2754 class LUConnectConsole(NoHooksLU):
2755   """Connect to an instance's console.
2756
2757   This is somewhat special in that it returns the command line that
2758   you need to run on the master node in order to connect to the
2759   console.
2760
2761   """
2762   _OP_REQP = ["instance_name"]
2763
2764   def CheckPrereq(self):
2765     """Check prerequisites.
2766
2767     This checks that the instance is in the cluster.
2768
2769     """
2770     instance = self.cfg.GetInstanceInfo(
2771       self.cfg.ExpandInstanceName(self.op.instance_name))
2772     if instance is None:
2773       raise errors.OpPrereqError("Instance '%s' not known" %
2774                                  self.op.instance_name)
2775     self.instance = instance
2776
2777   def Exec(self, feedback_fn):
2778     """Connect to the console of an instance
2779
2780     """
2781     instance = self.instance
2782     node = instance.primary_node
2783
2784     node_insts = rpc.call_instance_list([node])[node]
2785     if node_insts is False:
2786       raise errors.OpExecError("Can't connect to node %s." % node)
2787
2788     if instance.name not in node_insts:
2789       raise errors.OpExecError("Instance %s is not running." % instance.name)
2790
2791     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2792
2793     hyper = hypervisor.GetHypervisor()
2794     console_cmd = hyper.GetShellCommandForConsole(instance.name)
2795     return node, console_cmd
2796
2797
2798 class LUAddMDDRBDComponent(LogicalUnit):
2799   """Adda new mirror member to an instance's disk.
2800
2801   """
2802   HPATH = "mirror-add"
2803   HTYPE = constants.HTYPE_INSTANCE
2804   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2805
2806   def BuildHooksEnv(self):
2807     """Build hooks env.
2808
2809     This runs on the master, the primary and all the secondaries.
2810
2811     """
2812     env = {
2813       "NEW_SECONDARY": self.op.remote_node,
2814       "DISK_NAME": self.op.disk_name,
2815       }
2816     env.update(_BuildInstanceHookEnvByObject(self.instance))
2817     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2818           self.op.remote_node,] + list(self.instance.secondary_nodes)
2819     return env, nl, nl
2820
2821   def CheckPrereq(self):
2822     """Check prerequisites.
2823
2824     This checks that the instance is in the cluster.
2825
2826     """
2827     instance = self.cfg.GetInstanceInfo(
2828       self.cfg.ExpandInstanceName(self.op.instance_name))
2829     if instance is None:
2830       raise errors.OpPrereqError("Instance '%s' not known" %
2831                                  self.op.instance_name)
2832     self.instance = instance
2833
2834     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2835     if remote_node is None:
2836       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2837     self.remote_node = remote_node
2838
2839     if remote_node == instance.primary_node:
2840       raise errors.OpPrereqError("The specified node is the primary node of"
2841                                  " the instance.")
2842
2843     if instance.disk_template != constants.DT_REMOTE_RAID1:
2844       raise errors.OpPrereqError("Instance's disk layout is not"
2845                                  " remote_raid1.")
2846     for disk in instance.disks:
2847       if disk.iv_name == self.op.disk_name:
2848         break
2849     else:
2850       raise errors.OpPrereqError("Can't find this device ('%s') in the"
2851                                  " instance." % self.op.disk_name)
2852     if len(disk.children) > 1:
2853       raise errors.OpPrereqError("The device already has two slave"
2854                                  " devices.\n"
2855                                  "This would create a 3-disk raid1"
2856                                  " which we don't allow.")
2857     self.disk = disk
2858
2859   def Exec(self, feedback_fn):
2860     """Add the mirror component
2861
2862     """
2863     disk = self.disk
2864     instance = self.instance
2865
2866     remote_node = self.remote_node
2867     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2868     names = _GenerateUniqueNames(self.cfg, lv_names)
2869     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2870                                      remote_node, disk.size, names)
2871
2872     logger.Info("adding new mirror component on secondary")
2873     #HARDCODE
2874     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2875                                       _GetInstanceInfoText(instance)):
2876       raise errors.OpExecError("Failed to create new component on secondary"
2877                                " node %s" % remote_node)
2878
2879     logger.Info("adding new mirror component on primary")
2880     #HARDCODE
2881     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2882                                     _GetInstanceInfoText(instance)):
2883       # remove secondary dev
2884       self.cfg.SetDiskID(new_drbd, remote_node)
2885       rpc.call_blockdev_remove(remote_node, new_drbd)
2886       raise errors.OpExecError("Failed to create volume on primary")
2887
2888     # the device exists now
2889     # call the primary node to add the mirror to md
2890     logger.Info("adding new mirror component to md")
2891     if not rpc.call_blockdev_addchild(instance.primary_node,
2892                                            disk, new_drbd):
2893       logger.Error("Can't add mirror compoment to md!")
2894       self.cfg.SetDiskID(new_drbd, remote_node)
2895       if not rpc.call_blockdev_remove(remote_node, new_drbd):
2896         logger.Error("Can't rollback on secondary")
2897       self.cfg.SetDiskID(new_drbd, instance.primary_node)
2898       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2899         logger.Error("Can't rollback on primary")
2900       raise errors.OpExecError("Can't add mirror component to md array")
2901
2902     disk.children.append(new_drbd)
2903
2904     self.cfg.AddInstance(instance)
2905
2906     _WaitForSync(self.cfg, instance)
2907
2908     return 0
2909
2910
2911 class LURemoveMDDRBDComponent(LogicalUnit):
2912   """Remove a component from a remote_raid1 disk.
2913
2914   """
2915   HPATH = "mirror-remove"
2916   HTYPE = constants.HTYPE_INSTANCE
2917   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2918
2919   def BuildHooksEnv(self):
2920     """Build hooks env.
2921
2922     This runs on the master, the primary and all the secondaries.
2923
2924     """
2925     env = {
2926       "DISK_NAME": self.op.disk_name,
2927       "DISK_ID": self.op.disk_id,
2928       "OLD_SECONDARY": self.old_secondary,
2929       }
2930     env.update(_BuildInstanceHookEnvByObject(self.instance))
2931     nl = [self.sstore.GetMasterNode(),
2932           self.instance.primary_node] + list(self.instance.secondary_nodes)
2933     return env, nl, nl
2934
2935   def CheckPrereq(self):
2936     """Check prerequisites.
2937
2938     This checks that the instance is in the cluster.
2939
2940     """
2941     instance = self.cfg.GetInstanceInfo(
2942       self.cfg.ExpandInstanceName(self.op.instance_name))
2943     if instance is None:
2944       raise errors.OpPrereqError("Instance '%s' not known" %
2945                                  self.op.instance_name)
2946     self.instance = instance
2947
2948     if instance.disk_template != constants.DT_REMOTE_RAID1:
2949       raise errors.OpPrereqError("Instance's disk layout is not"
2950                                  " remote_raid1.")
2951     for disk in instance.disks:
2952       if disk.iv_name == self.op.disk_name:
2953         break
2954     else:
2955       raise errors.OpPrereqError("Can't find this device ('%s') in the"
2956                                  " instance." % self.op.disk_name)
2957     for child in disk.children:
2958       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2959         break
2960     else:
2961       raise errors.OpPrereqError("Can't find the device with this port.")
2962
2963     if len(disk.children) < 2:
2964       raise errors.OpPrereqError("Cannot remove the last component from"
2965                                  " a mirror.")
2966     self.disk = disk
2967     self.child = child
2968     if self.child.logical_id[0] == instance.primary_node:
2969       oid = 1
2970     else:
2971       oid = 0
2972     self.old_secondary = self.child.logical_id[oid]
2973
2974   def Exec(self, feedback_fn):
2975     """Remove the mirror component
2976
2977     """
2978     instance = self.instance
2979     disk = self.disk
2980     child = self.child
2981     logger.Info("remove mirror component")
2982     self.cfg.SetDiskID(disk, instance.primary_node)
2983     if not rpc.call_blockdev_removechild(instance.primary_node,
2984                                               disk, child):
2985       raise errors.OpExecError("Can't remove child from mirror.")
2986
2987     for node in child.logical_id[:2]:
2988       self.cfg.SetDiskID(child, node)
2989       if not rpc.call_blockdev_remove(node, child):
2990         logger.Error("Warning: failed to remove device from node %s,"
2991                      " continuing operation." % node)
2992
2993     disk.children.remove(child)
2994     self.cfg.AddInstance(instance)
2995
2996
2997 class LUReplaceDisks(LogicalUnit):
2998   """Replace the disks of an instance.
2999
3000   """
3001   HPATH = "mirrors-replace"
3002   HTYPE = constants.HTYPE_INSTANCE
3003   _OP_REQP = ["instance_name"]
3004
3005   def BuildHooksEnv(self):
3006     """Build hooks env.
3007
3008     This runs on the master, the primary and all the secondaries.
3009
3010     """
3011     env = {
3012       "NEW_SECONDARY": self.op.remote_node,
3013       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3014       }
3015     env.update(_BuildInstanceHookEnvByObject(self.instance))
3016     nl = [self.sstore.GetMasterNode(),
3017           self.instance.primary_node] + list(self.instance.secondary_nodes)
3018     return env, nl, nl
3019
3020   def CheckPrereq(self):
3021     """Check prerequisites.
3022
3023     This checks that the instance is in the cluster.
3024
3025     """
3026     instance = self.cfg.GetInstanceInfo(
3027       self.cfg.ExpandInstanceName(self.op.instance_name))
3028     if instance is None:
3029       raise errors.OpPrereqError("Instance '%s' not known" %
3030                                  self.op.instance_name)
3031     self.instance = instance
3032
3033     if instance.disk_template != constants.DT_REMOTE_RAID1:
3034       raise errors.OpPrereqError("Instance's disk layout is not"
3035                                  " remote_raid1.")
3036
3037     if len(instance.secondary_nodes) != 1:
3038       raise errors.OpPrereqError("The instance has a strange layout,"
3039                                  " expected one secondary but found %d" %
3040                                  len(instance.secondary_nodes))
3041
3042     remote_node = getattr(self.op, "remote_node", None)
3043     if remote_node is None:
3044       remote_node = instance.secondary_nodes[0]
3045     else:
3046       remote_node = self.cfg.ExpandNodeName(remote_node)
3047       if remote_node is None:
3048         raise errors.OpPrereqError("Node '%s' not known" %
3049                                    self.op.remote_node)
3050     if remote_node == instance.primary_node:
3051       raise errors.OpPrereqError("The specified node is the primary node of"
3052                                  " the instance.")
3053     self.op.remote_node = remote_node
3054
3055   def Exec(self, feedback_fn):
3056     """Replace the disks of an instance.
3057
3058     """
3059     instance = self.instance
3060     iv_names = {}
3061     # start of work
3062     remote_node = self.op.remote_node
3063     cfg = self.cfg
3064     vgname = cfg.GetVGName()
3065     for dev in instance.disks:
3066       size = dev.size
3067       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3068       names = _GenerateUniqueNames(cfg, lv_names)
3069       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3070                                        remote_node, size, names)
3071       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3072       logger.Info("adding new mirror component on secondary for %s" %
3073                   dev.iv_name)
3074       #HARDCODE
3075       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3076                                         _GetInstanceInfoText(instance)):
3077         raise errors.OpExecError("Failed to create new component on"
3078                                  " secondary node %s\n"
3079                                  "Full abort, cleanup manually!" %
3080                                  remote_node)
3081
3082       logger.Info("adding new mirror component on primary")
3083       #HARDCODE
3084       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3085                                       _GetInstanceInfoText(instance)):
3086         # remove secondary dev
3087         cfg.SetDiskID(new_drbd, remote_node)
3088         rpc.call_blockdev_remove(remote_node, new_drbd)
3089         raise errors.OpExecError("Failed to create volume on primary!\n"
3090                                  "Full abort, cleanup manually!!")
3091
3092       # the device exists now
3093       # call the primary node to add the mirror to md
3094       logger.Info("adding new mirror component to md")
3095       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3096                                         new_drbd):
3097         logger.Error("Can't add mirror compoment to md!")
3098         cfg.SetDiskID(new_drbd, remote_node)
3099         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3100           logger.Error("Can't rollback on secondary")
3101         cfg.SetDiskID(new_drbd, instance.primary_node)
3102         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3103           logger.Error("Can't rollback on primary")
3104         raise errors.OpExecError("Full abort, cleanup manually!!")
3105
3106       dev.children.append(new_drbd)
3107       cfg.AddInstance(instance)
3108
3109     # this can fail as the old devices are degraded and _WaitForSync
3110     # does a combined result over all disks, so we don't check its
3111     # return value
3112     _WaitForSync(cfg, instance, unlock=True)
3113
3114     # so check manually all the devices
3115     for name in iv_names:
3116       dev, child, new_drbd = iv_names[name]
3117       cfg.SetDiskID(dev, instance.primary_node)
3118       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3119       if is_degr:
3120         raise errors.OpExecError("MD device %s is degraded!" % name)
3121       cfg.SetDiskID(new_drbd, instance.primary_node)
3122       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3123       if is_degr:
3124         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3125
3126     for name in iv_names:
3127       dev, child, new_drbd = iv_names[name]
3128       logger.Info("remove mirror %s component" % name)
3129       cfg.SetDiskID(dev, instance.primary_node)
3130       if not rpc.call_blockdev_removechild(instance.primary_node,
3131                                                 dev, child):
3132         logger.Error("Can't remove child from mirror, aborting"
3133                      " *this device cleanup*.\nYou need to cleanup manually!!")
3134         continue
3135
3136       for node in child.logical_id[:2]:
3137         logger.Info("remove child device on %s" % node)
3138         cfg.SetDiskID(child, node)
3139         if not rpc.call_blockdev_remove(node, child):
3140           logger.Error("Warning: failed to remove device from node %s,"
3141                        " continuing operation." % node)
3142
3143       dev.children.remove(child)
3144
3145       cfg.AddInstance(instance)
3146
3147
3148 class LUQueryInstanceData(NoHooksLU):
3149   """Query runtime instance data.
3150
3151   """
3152   _OP_REQP = ["instances"]
3153
3154   def CheckPrereq(self):
3155     """Check prerequisites.
3156
3157     This only checks the optional instance list against the existing names.
3158
3159     """
3160     if not isinstance(self.op.instances, list):
3161       raise errors.OpPrereqError("Invalid argument type 'instances'")
3162     if self.op.instances:
3163       self.wanted_instances = []
3164       names = self.op.instances
3165       for name in names:
3166         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3167         if instance is None:
3168           raise errors.OpPrereqError("No such instance name '%s'" % name)
3169       self.wanted_instances.append(instance)
3170     else:
3171       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3172                                in self.cfg.GetInstanceList()]
3173     return
3174
3175
3176   def _ComputeDiskStatus(self, instance, snode, dev):
3177     """Compute block device status.
3178
3179     """
3180     self.cfg.SetDiskID(dev, instance.primary_node)
3181     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3182     if dev.dev_type == "drbd":
3183       # we change the snode then (otherwise we use the one passed in)
3184       if dev.logical_id[0] == instance.primary_node:
3185         snode = dev.logical_id[1]
3186       else:
3187         snode = dev.logical_id[0]
3188
3189     if snode:
3190       self.cfg.SetDiskID(dev, snode)
3191       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3192     else:
3193       dev_sstatus = None
3194
3195     if dev.children:
3196       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3197                       for child in dev.children]
3198     else:
3199       dev_children = []
3200
3201     data = {
3202       "iv_name": dev.iv_name,
3203       "dev_type": dev.dev_type,
3204       "logical_id": dev.logical_id,
3205       "physical_id": dev.physical_id,
3206       "pstatus": dev_pstatus,
3207       "sstatus": dev_sstatus,
3208       "children": dev_children,
3209       }
3210
3211     return data
3212
3213   def Exec(self, feedback_fn):
3214     """Gather and return data"""
3215     result = {}
3216     for instance in self.wanted_instances:
3217       remote_info = rpc.call_instance_info(instance.primary_node,
3218                                                 instance.name)
3219       if remote_info and "state" in remote_info:
3220         remote_state = "up"
3221       else:
3222         remote_state = "down"
3223       if instance.status == "down":
3224         config_state = "down"
3225       else:
3226         config_state = "up"
3227
3228       disks = [self._ComputeDiskStatus(instance, None, device)
3229                for device in instance.disks]
3230
3231       idict = {
3232         "name": instance.name,
3233         "config_state": config_state,
3234         "run_state": remote_state,
3235         "pnode": instance.primary_node,
3236         "snodes": instance.secondary_nodes,
3237         "os": instance.os,
3238         "memory": instance.memory,
3239         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3240         "disks": disks,
3241         }
3242
3243       result[instance.name] = idict
3244
3245     return result
3246
3247
3248 class LUQueryNodeData(NoHooksLU):
3249   """Logical unit for querying node data.
3250
3251   """
3252   _OP_REQP = ["nodes"]
3253
3254   def CheckPrereq(self):
3255     """Check prerequisites.
3256
3257     This only checks the optional node list against the existing names.
3258
3259     """
3260     self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3261
3262   def Exec(self, feedback_fn):
3263     """Compute and return the list of nodes.
3264
3265     """
3266     ilist = [self.cfg.GetInstanceInfo(iname) for iname
3267              in self.cfg.GetInstanceList()]
3268     result = []
3269     for node in self.wanted_nodes:
3270       result.append((node.name, node.primary_ip, node.secondary_ip,
3271                      [inst.name for inst in ilist
3272                       if inst.primary_node == node.name],
3273                      [inst.name for inst in ilist
3274                       if node.name in inst.secondary_nodes],
3275                      ))
3276     return result
3277
3278
3279 class LUSetInstanceParms(LogicalUnit):
3280   """Modifies an instances's parameters.
3281
3282   """
3283   HPATH = "instance-modify"
3284   HTYPE = constants.HTYPE_INSTANCE
3285   _OP_REQP = ["instance_name"]
3286
3287   def BuildHooksEnv(self):
3288     """Build hooks env.
3289
3290     This runs on the master, primary and secondaries.
3291
3292     """
3293     args = dict()
3294     if self.mem:
3295       args['memory'] = self.mem
3296     if self.vcpus:
3297       args['vcpus'] = self.vcpus
3298     if self.do_ip or self.do_bridge:
3299       if self.do_ip:
3300         ip = self.ip
3301       else:
3302         ip = self.instance.nics[0].ip
3303       if self.bridge:
3304         bridge = self.bridge
3305       else:
3306         bridge = self.instance.nics[0].bridge
3307       args['nics'] = [(ip, bridge)]
3308     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3309     nl = [self.sstore.GetMasterNode(),
3310           self.instance.primary_node] + list(self.instance.secondary_nodes)
3311     return env, nl, nl
3312
3313   def CheckPrereq(self):
3314     """Check prerequisites.
3315
3316     This only checks the instance list against the existing names.
3317
3318     """
3319     self.mem = getattr(self.op, "mem", None)
3320     self.vcpus = getattr(self.op, "vcpus", None)
3321     self.ip = getattr(self.op, "ip", None)
3322     self.bridge = getattr(self.op, "bridge", None)
3323     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3324       raise errors.OpPrereqError("No changes submitted")
3325     if self.mem is not None:
3326       try:
3327         self.mem = int(self.mem)
3328       except ValueError, err:
3329         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3330     if self.vcpus is not None:
3331       try:
3332         self.vcpus = int(self.vcpus)
3333       except ValueError, err:
3334         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3335     if self.ip is not None:
3336       self.do_ip = True
3337       if self.ip.lower() == "none":
3338         self.ip = None
3339       else:
3340         if not utils.IsValidIP(self.ip):
3341           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3342     else:
3343       self.do_ip = False
3344     self.do_bridge = (self.bridge is not None)
3345
3346     instance = self.cfg.GetInstanceInfo(
3347       self.cfg.ExpandInstanceName(self.op.instance_name))
3348     if instance is None:
3349       raise errors.OpPrereqError("No such instance name '%s'" %
3350                                  self.op.instance_name)
3351     self.op.instance_name = instance.name
3352     self.instance = instance
3353     return
3354
3355   def Exec(self, feedback_fn):
3356     """Modifies an instance.
3357
3358     All parameters take effect only at the next restart of the instance.
3359     """
3360     result = []
3361     instance = self.instance
3362     if self.mem:
3363       instance.memory = self.mem
3364       result.append(("mem", self.mem))
3365     if self.vcpus:
3366       instance.vcpus = self.vcpus
3367       result.append(("vcpus",  self.vcpus))
3368     if self.do_ip:
3369       instance.nics[0].ip = self.ip
3370       result.append(("ip", self.ip))
3371     if self.bridge:
3372       instance.nics[0].bridge = self.bridge
3373       result.append(("bridge", self.bridge))
3374
3375     self.cfg.AddInstance(instance)
3376
3377     return result
3378
3379
3380 class LUQueryExports(NoHooksLU):
3381   """Query the exports list
3382
3383   """
3384   _OP_REQP = []
3385
3386   def CheckPrereq(self):
3387     """Check that the nodelist contains only existing nodes.
3388
3389     """
3390     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3391
3392   def Exec(self, feedback_fn):
3393     """Compute the list of all the exported system images.
3394
3395     Returns:
3396       a dictionary with the structure node->(export-list)
3397       where export-list is a list of the instances exported on
3398       that node.
3399
3400     """
3401     return rpc.call_export_list([node.name for node in self.nodes])
3402
3403
3404 class LUExportInstance(LogicalUnit):
3405   """Export an instance to an image in the cluster.
3406
3407   """
3408   HPATH = "instance-export"
3409   HTYPE = constants.HTYPE_INSTANCE
3410   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3411
3412   def BuildHooksEnv(self):
3413     """Build hooks env.
3414
3415     This will run on the master, primary node and target node.
3416
3417     """
3418     env = {
3419       "EXPORT_NODE": self.op.target_node,
3420       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3421       }
3422     env.update(_BuildInstanceHookEnvByObject(self.instance))
3423     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3424           self.op.target_node]
3425     return env, nl, nl
3426
3427   def CheckPrereq(self):
3428     """Check prerequisites.
3429
3430     This checks that the instance name is a valid one.
3431
3432     """
3433     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3434     self.instance = self.cfg.GetInstanceInfo(instance_name)
3435     if self.instance is None:
3436       raise errors.OpPrereqError("Instance '%s' not found" %
3437                                  self.op.instance_name)
3438
3439     # node verification
3440     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3441     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3442
3443     if self.dst_node is None:
3444       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3445                                  self.op.target_node)
3446     self.op.target_node = self.dst_node.name
3447
3448   def Exec(self, feedback_fn):
3449     """Export an instance to an image in the cluster.
3450
3451     """
3452     instance = self.instance
3453     dst_node = self.dst_node
3454     src_node = instance.primary_node
3455     # shutdown the instance, unless requested not to do so
3456     if self.op.shutdown:
3457       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3458       self.processor.ChainOpCode(op, feedback_fn)
3459
3460     vgname = self.cfg.GetVGName()
3461
3462     snap_disks = []
3463
3464     try:
3465       for disk in instance.disks:
3466         if disk.iv_name == "sda":
3467           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3468           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3469
3470           if not new_dev_name:
3471             logger.Error("could not snapshot block device %s on node %s" %
3472                          (disk.logical_id[1], src_node))
3473           else:
3474             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3475                                       logical_id=(vgname, new_dev_name),
3476                                       physical_id=(vgname, new_dev_name),
3477                                       iv_name=disk.iv_name)
3478             snap_disks.append(new_dev)
3479
3480     finally:
3481       if self.op.shutdown:
3482         op = opcodes.OpStartupInstance(instance_name=instance.name,
3483                                        force=False)
3484         self.processor.ChainOpCode(op, feedback_fn)
3485
3486     # TODO: check for size
3487
3488     for dev in snap_disks:
3489       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3490                                            instance):
3491         logger.Error("could not export block device %s from node"
3492                      " %s to node %s" %
3493                      (dev.logical_id[1], src_node, dst_node.name))
3494       if not rpc.call_blockdev_remove(src_node, dev):
3495         logger.Error("could not remove snapshot block device %s from"
3496                      " node %s" % (dev.logical_id[1], src_node))
3497
3498     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3499       logger.Error("could not finalize export for instance %s on node %s" %
3500                    (instance.name, dst_node.name))
3501
3502     nodelist = self.cfg.GetNodeList()
3503     nodelist.remove(dst_node.name)
3504
3505     # on one-node clusters nodelist will be empty after the removal
3506     # if we proceed the backup would be removed because OpQueryExports
3507     # substitutes an empty list with the full cluster node list.
3508     if nodelist:
3509       op = opcodes.OpQueryExports(nodes=nodelist)
3510       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3511       for node in exportlist:
3512         if instance.name in exportlist[node]:
3513           if not rpc.call_export_remove(node, instance.name):
3514             logger.Error("could not remove older export for instance %s"
3515                          " on node %s" % (instance.name, node))
3516
3517
3518 class TagsLU(NoHooksLU):
3519   """Generic tags LU.
3520
3521   This is an abstract class which is the parent of all the other tags LUs.
3522
3523   """
3524   def CheckPrereq(self):
3525     """Check prerequisites.
3526
3527     """
3528     if self.op.kind == constants.TAG_CLUSTER:
3529       self.target = self.cfg.GetClusterInfo()
3530     elif self.op.kind == constants.TAG_NODE:
3531       name = self.cfg.ExpandNodeName(self.op.name)
3532       if name is None:
3533         raise errors.OpPrereqError("Invalid node name (%s)" %
3534                                    (self.op.name,))
3535       self.op.name = name
3536       self.target = self.cfg.GetNodeInfo(name)
3537     elif self.op.kind == constants.TAG_INSTANCE:
3538       name = self.cfg.ExpandInstanceName(name)
3539       if name is None:
3540         raise errors.OpPrereqError("Invalid instance name (%s)" %
3541                                    (self.op.name,))
3542       self.op.name = name
3543       self.target = self.cfg.GetInstanceInfo(name)
3544     else:
3545       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3546                                  str(self.op.kind))
3547
3548
3549 class LUGetTags(TagsLU):
3550   """Returns the tags of a given object.
3551
3552   """
3553   _OP_REQP = ["kind", "name"]
3554
3555   def Exec(self, feedback_fn):
3556     """Returns the tag list.
3557
3558     """
3559     return self.target.GetTags()
3560
3561
3562 class LUAddTag(TagsLU):
3563   """Sets a tag on a given object.
3564
3565   """
3566   _OP_REQP = ["kind", "name", "tag"]
3567
3568   def CheckPrereq(self):
3569     """Check prerequisites.
3570
3571     This checks the type and length of the tag name and value.
3572
3573     """
3574     TagsLU.CheckPrereq(self)
3575     objects.TaggableObject.ValidateTag(self.op.tag)
3576
3577   def Exec(self, feedback_fn):
3578     """Sets the tag.
3579
3580     """
3581     try:
3582       self.target.AddTag(self.op.tag)
3583     except errors.TagError, err:
3584       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3585     try:
3586       self.cfg.Update(self.target)
3587     except errors.ConfigurationError:
3588       raise errors.OpRetryError("There has been a modification to the"
3589                                 " config file and the operation has been"
3590                                 " aborted. Please retry.")
3591
3592
3593 class LUDelTag(TagsLU):
3594   """Delete a tag from a given object.
3595
3596   """
3597   _OP_REQP = ["kind", "name", "tag"]
3598
3599   def CheckPrereq(self):
3600     """Check prerequisites.
3601
3602     This checks that we have the given tag.
3603
3604     """
3605     TagsLU.CheckPrereq(self)
3606     objects.TaggableObject.ValidateTag(self.op.tag)
3607     if self.op.tag not in self.target.GetTags():
3608       raise errors.OpPrereqError("Tag not found")
3609
3610   def Exec(self, feedback_fn):
3611     """Remove the tag from the object.
3612
3613     """
3614     self.target.RemoveTag(self.op.tag)
3615     try:
3616       self.cfg.Update(self.target)
3617     except errors.ConfigurationError:
3618       raise errors.OpRetryError("There has been a modification to the"
3619                                 " config file and the operation has been"
3620                                 " aborted. Please retry.")