Miscellaneous style fixes
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.processor = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _GetWantedNodes(lu, nodes):
167   """Returns list of checked and expanded node names.
168
169   Args:
170     nodes: List of nodes (strings) or None for all
171
172   """
173   if not isinstance(nodes, list):
174     raise errors.OpPrereqError("Invalid argument type 'nodes'")
175
176   if nodes:
177     wanted = []
178
179     for name in nodes:
180       node = lu.cfg.ExpandNodeName(name)
181       if node is None:
182         raise errors.OpPrereqError("No such node name '%s'" % name)
183       wanted.append(node)
184
185   else:
186     wanted = lu.cfg.GetNodeList()
187   return utils.NiceSort(wanted)
188
189
190 def _GetWantedInstances(lu, instances):
191   """Returns list of checked and expanded instance names.
192
193   Args:
194     instances: List of instances (strings) or None for all
195
196   """
197   if not isinstance(instances, list):
198     raise errors.OpPrereqError("Invalid argument type 'instances'")
199
200   if instances:
201     wanted = []
202
203     for name in instances:
204       instance = lu.cfg.ExpandInstanceName(name)
205       if instance is None:
206         raise errors.OpPrereqError("No such instance name '%s'" % name)
207       wanted.append(instance)
208
209   else:
210     wanted = lu.cfg.GetInstanceList()
211   return utils.NiceSort(wanted)
212
213
214 def _CheckOutputFields(static, dynamic, selected):
215   """Checks whether all selected fields are valid.
216
217   Args:
218     static: Static fields
219     dynamic: Dynamic fields
220
221   """
222   static_fields = frozenset(static)
223   dynamic_fields = frozenset(dynamic)
224
225   all_fields = static_fields | dynamic_fields
226
227   if not all_fields.issuperset(selected):
228     raise errors.OpPrereqError("Unknown output fields selected: %s"
229                                % ",".join(frozenset(selected).
230                                           difference(all_fields)))
231
232
233 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
234                           memory, vcpus, nics):
235   """Builds instance related env variables for hooks from single variables.
236
237   Args:
238     secondary_nodes: List of secondary nodes as strings
239   """
240   env = {
241     "OP_TARGET": name,
242     "INSTANCE_NAME": name,
243     "INSTANCE_PRIMARY": primary_node,
244     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245     "INSTANCE_OS_TYPE": os_type,
246     "INSTANCE_STATUS": status,
247     "INSTANCE_MEMORY": memory,
248     "INSTANCE_VCPUS": vcpus,
249   }
250
251   if nics:
252     nic_count = len(nics)
253     for idx, (ip, bridge) in enumerate(nics):
254       if ip is None:
255         ip = ""
256       env["INSTANCE_NIC%d_IP" % idx] = ip
257       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258   else:
259     nic_count = 0
260
261   env["INSTANCE_NIC_COUNT"] = nic_count
262
263   return env
264
265
266 def _BuildInstanceHookEnvByObject(instance, override=None):
267   """Builds instance related env variables for hooks from an object.
268
269   Args:
270     instance: objects.Instance object of instance
271     override: dict of values to override
272   """
273   args = {
274     'name': instance.name,
275     'primary_node': instance.primary_node,
276     'secondary_nodes': instance.secondary_nodes,
277     'os_type': instance.os,
278     'status': instance.os,
279     'memory': instance.memory,
280     'vcpus': instance.vcpus,
281     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282   }
283   if override:
284     args.update(override)
285   return _BuildInstanceHookEnv(**args)
286
287
288 def _UpdateEtcHosts(fullnode, ip):
289   """Ensure a node has a correct entry in /etc/hosts.
290
291   Args:
292     fullnode - Fully qualified domain name of host. (str)
293     ip       - IPv4 address of host (str)
294
295   """
296   node = fullnode.split(".", 1)[0]
297
298   f = open('/etc/hosts', 'r+')
299
300   inthere = False
301
302   save_lines = []
303   add_lines = []
304   removed = False
305
306   while True:
307     rawline = f.readline()
308
309     if not rawline:
310       # End of file
311       break
312
313     line = rawline.split('\n')[0]
314
315     # Strip off comments
316     line = line.split('#')[0]
317
318     if not line:
319       # Entire line was comment, skip
320       save_lines.append(rawline)
321       continue
322
323     fields = line.split()
324
325     haveall = True
326     havesome = False
327     for spec in [ ip, fullnode, node ]:
328       if spec not in fields:
329         haveall = False
330       if spec in fields:
331         havesome = True
332
333     if haveall:
334       inthere = True
335       save_lines.append(rawline)
336       continue
337
338     if havesome and not haveall:
339       # Line (old, or manual?) which is missing some.  Remove.
340       removed = True
341       continue
342
343     save_lines.append(rawline)
344
345   if not inthere:
346     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347
348   if removed:
349     if add_lines:
350       save_lines = save_lines + add_lines
351
352     # We removed a line, write a new file and replace old.
353     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354     newfile = os.fdopen(fd, 'w')
355     newfile.write(''.join(save_lines))
356     newfile.close()
357     os.rename(tmpname, '/etc/hosts')
358
359   elif add_lines:
360     # Simply appending a new line will do the trick.
361     f.seek(0, 2)
362     for add in add_lines:
363       f.write(add)
364
365   f.close()
366
367
368 def _UpdateKnownHosts(fullnode, ip, pubkey):
369   """Ensure a node has a correct known_hosts entry.
370
371   Args:
372     fullnode - Fully qualified domain name of host. (str)
373     ip       - IPv4 address of host (str)
374     pubkey   - the public key of the cluster
375
376   """
377   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379   else:
380     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381
382   inthere = False
383
384   save_lines = []
385   add_lines = []
386   removed = False
387
388   for rawline in f:
389     logger.Debug('read %s' % (repr(rawline),))
390
391     parts = rawline.rstrip('\r\n').split()
392
393     # Ignore unwanted lines
394     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
395       fields = parts[0].split(',')
396       key = parts[2]
397
398       haveall = True
399       havesome = False
400       for spec in [ ip, fullnode ]:
401         if spec not in fields:
402           haveall = False
403         if spec in fields:
404           havesome = True
405
406       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
407       if haveall and key == pubkey:
408         inthere = True
409         save_lines.append(rawline)
410         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
411         continue
412
413       if havesome and (not haveall or key != pubkey):
414         removed = True
415         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
416         continue
417
418     save_lines.append(rawline)
419
420   if not inthere:
421     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
422     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
423
424   if removed:
425     save_lines = save_lines + add_lines
426
427     # Write a new file and replace old.
428     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
429                                    constants.DATA_DIR)
430     newfile = os.fdopen(fd, 'w')
431     try:
432       newfile.write(''.join(save_lines))
433     finally:
434       newfile.close()
435     logger.Debug("Wrote new known_hosts.")
436     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
437
438   elif add_lines:
439     # Simply appending a new line will do the trick.
440     f.seek(0, 2)
441     for add in add_lines:
442       f.write(add)
443
444   f.close()
445
446
447 def _HasValidVG(vglist, vgname):
448   """Checks if the volume group list is valid.
449
450   A non-None return value means there's an error, and the return value
451   is the error message.
452
453   """
454   vgsize = vglist.get(vgname, None)
455   if vgsize is None:
456     return "volume group '%s' missing" % vgname
457   elif vgsize < 20480:
458     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
459             (vgname, vgsize))
460   return None
461
462
463 def _InitSSHSetup(node):
464   """Setup the SSH configuration for the cluster.
465
466
467   This generates a dsa keypair for root, adds the pub key to the
468   permitted hosts and adds the hostkey to its own known hosts.
469
470   Args:
471     node: the name of this host as a fqdn
472
473   """
474   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
475
476   for name in priv_key, pub_key:
477     if os.path.exists(name):
478       utils.CreateBackup(name)
479     utils.RemoveFile(name)
480
481   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
482                          "-f", priv_key,
483                          "-q", "-N", ""])
484   if result.failed:
485     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
486                              result.output)
487
488   f = open(pub_key, 'r')
489   try:
490     utils.AddAuthorizedKey(auth_keys, f.read(8192))
491   finally:
492     f.close()
493
494
495 def _InitGanetiServerSetup(ss):
496   """Setup the necessary configuration for the initial node daemon.
497
498   This creates the nodepass file containing the shared password for
499   the cluster and also generates the SSL certificate.
500
501   """
502   # Create pseudo random password
503   randpass = sha.new(os.urandom(64)).hexdigest()
504   # and write it into sstore
505   ss.SetKey(ss.SS_NODED_PASS, randpass)
506
507   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
508                          "-days", str(365*5), "-nodes", "-x509",
509                          "-keyout", constants.SSL_CERT_FILE,
510                          "-out", constants.SSL_CERT_FILE, "-batch"])
511   if result.failed:
512     raise errors.OpExecError("could not generate server ssl cert, command"
513                              " %s had exitcode %s and error message %s" %
514                              (result.cmd, result.exit_code, result.output))
515
516   os.chmod(constants.SSL_CERT_FILE, 0400)
517
518   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
519
520   if result.failed:
521     raise errors.OpExecError("Could not start the node daemon, command %s"
522                              " had exitcode %s and error %s" %
523                              (result.cmd, result.exit_code, result.output))
524
525
526 def _CheckInstanceBridgesExist(instance):
527   """Check that the brigdes needed by an instance exist.
528
529   """
530   # check bridges existance
531   brlist = [nic.bridge for nic in instance.nics]
532   if not rpc.call_bridges_exist(instance.primary_node, brlist):
533     raise errors.OpPrereqError("one or more target bridges %s does not"
534                                " exist on destination node '%s'" %
535                                (brlist, instance.primary_node))
536
537
538 class LUInitCluster(LogicalUnit):
539   """Initialise the cluster.
540
541   """
542   HPATH = "cluster-init"
543   HTYPE = constants.HTYPE_CLUSTER
544   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
545               "def_bridge", "master_netdev"]
546   REQ_CLUSTER = False
547
548   def BuildHooksEnv(self):
549     """Build hooks env.
550
551     Notes: Since we don't require a cluster, we must manually add
552     ourselves in the post-run node list.
553
554     """
555     env = {"OP_TARGET": self.op.cluster_name}
556     return env, [], [self.hostname.name]
557
558   def CheckPrereq(self):
559     """Verify that the passed name is a valid one.
560
561     """
562     if config.ConfigWriter.IsCluster():
563       raise errors.OpPrereqError("Cluster is already initialised")
564
565     self.hostname = hostname = utils.HostInfo()
566
567     if hostname.ip.startswith("127."):
568       raise errors.OpPrereqError("This host's IP resolves to the private"
569                                  " range (%s). Please fix DNS or /etc/hosts." %
570                                  (hostname.ip,))
571
572     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
573
574     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
575                          constants.DEFAULT_NODED_PORT):
576       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
577                                  " to %s,\nbut this ip address does not"
578                                  " belong to this host."
579                                  " Aborting." % hostname.ip)
580
581     secondary_ip = getattr(self.op, "secondary_ip", None)
582     if secondary_ip and not utils.IsValidIP(secondary_ip):
583       raise errors.OpPrereqError("Invalid secondary ip given")
584     if (secondary_ip and
585         secondary_ip != hostname.ip and
586         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
587                            constants.DEFAULT_NODED_PORT))):
588       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
589                                  "but it does not belong to this host." %
590                                  secondary_ip)
591     self.secondary_ip = secondary_ip
592
593     # checks presence of the volume group given
594     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
595
596     if vgstatus:
597       raise errors.OpPrereqError("Error: %s" % vgstatus)
598
599     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
600                     self.op.mac_prefix):
601       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
602                                  self.op.mac_prefix)
603
604     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
605       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
606                                  self.op.hypervisor_type)
607
608     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
609     if result.failed:
610       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
611                                  (self.op.master_netdev,
612                                   result.output.strip()))
613
614   def Exec(self, feedback_fn):
615     """Initialize the cluster.
616
617     """
618     clustername = self.clustername
619     hostname = self.hostname
620
621     # set up the simple store
622     self.sstore = ss = ssconf.SimpleStore()
623     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
624     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
625     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
626     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
627     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
628
629     # set up the inter-node password and certificate
630     _InitGanetiServerSetup(ss)
631
632     # start the master ip
633     rpc.call_node_start_master(hostname.name)
634
635     # set up ssh config and /etc/hosts
636     f = open(constants.SSH_HOST_RSA_PUB, 'r')
637     try:
638       sshline = f.read()
639     finally:
640       f.close()
641     sshkey = sshline.split(" ")[1]
642
643     _UpdateEtcHosts(hostname.name, hostname.ip)
644
645     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
646
647     _InitSSHSetup(hostname.name)
648
649     # init of cluster config file
650     self.cfg = cfgw = config.ConfigWriter()
651     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
652                     sshkey, self.op.mac_prefix,
653                     self.op.vg_name, self.op.def_bridge)
654
655
656 class LUDestroyCluster(NoHooksLU):
657   """Logical unit for destroying the cluster.
658
659   """
660   _OP_REQP = []
661
662   def CheckPrereq(self):
663     """Check prerequisites.
664
665     This checks whether the cluster is empty.
666
667     Any errors are signalled by raising errors.OpPrereqError.
668
669     """
670     master = self.sstore.GetMasterNode()
671
672     nodelist = self.cfg.GetNodeList()
673     if len(nodelist) != 1 or nodelist[0] != master:
674       raise errors.OpPrereqError("There are still %d node(s) in"
675                                  " this cluster." % (len(nodelist) - 1))
676     instancelist = self.cfg.GetInstanceList()
677     if instancelist:
678       raise errors.OpPrereqError("There are still %d instance(s) in"
679                                  " this cluster." % len(instancelist))
680
681   def Exec(self, feedback_fn):
682     """Destroys the cluster.
683
684     """
685     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
686     utils.CreateBackup(priv_key)
687     utils.CreateBackup(pub_key)
688     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
689
690
691 class LUVerifyCluster(NoHooksLU):
692   """Verifies the cluster status.
693
694   """
695   _OP_REQP = []
696
697   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
698                   remote_version, feedback_fn):
699     """Run multiple tests against a node.
700
701     Test list:
702       - compares ganeti version
703       - checks vg existance and size > 20G
704       - checks config file checksum
705       - checks ssh to other nodes
706
707     Args:
708       node: name of the node to check
709       file_list: required list of files
710       local_cksum: dictionary of local files and their checksums
711
712     """
713     # compares ganeti version
714     local_version = constants.PROTOCOL_VERSION
715     if not remote_version:
716       feedback_fn(" - ERROR: connection to %s failed" % (node))
717       return True
718
719     if local_version != remote_version:
720       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
721                       (local_version, node, remote_version))
722       return True
723
724     # checks vg existance and size > 20G
725
726     bad = False
727     if not vglist:
728       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
729                       (node,))
730       bad = True
731     else:
732       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
733       if vgstatus:
734         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
735         bad = True
736
737     # checks config file checksum
738     # checks ssh to any
739
740     if 'filelist' not in node_result:
741       bad = True
742       feedback_fn("  - ERROR: node hasn't returned file checksum data")
743     else:
744       remote_cksum = node_result['filelist']
745       for file_name in file_list:
746         if file_name not in remote_cksum:
747           bad = True
748           feedback_fn("  - ERROR: file '%s' missing" % file_name)
749         elif remote_cksum[file_name] != local_cksum[file_name]:
750           bad = True
751           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
752
753     if 'nodelist' not in node_result:
754       bad = True
755       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
756     else:
757       if node_result['nodelist']:
758         bad = True
759         for node in node_result['nodelist']:
760           feedback_fn("  - ERROR: communication with node '%s': %s" %
761                           (node, node_result['nodelist'][node]))
762     hyp_result = node_result.get('hypervisor', None)
763     if hyp_result is not None:
764       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
765     return bad
766
767   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
768     """Verify an instance.
769
770     This function checks to see if the required block devices are
771     available on the instance's node.
772
773     """
774     bad = False
775
776     instancelist = self.cfg.GetInstanceList()
777     if not instance in instancelist:
778       feedback_fn("  - ERROR: instance %s not in instance list %s" %
779                       (instance, instancelist))
780       bad = True
781
782     instanceconfig = self.cfg.GetInstanceInfo(instance)
783     node_current = instanceconfig.primary_node
784
785     node_vol_should = {}
786     instanceconfig.MapLVsByNode(node_vol_should)
787
788     for node in node_vol_should:
789       for volume in node_vol_should[node]:
790         if node not in node_vol_is or volume not in node_vol_is[node]:
791           feedback_fn("  - ERROR: volume %s missing on node %s" %
792                           (volume, node))
793           bad = True
794
795     if not instanceconfig.status == 'down':
796       if not instance in node_instance[node_current]:
797         feedback_fn("  - ERROR: instance %s not running on node %s" %
798                         (instance, node_current))
799         bad = True
800
801     for node in node_instance:
802       if (not node == node_current):
803         if instance in node_instance[node]:
804           feedback_fn("  - ERROR: instance %s should not run on node %s" %
805                           (instance, node))
806           bad = True
807
808     return bad
809
810   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
811     """Verify if there are any unknown volumes in the cluster.
812
813     The .os, .swap and backup volumes are ignored. All other volumes are
814     reported as unknown.
815
816     """
817     bad = False
818
819     for node in node_vol_is:
820       for volume in node_vol_is[node]:
821         if node not in node_vol_should or volume not in node_vol_should[node]:
822           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
823                       (volume, node))
824           bad = True
825     return bad
826
827   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
828     """Verify the list of running instances.
829
830     This checks what instances are running but unknown to the cluster.
831
832     """
833     bad = False
834     for node in node_instance:
835       for runninginstance in node_instance[node]:
836         if runninginstance not in instancelist:
837           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
838                           (runninginstance, node))
839           bad = True
840     return bad
841
842   def CheckPrereq(self):
843     """Check prerequisites.
844
845     This has no prerequisites.
846
847     """
848     pass
849
850   def Exec(self, feedback_fn):
851     """Verify integrity of cluster, performing various test on nodes.
852
853     """
854     bad = False
855     feedback_fn("* Verifying global settings")
856     self.cfg.VerifyConfig()
857
858     vg_name = self.cfg.GetVGName()
859     nodelist = utils.NiceSort(self.cfg.GetNodeList())
860     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
861     node_volume = {}
862     node_instance = {}
863
864     # FIXME: verify OS list
865     # do local checksums
866     file_names = list(self.sstore.GetFileList())
867     file_names.append(constants.SSL_CERT_FILE)
868     file_names.append(constants.CLUSTER_CONF_FILE)
869     local_checksums = utils.FingerprintFiles(file_names)
870
871     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
872     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
873     all_instanceinfo = rpc.call_instance_list(nodelist)
874     all_vglist = rpc.call_vg_list(nodelist)
875     node_verify_param = {
876       'filelist': file_names,
877       'nodelist': nodelist,
878       'hypervisor': None,
879       }
880     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
881     all_rversion = rpc.call_version(nodelist)
882
883     for node in nodelist:
884       feedback_fn("* Verifying node %s" % node)
885       result = self._VerifyNode(node, file_names, local_checksums,
886                                 all_vglist[node], all_nvinfo[node],
887                                 all_rversion[node], feedback_fn)
888       bad = bad or result
889
890       # node_volume
891       volumeinfo = all_volumeinfo[node]
892
893       if type(volumeinfo) != dict:
894         feedback_fn("  - ERROR: connection to %s failed" % (node,))
895         bad = True
896         continue
897
898       node_volume[node] = volumeinfo
899
900       # node_instance
901       nodeinstance = all_instanceinfo[node]
902       if type(nodeinstance) != list:
903         feedback_fn("  - ERROR: connection to %s failed" % (node,))
904         bad = True
905         continue
906
907       node_instance[node] = nodeinstance
908
909     node_vol_should = {}
910
911     for instance in instancelist:
912       feedback_fn("* Verifying instance %s" % instance)
913       result =  self._VerifyInstance(instance, node_volume, node_instance,
914                                      feedback_fn)
915       bad = bad or result
916
917       inst_config = self.cfg.GetInstanceInfo(instance)
918
919       inst_config.MapLVsByNode(node_vol_should)
920
921     feedback_fn("* Verifying orphan volumes")
922     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
923                                        feedback_fn)
924     bad = bad or result
925
926     feedback_fn("* Verifying remaining instances")
927     result = self._VerifyOrphanInstances(instancelist, node_instance,
928                                          feedback_fn)
929     bad = bad or result
930
931     return int(bad)
932
933
934 class LURenameCluster(LogicalUnit):
935   """Rename the cluster.
936
937   """
938   HPATH = "cluster-rename"
939   HTYPE = constants.HTYPE_CLUSTER
940   _OP_REQP = ["name"]
941
942   def BuildHooksEnv(self):
943     """Build hooks env.
944
945     """
946     env = {
947       "OP_TARGET": self.op.sstore.GetClusterName(),
948       "NEW_NAME": self.op.name,
949       }
950     mn = self.sstore.GetMasterNode()
951     return env, [mn], [mn]
952
953   def CheckPrereq(self):
954     """Verify that the passed name is a valid one.
955
956     """
957     hostname = utils.HostInfo(self.op.name)
958
959     new_name = hostname.name
960     self.ip = new_ip = hostname.ip
961     old_name = self.sstore.GetClusterName()
962     old_ip = self.sstore.GetMasterIP()
963     if new_name == old_name and new_ip == old_ip:
964       raise errors.OpPrereqError("Neither the name nor the IP address of the"
965                                  " cluster has changed")
966     if new_ip != old_ip:
967       result = utils.RunCmd(["fping", "-q", new_ip])
968       if not result.failed:
969         raise errors.OpPrereqError("The given cluster IP address (%s) is"
970                                    " reachable on the network. Aborting." %
971                                    new_ip)
972
973     self.op.name = new_name
974
975   def Exec(self, feedback_fn):
976     """Rename the cluster.
977
978     """
979     clustername = self.op.name
980     ip = self.ip
981     ss = self.sstore
982
983     # shutdown the master IP
984     master = ss.GetMasterNode()
985     if not rpc.call_node_stop_master(master):
986       raise errors.OpExecError("Could not disable the master role")
987
988     try:
989       # modify the sstore
990       ss.SetKey(ss.SS_MASTER_IP, ip)
991       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
992
993       # Distribute updated ss config to all nodes
994       myself = self.cfg.GetNodeInfo(master)
995       dist_nodes = self.cfg.GetNodeList()
996       if myself.name in dist_nodes:
997         dist_nodes.remove(myself.name)
998
999       logger.Debug("Copying updated ssconf data to all nodes")
1000       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1001         fname = ss.KeyToFilename(keyname)
1002         result = rpc.call_upload_file(dist_nodes, fname)
1003         for to_node in dist_nodes:
1004           if not result[to_node]:
1005             logger.Error("copy of file %s to node %s failed" %
1006                          (fname, to_node))
1007     finally:
1008       if not rpc.call_node_start_master(master):
1009         logger.Error("Could not re-enable the master role on the master,\n"
1010                      "please restart manually.")
1011
1012
1013 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1014   """Sleep and poll for an instance's disk to sync.
1015
1016   """
1017   if not instance.disks:
1018     return True
1019
1020   if not oneshot:
1021     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1022
1023   node = instance.primary_node
1024
1025   for dev in instance.disks:
1026     cfgw.SetDiskID(dev, node)
1027
1028   retries = 0
1029   while True:
1030     max_time = 0
1031     done = True
1032     cumul_degraded = False
1033     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1034     if not rstats:
1035       logger.ToStderr("Can't get any data from node %s" % node)
1036       retries += 1
1037       if retries >= 10:
1038         raise errors.RemoteError("Can't contact node %s for mirror data,"
1039                                  " aborting." % node)
1040       time.sleep(6)
1041       continue
1042     retries = 0
1043     for i in range(len(rstats)):
1044       mstat = rstats[i]
1045       if mstat is None:
1046         logger.ToStderr("Can't compute data for node %s/%s" %
1047                         (node, instance.disks[i].iv_name))
1048         continue
1049       perc_done, est_time, is_degraded = mstat
1050       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1051       if perc_done is not None:
1052         done = False
1053         if est_time is not None:
1054           rem_time = "%d estimated seconds remaining" % est_time
1055           max_time = est_time
1056         else:
1057           rem_time = "no time estimate"
1058         logger.ToStdout("- device %s: %5.2f%% done, %s" %
1059                         (instance.disks[i].iv_name, perc_done, rem_time))
1060     if done or oneshot:
1061       break
1062
1063     if unlock:
1064       utils.Unlock('cmd')
1065     try:
1066       time.sleep(min(60, max_time))
1067     finally:
1068       if unlock:
1069         utils.Lock('cmd')
1070
1071   if done:
1072     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1073   return not cumul_degraded
1074
1075
1076 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1077   """Check that mirrors are not degraded.
1078
1079   """
1080   cfgw.SetDiskID(dev, node)
1081
1082   result = True
1083   if on_primary or dev.AssembleOnSecondary():
1084     rstats = rpc.call_blockdev_find(node, dev)
1085     if not rstats:
1086       logger.ToStderr("Can't get any data from node %s" % node)
1087       result = False
1088     else:
1089       result = result and (not rstats[5])
1090   if dev.children:
1091     for child in dev.children:
1092       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1093
1094   return result
1095
1096
1097 class LUDiagnoseOS(NoHooksLU):
1098   """Logical unit for OS diagnose/query.
1099
1100   """
1101   _OP_REQP = []
1102
1103   def CheckPrereq(self):
1104     """Check prerequisites.
1105
1106     This always succeeds, since this is a pure query LU.
1107
1108     """
1109     return
1110
1111   def Exec(self, feedback_fn):
1112     """Compute the list of OSes.
1113
1114     """
1115     node_list = self.cfg.GetNodeList()
1116     node_data = rpc.call_os_diagnose(node_list)
1117     if node_data == False:
1118       raise errors.OpExecError("Can't gather the list of OSes")
1119     return node_data
1120
1121
1122 class LURemoveNode(LogicalUnit):
1123   """Logical unit for removing a node.
1124
1125   """
1126   HPATH = "node-remove"
1127   HTYPE = constants.HTYPE_NODE
1128   _OP_REQP = ["node_name"]
1129
1130   def BuildHooksEnv(self):
1131     """Build hooks env.
1132
1133     This doesn't run on the target node in the pre phase as a failed
1134     node would not allows itself to run.
1135
1136     """
1137     env = {
1138       "OP_TARGET": self.op.node_name,
1139       "NODE_NAME": self.op.node_name,
1140       }
1141     all_nodes = self.cfg.GetNodeList()
1142     all_nodes.remove(self.op.node_name)
1143     return env, all_nodes, all_nodes
1144
1145   def CheckPrereq(self):
1146     """Check prerequisites.
1147
1148     This checks:
1149      - the node exists in the configuration
1150      - it does not have primary or secondary instances
1151      - it's not the master
1152
1153     Any errors are signalled by raising errors.OpPrereqError.
1154
1155     """
1156     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1157     if node is None:
1158       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1159
1160     instance_list = self.cfg.GetInstanceList()
1161
1162     masternode = self.sstore.GetMasterNode()
1163     if node.name == masternode:
1164       raise errors.OpPrereqError("Node is the master node,"
1165                                  " you need to failover first.")
1166
1167     for instance_name in instance_list:
1168       instance = self.cfg.GetInstanceInfo(instance_name)
1169       if node.name == instance.primary_node:
1170         raise errors.OpPrereqError("Instance %s still running on the node,"
1171                                    " please remove first." % instance_name)
1172       if node.name in instance.secondary_nodes:
1173         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1174                                    " please remove first." % instance_name)
1175     self.op.node_name = node.name
1176     self.node = node
1177
1178   def Exec(self, feedback_fn):
1179     """Removes the node from the cluster.
1180
1181     """
1182     node = self.node
1183     logger.Info("stopping the node daemon and removing configs from node %s" %
1184                 node.name)
1185
1186     rpc.call_node_leave_cluster(node.name)
1187
1188     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1189
1190     logger.Info("Removing node %s from config" % node.name)
1191
1192     self.cfg.RemoveNode(node.name)
1193
1194
1195 class LUQueryNodes(NoHooksLU):
1196   """Logical unit for querying nodes.
1197
1198   """
1199   _OP_REQP = ["output_fields", "names"]
1200
1201   def CheckPrereq(self):
1202     """Check prerequisites.
1203
1204     This checks that the fields required are valid output fields.
1205
1206     """
1207     self.dynamic_fields = frozenset(["dtotal", "dfree",
1208                                      "mtotal", "mnode", "mfree",
1209                                      "bootid"])
1210
1211     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1212                                "pinst_list", "sinst_list",
1213                                "pip", "sip"],
1214                        dynamic=self.dynamic_fields,
1215                        selected=self.op.output_fields)
1216
1217     self.wanted = _GetWantedNodes(self, self.op.names)
1218
1219   def Exec(self, feedback_fn):
1220     """Computes the list of nodes and their attributes.
1221
1222     """
1223     nodenames = self.wanted
1224     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1225
1226     # begin data gathering
1227
1228     if self.dynamic_fields.intersection(self.op.output_fields):
1229       live_data = {}
1230       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1231       for name in nodenames:
1232         nodeinfo = node_data.get(name, None)
1233         if nodeinfo:
1234           live_data[name] = {
1235             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1236             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1237             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1238             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1239             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1240             "bootid": nodeinfo['bootid'],
1241             }
1242         else:
1243           live_data[name] = {}
1244     else:
1245       live_data = dict.fromkeys(nodenames, {})
1246
1247     node_to_primary = dict([(name, set()) for name in nodenames])
1248     node_to_secondary = dict([(name, set()) for name in nodenames])
1249
1250     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1251                              "sinst_cnt", "sinst_list"))
1252     if inst_fields & frozenset(self.op.output_fields):
1253       instancelist = self.cfg.GetInstanceList()
1254
1255       for instance_name in instancelist:
1256         inst = self.cfg.GetInstanceInfo(instance_name)
1257         if inst.primary_node in node_to_primary:
1258           node_to_primary[inst.primary_node].add(inst.name)
1259         for secnode in inst.secondary_nodes:
1260           if secnode in node_to_secondary:
1261             node_to_secondary[secnode].add(inst.name)
1262
1263     # end data gathering
1264
1265     output = []
1266     for node in nodelist:
1267       node_output = []
1268       for field in self.op.output_fields:
1269         if field == "name":
1270           val = node.name
1271         elif field == "pinst_list":
1272           val = list(node_to_primary[node.name])
1273         elif field == "sinst_list":
1274           val = list(node_to_secondary[node.name])
1275         elif field == "pinst_cnt":
1276           val = len(node_to_primary[node.name])
1277         elif field == "sinst_cnt":
1278           val = len(node_to_secondary[node.name])
1279         elif field == "pip":
1280           val = node.primary_ip
1281         elif field == "sip":
1282           val = node.secondary_ip
1283         elif field in self.dynamic_fields:
1284           val = live_data[node.name].get(field, None)
1285         else:
1286           raise errors.ParameterError(field)
1287         node_output.append(val)
1288       output.append(node_output)
1289
1290     return output
1291
1292
1293 class LUQueryNodeVolumes(NoHooksLU):
1294   """Logical unit for getting volumes on node(s).
1295
1296   """
1297   _OP_REQP = ["nodes", "output_fields"]
1298
1299   def CheckPrereq(self):
1300     """Check prerequisites.
1301
1302     This checks that the fields required are valid output fields.
1303
1304     """
1305     self.nodes = _GetWantedNodes(self, self.op.nodes)
1306
1307     _CheckOutputFields(static=["node"],
1308                        dynamic=["phys", "vg", "name", "size", "instance"],
1309                        selected=self.op.output_fields)
1310
1311
1312   def Exec(self, feedback_fn):
1313     """Computes the list of nodes and their attributes.
1314
1315     """
1316     nodenames = self.nodes
1317     volumes = rpc.call_node_volumes(nodenames)
1318
1319     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1320              in self.cfg.GetInstanceList()]
1321
1322     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1323
1324     output = []
1325     for node in nodenames:
1326       if node not in volumes or not volumes[node]:
1327         continue
1328
1329       node_vols = volumes[node][:]
1330       node_vols.sort(key=lambda vol: vol['dev'])
1331
1332       for vol in node_vols:
1333         node_output = []
1334         for field in self.op.output_fields:
1335           if field == "node":
1336             val = node
1337           elif field == "phys":
1338             val = vol['dev']
1339           elif field == "vg":
1340             val = vol['vg']
1341           elif field == "name":
1342             val = vol['name']
1343           elif field == "size":
1344             val = int(float(vol['size']))
1345           elif field == "instance":
1346             for inst in ilist:
1347               if node not in lv_by_node[inst]:
1348                 continue
1349               if vol['name'] in lv_by_node[inst][node]:
1350                 val = inst.name
1351                 break
1352             else:
1353               val = '-'
1354           else:
1355             raise errors.ParameterError(field)
1356           node_output.append(str(val))
1357
1358         output.append(node_output)
1359
1360     return output
1361
1362
1363 class LUAddNode(LogicalUnit):
1364   """Logical unit for adding node to the cluster.
1365
1366   """
1367   HPATH = "node-add"
1368   HTYPE = constants.HTYPE_NODE
1369   _OP_REQP = ["node_name"]
1370
1371   def BuildHooksEnv(self):
1372     """Build hooks env.
1373
1374     This will run on all nodes before, and on all nodes + the new node after.
1375
1376     """
1377     env = {
1378       "OP_TARGET": self.op.node_name,
1379       "NODE_NAME": self.op.node_name,
1380       "NODE_PIP": self.op.primary_ip,
1381       "NODE_SIP": self.op.secondary_ip,
1382       }
1383     nodes_0 = self.cfg.GetNodeList()
1384     nodes_1 = nodes_0 + [self.op.node_name, ]
1385     return env, nodes_0, nodes_1
1386
1387   def CheckPrereq(self):
1388     """Check prerequisites.
1389
1390     This checks:
1391      - the new node is not already in the config
1392      - it is resolvable
1393      - its parameters (single/dual homed) matches the cluster
1394
1395     Any errors are signalled by raising errors.OpPrereqError.
1396
1397     """
1398     node_name = self.op.node_name
1399     cfg = self.cfg
1400
1401     dns_data = utils.HostInfo(node_name)
1402
1403     node = dns_data.name
1404     primary_ip = self.op.primary_ip = dns_data.ip
1405     secondary_ip = getattr(self.op, "secondary_ip", None)
1406     if secondary_ip is None:
1407       secondary_ip = primary_ip
1408     if not utils.IsValidIP(secondary_ip):
1409       raise errors.OpPrereqError("Invalid secondary IP given")
1410     self.op.secondary_ip = secondary_ip
1411     node_list = cfg.GetNodeList()
1412     if node in node_list:
1413       raise errors.OpPrereqError("Node %s is already in the configuration"
1414                                  % node)
1415
1416     for existing_node_name in node_list:
1417       existing_node = cfg.GetNodeInfo(existing_node_name)
1418       if (existing_node.primary_ip == primary_ip or
1419           existing_node.secondary_ip == primary_ip or
1420           existing_node.primary_ip == secondary_ip or
1421           existing_node.secondary_ip == secondary_ip):
1422         raise errors.OpPrereqError("New node ip address(es) conflict with"
1423                                    " existing node %s" % existing_node.name)
1424
1425     # check that the type of the node (single versus dual homed) is the
1426     # same as for the master
1427     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1428     master_singlehomed = myself.secondary_ip == myself.primary_ip
1429     newbie_singlehomed = secondary_ip == primary_ip
1430     if master_singlehomed != newbie_singlehomed:
1431       if master_singlehomed:
1432         raise errors.OpPrereqError("The master has no private ip but the"
1433                                    " new node has one")
1434       else:
1435         raise errors.OpPrereqError("The master has a private ip but the"
1436                                    " new node doesn't have one")
1437
1438     # checks reachablity
1439     if not utils.TcpPing(utils.HostInfo().name,
1440                          primary_ip,
1441                          constants.DEFAULT_NODED_PORT):
1442       raise errors.OpPrereqError("Node not reachable by ping")
1443
1444     if not newbie_singlehomed:
1445       # check reachability from my secondary ip to newbie's secondary ip
1446       if not utils.TcpPing(myself.secondary_ip,
1447                            secondary_ip,
1448                            constants.DEFAULT_NODED_PORT):
1449         raise errors.OpPrereqError(
1450           "Node secondary ip not reachable by TCP based ping to noded port")
1451
1452     self.new_node = objects.Node(name=node,
1453                                  primary_ip=primary_ip,
1454                                  secondary_ip=secondary_ip)
1455
1456   def Exec(self, feedback_fn):
1457     """Adds the new node to the cluster.
1458
1459     """
1460     new_node = self.new_node
1461     node = new_node.name
1462
1463     # set up inter-node password and certificate and restarts the node daemon
1464     gntpass = self.sstore.GetNodeDaemonPassword()
1465     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1466       raise errors.OpExecError("ganeti password corruption detected")
1467     f = open(constants.SSL_CERT_FILE)
1468     try:
1469       gntpem = f.read(8192)
1470     finally:
1471       f.close()
1472     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1473     # so we use this to detect an invalid certificate; as long as the
1474     # cert doesn't contain this, the here-document will be correctly
1475     # parsed by the shell sequence below
1476     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1477       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1478     if not gntpem.endswith("\n"):
1479       raise errors.OpExecError("PEM must end with newline")
1480     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1481
1482     # and then connect with ssh to set password and start ganeti-noded
1483     # note that all the below variables are sanitized at this point,
1484     # either by being constants or by the checks above
1485     ss = self.sstore
1486     mycommand = ("umask 077 && "
1487                  "echo '%s' > '%s' && "
1488                  "cat > '%s' << '!EOF.' && \n"
1489                  "%s!EOF.\n%s restart" %
1490                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1491                   constants.SSL_CERT_FILE, gntpem,
1492                   constants.NODE_INITD_SCRIPT))
1493
1494     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1495     if result.failed:
1496       raise errors.OpExecError("Remote command on node %s, error: %s,"
1497                                " output: %s" %
1498                                (node, result.fail_reason, result.output))
1499
1500     # check connectivity
1501     time.sleep(4)
1502
1503     result = rpc.call_version([node])[node]
1504     if result:
1505       if constants.PROTOCOL_VERSION == result:
1506         logger.Info("communication to node %s fine, sw version %s match" %
1507                     (node, result))
1508       else:
1509         raise errors.OpExecError("Version mismatch master version %s,"
1510                                  " node version %s" %
1511                                  (constants.PROTOCOL_VERSION, result))
1512     else:
1513       raise errors.OpExecError("Cannot get version from the new node")
1514
1515     # setup ssh on node
1516     logger.Info("copy ssh key to node %s" % node)
1517     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1518     keyarray = []
1519     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1520                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1521                 priv_key, pub_key]
1522
1523     for i in keyfiles:
1524       f = open(i, 'r')
1525       try:
1526         keyarray.append(f.read())
1527       finally:
1528         f.close()
1529
1530     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1531                                keyarray[3], keyarray[4], keyarray[5])
1532
1533     if not result:
1534       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1535
1536     # Add node to our /etc/hosts, and add key to known_hosts
1537     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1538     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1539                       self.cfg.GetHostKey())
1540
1541     if new_node.secondary_ip != new_node.primary_ip:
1542       if not rpc.call_node_tcp_ping(new_node.name,
1543                                     constants.LOCALHOST_IP_ADDRESS,
1544                                     new_node.secondary_ip,
1545                                     constants.DEFAULT_NODED_PORT,
1546                                     10, False):
1547         raise errors.OpExecError("Node claims it doesn't have the"
1548                                  " secondary ip you gave (%s).\n"
1549                                  "Please fix and re-run this command." %
1550                                  new_node.secondary_ip)
1551
1552     success, msg = ssh.VerifyNodeHostname(node)
1553     if not success:
1554       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1555                                " than the one the resolver gives: %s.\n"
1556                                "Please fix and re-run this command." %
1557                                (node, msg))
1558
1559     # Distribute updated /etc/hosts and known_hosts to all nodes,
1560     # including the node just added
1561     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1562     dist_nodes = self.cfg.GetNodeList() + [node]
1563     if myself.name in dist_nodes:
1564       dist_nodes.remove(myself.name)
1565
1566     logger.Debug("Copying hosts and known_hosts to all nodes")
1567     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1568       result = rpc.call_upload_file(dist_nodes, fname)
1569       for to_node in dist_nodes:
1570         if not result[to_node]:
1571           logger.Error("copy of file %s to node %s failed" %
1572                        (fname, to_node))
1573
1574     to_copy = ss.GetFileList()
1575     for fname in to_copy:
1576       if not ssh.CopyFileToNode(node, fname):
1577         logger.Error("could not copy file %s to node %s" % (fname, node))
1578
1579     logger.Info("adding node %s to cluster.conf" % node)
1580     self.cfg.AddNode(new_node)
1581
1582
1583 class LUMasterFailover(LogicalUnit):
1584   """Failover the master node to the current node.
1585
1586   This is a special LU in that it must run on a non-master node.
1587
1588   """
1589   HPATH = "master-failover"
1590   HTYPE = constants.HTYPE_CLUSTER
1591   REQ_MASTER = False
1592   _OP_REQP = []
1593
1594   def BuildHooksEnv(self):
1595     """Build hooks env.
1596
1597     This will run on the new master only in the pre phase, and on all
1598     the nodes in the post phase.
1599
1600     """
1601     env = {
1602       "OP_TARGET": self.new_master,
1603       "NEW_MASTER": self.new_master,
1604       "OLD_MASTER": self.old_master,
1605       }
1606     return env, [self.new_master], self.cfg.GetNodeList()
1607
1608   def CheckPrereq(self):
1609     """Check prerequisites.
1610
1611     This checks that we are not already the master.
1612
1613     """
1614     self.new_master = utils.HostInfo().name
1615     self.old_master = self.sstore.GetMasterNode()
1616
1617     if self.old_master == self.new_master:
1618       raise errors.OpPrereqError("This commands must be run on the node"
1619                                  " where you want the new master to be.\n"
1620                                  "%s is already the master" %
1621                                  self.old_master)
1622
1623   def Exec(self, feedback_fn):
1624     """Failover the master node.
1625
1626     This command, when run on a non-master node, will cause the current
1627     master to cease being master, and the non-master to become new
1628     master.
1629
1630     """
1631     #TODO: do not rely on gethostname returning the FQDN
1632     logger.Info("setting master to %s, old master: %s" %
1633                 (self.new_master, self.old_master))
1634
1635     if not rpc.call_node_stop_master(self.old_master):
1636       logger.Error("could disable the master role on the old master"
1637                    " %s, please disable manually" % self.old_master)
1638
1639     ss = self.sstore
1640     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1641     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1642                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1643       logger.Error("could not distribute the new simple store master file"
1644                    " to the other nodes, please check.")
1645
1646     if not rpc.call_node_start_master(self.new_master):
1647       logger.Error("could not start the master role on the new master"
1648                    " %s, please check" % self.new_master)
1649       feedback_fn("Error in activating the master IP on the new master,\n"
1650                   "please fix manually.")
1651
1652
1653
1654 class LUQueryClusterInfo(NoHooksLU):
1655   """Query cluster configuration.
1656
1657   """
1658   _OP_REQP = []
1659   REQ_MASTER = False
1660
1661   def CheckPrereq(self):
1662     """No prerequsites needed for this LU.
1663
1664     """
1665     pass
1666
1667   def Exec(self, feedback_fn):
1668     """Return cluster config.
1669
1670     """
1671     result = {
1672       "name": self.sstore.GetClusterName(),
1673       "software_version": constants.RELEASE_VERSION,
1674       "protocol_version": constants.PROTOCOL_VERSION,
1675       "config_version": constants.CONFIG_VERSION,
1676       "os_api_version": constants.OS_API_VERSION,
1677       "export_version": constants.EXPORT_VERSION,
1678       "master": self.sstore.GetMasterNode(),
1679       "architecture": (platform.architecture()[0], platform.machine()),
1680       }
1681
1682     return result
1683
1684
1685 class LUClusterCopyFile(NoHooksLU):
1686   """Copy file to cluster.
1687
1688   """
1689   _OP_REQP = ["nodes", "filename"]
1690
1691   def CheckPrereq(self):
1692     """Check prerequisites.
1693
1694     It should check that the named file exists and that the given list
1695     of nodes is valid.
1696
1697     """
1698     if not os.path.exists(self.op.filename):
1699       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1700
1701     self.nodes = _GetWantedNodes(self, self.op.nodes)
1702
1703   def Exec(self, feedback_fn):
1704     """Copy a file from master to some nodes.
1705
1706     Args:
1707       opts - class with options as members
1708       args - list containing a single element, the file name
1709     Opts used:
1710       nodes - list containing the name of target nodes; if empty, all nodes
1711
1712     """
1713     filename = self.op.filename
1714
1715     myname = utils.HostInfo().name
1716
1717     for node in self.nodes:
1718       if node == myname:
1719         continue
1720       if not ssh.CopyFileToNode(node, filename):
1721         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1722
1723
1724 class LUDumpClusterConfig(NoHooksLU):
1725   """Return a text-representation of the cluster-config.
1726
1727   """
1728   _OP_REQP = []
1729
1730   def CheckPrereq(self):
1731     """No prerequisites.
1732
1733     """
1734     pass
1735
1736   def Exec(self, feedback_fn):
1737     """Dump a representation of the cluster config to the standard output.
1738
1739     """
1740     return self.cfg.DumpConfig()
1741
1742
1743 class LURunClusterCommand(NoHooksLU):
1744   """Run a command on some nodes.
1745
1746   """
1747   _OP_REQP = ["command", "nodes"]
1748
1749   def CheckPrereq(self):
1750     """Check prerequisites.
1751
1752     It checks that the given list of nodes is valid.
1753
1754     """
1755     self.nodes = _GetWantedNodes(self, self.op.nodes)
1756
1757   def Exec(self, feedback_fn):
1758     """Run a command on some nodes.
1759
1760     """
1761     data = []
1762     for node in self.nodes:
1763       result = ssh.SSHCall(node, "root", self.op.command)
1764       data.append((node, result.output, result.exit_code))
1765
1766     return data
1767
1768
1769 class LUActivateInstanceDisks(NoHooksLU):
1770   """Bring up an instance's disks.
1771
1772   """
1773   _OP_REQP = ["instance_name"]
1774
1775   def CheckPrereq(self):
1776     """Check prerequisites.
1777
1778     This checks that the instance is in the cluster.
1779
1780     """
1781     instance = self.cfg.GetInstanceInfo(
1782       self.cfg.ExpandInstanceName(self.op.instance_name))
1783     if instance is None:
1784       raise errors.OpPrereqError("Instance '%s' not known" %
1785                                  self.op.instance_name)
1786     self.instance = instance
1787
1788
1789   def Exec(self, feedback_fn):
1790     """Activate the disks.
1791
1792     """
1793     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1794     if not disks_ok:
1795       raise errors.OpExecError("Cannot activate block devices")
1796
1797     return disks_info
1798
1799
1800 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1801   """Prepare the block devices for an instance.
1802
1803   This sets up the block devices on all nodes.
1804
1805   Args:
1806     instance: a ganeti.objects.Instance object
1807     ignore_secondaries: if true, errors on secondary nodes won't result
1808                         in an error return from the function
1809
1810   Returns:
1811     false if the operation failed
1812     list of (host, instance_visible_name, node_visible_name) if the operation
1813          suceeded with the mapping from node devices to instance devices
1814   """
1815   device_info = []
1816   disks_ok = True
1817   for inst_disk in instance.disks:
1818     master_result = None
1819     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1820       cfg.SetDiskID(node_disk, node)
1821       is_primary = node == instance.primary_node
1822       result = rpc.call_blockdev_assemble(node, node_disk,
1823                                           instance.name, is_primary)
1824       if not result:
1825         logger.Error("could not prepare block device %s on node %s (is_pri"
1826                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1827         if is_primary or not ignore_secondaries:
1828           disks_ok = False
1829       if is_primary:
1830         master_result = result
1831     device_info.append((instance.primary_node, inst_disk.iv_name,
1832                         master_result))
1833
1834   # leave the disks configured for the primary node
1835   # this is a workaround that would be fixed better by
1836   # improving the logical/physical id handling
1837   for disk in instance.disks:
1838     cfg.SetDiskID(disk, instance.primary_node)
1839
1840   return disks_ok, device_info
1841
1842
1843 def _StartInstanceDisks(cfg, instance, force):
1844   """Start the disks of an instance.
1845
1846   """
1847   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1848                                            ignore_secondaries=force)
1849   if not disks_ok:
1850     _ShutdownInstanceDisks(instance, cfg)
1851     if force is not None and not force:
1852       logger.Error("If the message above refers to a secondary node,"
1853                    " you can retry the operation using '--force'.")
1854     raise errors.OpExecError("Disk consistency error")
1855
1856
1857 class LUDeactivateInstanceDisks(NoHooksLU):
1858   """Shutdown an instance's disks.
1859
1860   """
1861   _OP_REQP = ["instance_name"]
1862
1863   def CheckPrereq(self):
1864     """Check prerequisites.
1865
1866     This checks that the instance is in the cluster.
1867
1868     """
1869     instance = self.cfg.GetInstanceInfo(
1870       self.cfg.ExpandInstanceName(self.op.instance_name))
1871     if instance is None:
1872       raise errors.OpPrereqError("Instance '%s' not known" %
1873                                  self.op.instance_name)
1874     self.instance = instance
1875
1876   def Exec(self, feedback_fn):
1877     """Deactivate the disks
1878
1879     """
1880     instance = self.instance
1881     ins_l = rpc.call_instance_list([instance.primary_node])
1882     ins_l = ins_l[instance.primary_node]
1883     if not type(ins_l) is list:
1884       raise errors.OpExecError("Can't contact node '%s'" %
1885                                instance.primary_node)
1886
1887     if self.instance.name in ins_l:
1888       raise errors.OpExecError("Instance is running, can't shutdown"
1889                                " block devices.")
1890
1891     _ShutdownInstanceDisks(instance, self.cfg)
1892
1893
1894 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1895   """Shutdown block devices of an instance.
1896
1897   This does the shutdown on all nodes of the instance.
1898
1899   If the ignore_primary is false, errors on the primary node are
1900   ignored.
1901
1902   """
1903   result = True
1904   for disk in instance.disks:
1905     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1906       cfg.SetDiskID(top_disk, node)
1907       if not rpc.call_blockdev_shutdown(node, top_disk):
1908         logger.Error("could not shutdown block device %s on node %s" %
1909                      (disk.iv_name, node))
1910         if not ignore_primary or node != instance.primary_node:
1911           result = False
1912   return result
1913
1914
1915 class LUStartupInstance(LogicalUnit):
1916   """Starts an instance.
1917
1918   """
1919   HPATH = "instance-start"
1920   HTYPE = constants.HTYPE_INSTANCE
1921   _OP_REQP = ["instance_name", "force"]
1922
1923   def BuildHooksEnv(self):
1924     """Build hooks env.
1925
1926     This runs on master, primary and secondary nodes of the instance.
1927
1928     """
1929     env = {
1930       "FORCE": self.op.force,
1931       }
1932     env.update(_BuildInstanceHookEnvByObject(self.instance))
1933     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1934           list(self.instance.secondary_nodes))
1935     return env, nl, nl
1936
1937   def CheckPrereq(self):
1938     """Check prerequisites.
1939
1940     This checks that the instance is in the cluster.
1941
1942     """
1943     instance = self.cfg.GetInstanceInfo(
1944       self.cfg.ExpandInstanceName(self.op.instance_name))
1945     if instance is None:
1946       raise errors.OpPrereqError("Instance '%s' not known" %
1947                                  self.op.instance_name)
1948
1949     # check bridges existance
1950     _CheckInstanceBridgesExist(instance)
1951
1952     self.instance = instance
1953     self.op.instance_name = instance.name
1954
1955   def Exec(self, feedback_fn):
1956     """Start the instance.
1957
1958     """
1959     instance = self.instance
1960     force = self.op.force
1961     extra_args = getattr(self.op, "extra_args", "")
1962
1963     node_current = instance.primary_node
1964
1965     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1966     if not nodeinfo:
1967       raise errors.OpExecError("Could not contact node %s for infos" %
1968                                (node_current))
1969
1970     freememory = nodeinfo[node_current]['memory_free']
1971     memory = instance.memory
1972     if memory > freememory:
1973       raise errors.OpExecError("Not enough memory to start instance"
1974                                " %s on node %s"
1975                                " needed %s MiB, available %s MiB" %
1976                                (instance.name, node_current, memory,
1977                                 freememory))
1978
1979     _StartInstanceDisks(self.cfg, instance, force)
1980
1981     if not rpc.call_instance_start(node_current, instance, extra_args):
1982       _ShutdownInstanceDisks(instance, self.cfg)
1983       raise errors.OpExecError("Could not start instance")
1984
1985     self.cfg.MarkInstanceUp(instance.name)
1986
1987
1988 class LURebootInstance(LogicalUnit):
1989   """Reboot an instance.
1990
1991   """
1992   HPATH = "instance-reboot"
1993   HTYPE = constants.HTYPE_INSTANCE
1994   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1995
1996   def BuildHooksEnv(self):
1997     """Build hooks env.
1998
1999     This runs on master, primary and secondary nodes of the instance.
2000
2001     """
2002     env = {
2003       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2004       }
2005     env.update(_BuildInstanceHookEnvByObject(self.instance))
2006     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2007           list(self.instance.secondary_nodes))
2008     return env, nl, nl
2009
2010   def CheckPrereq(self):
2011     """Check prerequisites.
2012
2013     This checks that the instance is in the cluster.
2014
2015     """
2016     instance = self.cfg.GetInstanceInfo(
2017       self.cfg.ExpandInstanceName(self.op.instance_name))
2018     if instance is None:
2019       raise errors.OpPrereqError("Instance '%s' not known" %
2020                                  self.op.instance_name)
2021
2022     # check bridges existance
2023     _CheckInstanceBridgesExist(instance)
2024
2025     self.instance = instance
2026     self.op.instance_name = instance.name
2027
2028   def Exec(self, feedback_fn):
2029     """Reboot the instance.
2030
2031     """
2032     instance = self.instance
2033     ignore_secondaries = self.op.ignore_secondaries
2034     reboot_type = self.op.reboot_type
2035     extra_args = getattr(self.op, "extra_args", "")
2036
2037     node_current = instance.primary_node
2038
2039     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2040                            constants.INSTANCE_REBOOT_HARD,
2041                            constants.INSTANCE_REBOOT_FULL]:
2042       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2043                                   (constants.INSTANCE_REBOOT_SOFT,
2044                                    constants.INSTANCE_REBOOT_HARD,
2045                                    constants.INSTANCE_REBOOT_FULL))
2046
2047     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2048                        constants.INSTANCE_REBOOT_HARD]:
2049       if not rpc.call_instance_reboot(node_current, instance,
2050                                       reboot_type, extra_args):
2051         raise errors.OpExecError("Could not reboot instance")
2052     else:
2053       if not rpc.call_instance_shutdown(node_current, instance):
2054         raise errors.OpExecError("could not shutdown instance for full reboot")
2055       _ShutdownInstanceDisks(instance, self.cfg)
2056       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2057       if not rpc.call_instance_start(node_current, instance, extra_args):
2058         _ShutdownInstanceDisks(instance, self.cfg)
2059         raise errors.OpExecError("Could not start instance for full reboot")
2060
2061     self.cfg.MarkInstanceUp(instance.name)
2062
2063
2064 class LUShutdownInstance(LogicalUnit):
2065   """Shutdown an instance.
2066
2067   """
2068   HPATH = "instance-stop"
2069   HTYPE = constants.HTYPE_INSTANCE
2070   _OP_REQP = ["instance_name"]
2071
2072   def BuildHooksEnv(self):
2073     """Build hooks env.
2074
2075     This runs on master, primary and secondary nodes of the instance.
2076
2077     """
2078     env = _BuildInstanceHookEnvByObject(self.instance)
2079     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2080           list(self.instance.secondary_nodes))
2081     return env, nl, nl
2082
2083   def CheckPrereq(self):
2084     """Check prerequisites.
2085
2086     This checks that the instance is in the cluster.
2087
2088     """
2089     instance = self.cfg.GetInstanceInfo(
2090       self.cfg.ExpandInstanceName(self.op.instance_name))
2091     if instance is None:
2092       raise errors.OpPrereqError("Instance '%s' not known" %
2093                                  self.op.instance_name)
2094     self.instance = instance
2095
2096   def Exec(self, feedback_fn):
2097     """Shutdown the instance.
2098
2099     """
2100     instance = self.instance
2101     node_current = instance.primary_node
2102     if not rpc.call_instance_shutdown(node_current, instance):
2103       logger.Error("could not shutdown instance")
2104
2105     self.cfg.MarkInstanceDown(instance.name)
2106     _ShutdownInstanceDisks(instance, self.cfg)
2107
2108
2109 class LUReinstallInstance(LogicalUnit):
2110   """Reinstall an instance.
2111
2112   """
2113   HPATH = "instance-reinstall"
2114   HTYPE = constants.HTYPE_INSTANCE
2115   _OP_REQP = ["instance_name"]
2116
2117   def BuildHooksEnv(self):
2118     """Build hooks env.
2119
2120     This runs on master, primary and secondary nodes of the instance.
2121
2122     """
2123     env = _BuildInstanceHookEnvByObject(self.instance)
2124     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2125           list(self.instance.secondary_nodes))
2126     return env, nl, nl
2127
2128   def CheckPrereq(self):
2129     """Check prerequisites.
2130
2131     This checks that the instance is in the cluster and is not running.
2132
2133     """
2134     instance = self.cfg.GetInstanceInfo(
2135       self.cfg.ExpandInstanceName(self.op.instance_name))
2136     if instance is None:
2137       raise errors.OpPrereqError("Instance '%s' not known" %
2138                                  self.op.instance_name)
2139     if instance.disk_template == constants.DT_DISKLESS:
2140       raise errors.OpPrereqError("Instance '%s' has no disks" %
2141                                  self.op.instance_name)
2142     if instance.status != "down":
2143       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2144                                  self.op.instance_name)
2145     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2146     if remote_info:
2147       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2148                                  (self.op.instance_name,
2149                                   instance.primary_node))
2150
2151     self.op.os_type = getattr(self.op, "os_type", None)
2152     if self.op.os_type is not None:
2153       # OS verification
2154       pnode = self.cfg.GetNodeInfo(
2155         self.cfg.ExpandNodeName(instance.primary_node))
2156       if pnode is None:
2157         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2158                                    self.op.pnode)
2159       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2160       if not os_obj:
2161         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2162                                    " primary node"  % self.op.os_type)
2163
2164     self.instance = instance
2165
2166   def Exec(self, feedback_fn):
2167     """Reinstall the instance.
2168
2169     """
2170     inst = self.instance
2171
2172     if self.op.os_type is not None:
2173       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2174       inst.os = self.op.os_type
2175       self.cfg.AddInstance(inst)
2176
2177     _StartInstanceDisks(self.cfg, inst, None)
2178     try:
2179       feedback_fn("Running the instance OS create scripts...")
2180       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2181         raise errors.OpExecError("Could not install OS for instance %s "
2182                                  "on node %s" %
2183                                  (inst.name, inst.primary_node))
2184     finally:
2185       _ShutdownInstanceDisks(inst, self.cfg)
2186
2187
2188 class LURenameInstance(LogicalUnit):
2189   """Rename an instance.
2190
2191   """
2192   HPATH = "instance-rename"
2193   HTYPE = constants.HTYPE_INSTANCE
2194   _OP_REQP = ["instance_name", "new_name"]
2195
2196   def BuildHooksEnv(self):
2197     """Build hooks env.
2198
2199     This runs on master, primary and secondary nodes of the instance.
2200
2201     """
2202     env = _BuildInstanceHookEnvByObject(self.instance)
2203     env["INSTANCE_NEW_NAME"] = self.op.new_name
2204     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2205           list(self.instance.secondary_nodes))
2206     return env, nl, nl
2207
2208   def CheckPrereq(self):
2209     """Check prerequisites.
2210
2211     This checks that the instance is in the cluster and is not running.
2212
2213     """
2214     instance = self.cfg.GetInstanceInfo(
2215       self.cfg.ExpandInstanceName(self.op.instance_name))
2216     if instance is None:
2217       raise errors.OpPrereqError("Instance '%s' not known" %
2218                                  self.op.instance_name)
2219     if instance.status != "down":
2220       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2221                                  self.op.instance_name)
2222     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2223     if remote_info:
2224       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2225                                  (self.op.instance_name,
2226                                   instance.primary_node))
2227     self.instance = instance
2228
2229     # new name verification
2230     name_info = utils.HostInfo(self.op.new_name)
2231
2232     self.op.new_name = new_name = name_info.name
2233     if not getattr(self.op, "ignore_ip", False):
2234       command = ["fping", "-q", name_info.ip]
2235       result = utils.RunCmd(command)
2236       if not result.failed:
2237         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2238                                    (name_info.ip, new_name))
2239
2240
2241   def Exec(self, feedback_fn):
2242     """Reinstall the instance.
2243
2244     """
2245     inst = self.instance
2246     old_name = inst.name
2247
2248     self.cfg.RenameInstance(inst.name, self.op.new_name)
2249
2250     # re-read the instance from the configuration after rename
2251     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2252
2253     _StartInstanceDisks(self.cfg, inst, None)
2254     try:
2255       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2256                                           "sda", "sdb"):
2257         msg = ("Could run OS rename script for instance %s\n"
2258                "on node %s\n"
2259                "(but the instance has been renamed in Ganeti)" %
2260                (inst.name, inst.primary_node))
2261         logger.Error(msg)
2262     finally:
2263       _ShutdownInstanceDisks(inst, self.cfg)
2264
2265
2266 class LURemoveInstance(LogicalUnit):
2267   """Remove an instance.
2268
2269   """
2270   HPATH = "instance-remove"
2271   HTYPE = constants.HTYPE_INSTANCE
2272   _OP_REQP = ["instance_name"]
2273
2274   def BuildHooksEnv(self):
2275     """Build hooks env.
2276
2277     This runs on master, primary and secondary nodes of the instance.
2278
2279     """
2280     env = _BuildInstanceHookEnvByObject(self.instance)
2281     nl = [self.sstore.GetMasterNode()]
2282     return env, nl, nl
2283
2284   def CheckPrereq(self):
2285     """Check prerequisites.
2286
2287     This checks that the instance is in the cluster.
2288
2289     """
2290     instance = self.cfg.GetInstanceInfo(
2291       self.cfg.ExpandInstanceName(self.op.instance_name))
2292     if instance is None:
2293       raise errors.OpPrereqError("Instance '%s' not known" %
2294                                  self.op.instance_name)
2295     self.instance = instance
2296
2297   def Exec(self, feedback_fn):
2298     """Remove the instance.
2299
2300     """
2301     instance = self.instance
2302     logger.Info("shutting down instance %s on node %s" %
2303                 (instance.name, instance.primary_node))
2304
2305     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2306       if self.op.ignore_failures:
2307         feedback_fn("Warning: can't shutdown instance")
2308       else:
2309         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2310                                  (instance.name, instance.primary_node))
2311
2312     logger.Info("removing block devices for instance %s" % instance.name)
2313
2314     if not _RemoveDisks(instance, self.cfg):
2315       if self.op.ignore_failures:
2316         feedback_fn("Warning: can't remove instance's disks")
2317       else:
2318         raise errors.OpExecError("Can't remove instance's disks")
2319
2320     logger.Info("removing instance %s out of cluster config" % instance.name)
2321
2322     self.cfg.RemoveInstance(instance.name)
2323
2324
2325 class LUQueryInstances(NoHooksLU):
2326   """Logical unit for querying instances.
2327
2328   """
2329   _OP_REQP = ["output_fields", "names"]
2330
2331   def CheckPrereq(self):
2332     """Check prerequisites.
2333
2334     This checks that the fields required are valid output fields.
2335
2336     """
2337     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2338     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2339                                "admin_state", "admin_ram",
2340                                "disk_template", "ip", "mac", "bridge",
2341                                "sda_size", "sdb_size"],
2342                        dynamic=self.dynamic_fields,
2343                        selected=self.op.output_fields)
2344
2345     self.wanted = _GetWantedInstances(self, self.op.names)
2346
2347   def Exec(self, feedback_fn):
2348     """Computes the list of nodes and their attributes.
2349
2350     """
2351     instance_names = self.wanted
2352     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2353                      in instance_names]
2354
2355     # begin data gathering
2356
2357     nodes = frozenset([inst.primary_node for inst in instance_list])
2358
2359     bad_nodes = []
2360     if self.dynamic_fields.intersection(self.op.output_fields):
2361       live_data = {}
2362       node_data = rpc.call_all_instances_info(nodes)
2363       for name in nodes:
2364         result = node_data[name]
2365         if result:
2366           live_data.update(result)
2367         elif result == False:
2368           bad_nodes.append(name)
2369         # else no instance is alive
2370     else:
2371       live_data = dict([(name, {}) for name in instance_names])
2372
2373     # end data gathering
2374
2375     output = []
2376     for instance in instance_list:
2377       iout = []
2378       for field in self.op.output_fields:
2379         if field == "name":
2380           val = instance.name
2381         elif field == "os":
2382           val = instance.os
2383         elif field == "pnode":
2384           val = instance.primary_node
2385         elif field == "snodes":
2386           val = list(instance.secondary_nodes)
2387         elif field == "admin_state":
2388           val = (instance.status != "down")
2389         elif field == "oper_state":
2390           if instance.primary_node in bad_nodes:
2391             val = None
2392           else:
2393             val = bool(live_data.get(instance.name))
2394         elif field == "admin_ram":
2395           val = instance.memory
2396         elif field == "oper_ram":
2397           if instance.primary_node in bad_nodes:
2398             val = None
2399           elif instance.name in live_data:
2400             val = live_data[instance.name].get("memory", "?")
2401           else:
2402             val = "-"
2403         elif field == "disk_template":
2404           val = instance.disk_template
2405         elif field == "ip":
2406           val = instance.nics[0].ip
2407         elif field == "bridge":
2408           val = instance.nics[0].bridge
2409         elif field == "mac":
2410           val = instance.nics[0].mac
2411         elif field == "sda_size" or field == "sdb_size":
2412           disk = instance.FindDisk(field[:3])
2413           if disk is None:
2414             val = None
2415           else:
2416             val = disk.size
2417         else:
2418           raise errors.ParameterError(field)
2419         iout.append(val)
2420       output.append(iout)
2421
2422     return output
2423
2424
2425 class LUFailoverInstance(LogicalUnit):
2426   """Failover an instance.
2427
2428   """
2429   HPATH = "instance-failover"
2430   HTYPE = constants.HTYPE_INSTANCE
2431   _OP_REQP = ["instance_name", "ignore_consistency"]
2432
2433   def BuildHooksEnv(self):
2434     """Build hooks env.
2435
2436     This runs on master, primary and secondary nodes of the instance.
2437
2438     """
2439     env = {
2440       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2441       }
2442     env.update(_BuildInstanceHookEnvByObject(self.instance))
2443     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2444     return env, nl, nl
2445
2446   def CheckPrereq(self):
2447     """Check prerequisites.
2448
2449     This checks that the instance is in the cluster.
2450
2451     """
2452     instance = self.cfg.GetInstanceInfo(
2453       self.cfg.ExpandInstanceName(self.op.instance_name))
2454     if instance is None:
2455       raise errors.OpPrereqError("Instance '%s' not known" %
2456                                  self.op.instance_name)
2457
2458     if instance.disk_template not in constants.DTS_NET_MIRROR:
2459       raise errors.OpPrereqError("Instance's disk layout is not"
2460                                  " network mirrored, cannot failover.")
2461
2462     secondary_nodes = instance.secondary_nodes
2463     if not secondary_nodes:
2464       raise errors.ProgrammerError("no secondary node but using "
2465                                    "DT_REMOTE_RAID1 template")
2466
2467     # check memory requirements on the secondary node
2468     target_node = secondary_nodes[0]
2469     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2470     info = nodeinfo.get(target_node, None)
2471     if not info:
2472       raise errors.OpPrereqError("Cannot get current information"
2473                                  " from node '%s'" % nodeinfo)
2474     if instance.memory > info['memory_free']:
2475       raise errors.OpPrereqError("Not enough memory on target node %s."
2476                                  " %d MB available, %d MB required" %
2477                                  (target_node, info['memory_free'],
2478                                   instance.memory))
2479
2480     # check bridge existance
2481     brlist = [nic.bridge for nic in instance.nics]
2482     if not rpc.call_bridges_exist(target_node, brlist):
2483       raise errors.OpPrereqError("One or more target bridges %s does not"
2484                                  " exist on destination node '%s'" %
2485                                  (brlist, target_node))
2486
2487     self.instance = instance
2488
2489   def Exec(self, feedback_fn):
2490     """Failover an instance.
2491
2492     The failover is done by shutting it down on its present node and
2493     starting it on the secondary.
2494
2495     """
2496     instance = self.instance
2497
2498     source_node = instance.primary_node
2499     target_node = instance.secondary_nodes[0]
2500
2501     feedback_fn("* checking disk consistency between source and target")
2502     for dev in instance.disks:
2503       # for remote_raid1, these are md over drbd
2504       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2505         if not self.op.ignore_consistency:
2506           raise errors.OpExecError("Disk %s is degraded on target node,"
2507                                    " aborting failover." % dev.iv_name)
2508
2509     feedback_fn("* checking target node resource availability")
2510     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2511
2512     if not nodeinfo:
2513       raise errors.OpExecError("Could not contact target node %s." %
2514                                target_node)
2515
2516     free_memory = int(nodeinfo[target_node]['memory_free'])
2517     memory = instance.memory
2518     if memory > free_memory:
2519       raise errors.OpExecError("Not enough memory to create instance %s on"
2520                                " node %s. needed %s MiB, available %s MiB" %
2521                                (instance.name, target_node, memory,
2522                                 free_memory))
2523
2524     feedback_fn("* shutting down instance on source node")
2525     logger.Info("Shutting down instance %s on node %s" %
2526                 (instance.name, source_node))
2527
2528     if not rpc.call_instance_shutdown(source_node, instance):
2529       if self.op.ignore_consistency:
2530         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2531                      " anyway. Please make sure node %s is down"  %
2532                      (instance.name, source_node, source_node))
2533       else:
2534         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2535                                  (instance.name, source_node))
2536
2537     feedback_fn("* deactivating the instance's disks on source node")
2538     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2539       raise errors.OpExecError("Can't shut down the instance's disks.")
2540
2541     instance.primary_node = target_node
2542     # distribute new instance config to the other nodes
2543     self.cfg.AddInstance(instance)
2544
2545     feedback_fn("* activating the instance's disks on target node")
2546     logger.Info("Starting instance %s on node %s" %
2547                 (instance.name, target_node))
2548
2549     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2550                                              ignore_secondaries=True)
2551     if not disks_ok:
2552       _ShutdownInstanceDisks(instance, self.cfg)
2553       raise errors.OpExecError("Can't activate the instance's disks")
2554
2555     feedback_fn("* starting the instance on the target node")
2556     if not rpc.call_instance_start(target_node, instance, None):
2557       _ShutdownInstanceDisks(instance, self.cfg)
2558       raise errors.OpExecError("Could not start instance %s on node %s." %
2559                                (instance.name, target_node))
2560
2561
2562 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2563   """Create a tree of block devices on the primary node.
2564
2565   This always creates all devices.
2566
2567   """
2568   if device.children:
2569     for child in device.children:
2570       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2571         return False
2572
2573   cfg.SetDiskID(device, node)
2574   new_id = rpc.call_blockdev_create(node, device, device.size,
2575                                     instance.name, True, info)
2576   if not new_id:
2577     return False
2578   if device.physical_id is None:
2579     device.physical_id = new_id
2580   return True
2581
2582
2583 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2584   """Create a tree of block devices on a secondary node.
2585
2586   If this device type has to be created on secondaries, create it and
2587   all its children.
2588
2589   If not, just recurse to children keeping the same 'force' value.
2590
2591   """
2592   if device.CreateOnSecondary():
2593     force = True
2594   if device.children:
2595     for child in device.children:
2596       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2597                                         child, force, info):
2598         return False
2599
2600   if not force:
2601     return True
2602   cfg.SetDiskID(device, node)
2603   new_id = rpc.call_blockdev_create(node, device, device.size,
2604                                     instance.name, False, info)
2605   if not new_id:
2606     return False
2607   if device.physical_id is None:
2608     device.physical_id = new_id
2609   return True
2610
2611
2612 def _GenerateUniqueNames(cfg, exts):
2613   """Generate a suitable LV name.
2614
2615   This will generate a logical volume name for the given instance.
2616
2617   """
2618   results = []
2619   for val in exts:
2620     new_id = cfg.GenerateUniqueID()
2621     results.append("%s%s" % (new_id, val))
2622   return results
2623
2624
2625 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2626   """Generate a drbd device complete with its children.
2627
2628   """
2629   port = cfg.AllocatePort()
2630   vgname = cfg.GetVGName()
2631   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2632                           logical_id=(vgname, names[0]))
2633   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2634                           logical_id=(vgname, names[1]))
2635   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2636                           logical_id = (primary, secondary, port),
2637                           children = [dev_data, dev_meta])
2638   return drbd_dev
2639
2640
2641 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2642   """Generate a drbd8 device complete with its children.
2643
2644   """
2645   port = cfg.AllocatePort()
2646   vgname = cfg.GetVGName()
2647   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2648                           logical_id=(vgname, names[0]))
2649   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2650                           logical_id=(vgname, names[1]))
2651   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2652                           logical_id = (primary, secondary, port),
2653                           children = [dev_data, dev_meta],
2654                           iv_name=iv_name)
2655   return drbd_dev
2656
2657 def _GenerateDiskTemplate(cfg, template_name,
2658                           instance_name, primary_node,
2659                           secondary_nodes, disk_sz, swap_sz):
2660   """Generate the entire disk layout for a given template type.
2661
2662   """
2663   #TODO: compute space requirements
2664
2665   vgname = cfg.GetVGName()
2666   if template_name == "diskless":
2667     disks = []
2668   elif template_name == "plain":
2669     if len(secondary_nodes) != 0:
2670       raise errors.ProgrammerError("Wrong template configuration")
2671
2672     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2673     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2674                            logical_id=(vgname, names[0]),
2675                            iv_name = "sda")
2676     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2677                            logical_id=(vgname, names[1]),
2678                            iv_name = "sdb")
2679     disks = [sda_dev, sdb_dev]
2680   elif template_name == "local_raid1":
2681     if len(secondary_nodes) != 0:
2682       raise errors.ProgrammerError("Wrong template configuration")
2683
2684
2685     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2686                                        ".sdb_m1", ".sdb_m2"])
2687     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2688                               logical_id=(vgname, names[0]))
2689     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2690                               logical_id=(vgname, names[1]))
2691     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2692                               size=disk_sz,
2693                               children = [sda_dev_m1, sda_dev_m2])
2694     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2695                               logical_id=(vgname, names[2]))
2696     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2697                               logical_id=(vgname, names[3]))
2698     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2699                               size=swap_sz,
2700                               children = [sdb_dev_m1, sdb_dev_m2])
2701     disks = [md_sda_dev, md_sdb_dev]
2702   elif template_name == constants.DT_REMOTE_RAID1:
2703     if len(secondary_nodes) != 1:
2704       raise errors.ProgrammerError("Wrong template configuration")
2705     remote_node = secondary_nodes[0]
2706     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2707                                        ".sdb_data", ".sdb_meta"])
2708     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2709                                          disk_sz, names[0:2])
2710     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2711                               children = [drbd_sda_dev], size=disk_sz)
2712     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2713                                          swap_sz, names[2:4])
2714     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2715                               children = [drbd_sdb_dev], size=swap_sz)
2716     disks = [md_sda_dev, md_sdb_dev]
2717   elif template_name == constants.DT_DRBD8:
2718     if len(secondary_nodes) != 1:
2719       raise errors.ProgrammerError("Wrong template configuration")
2720     remote_node = secondary_nodes[0]
2721     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2722                                        ".sdb_data", ".sdb_meta"])
2723     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2724                                          disk_sz, names[0:2], "sda")
2725     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2726                                          swap_sz, names[2:4], "sdb")
2727     disks = [drbd_sda_dev, drbd_sdb_dev]
2728   else:
2729     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2730   return disks
2731
2732
2733 def _GetInstanceInfoText(instance):
2734   """Compute that text that should be added to the disk's metadata.
2735
2736   """
2737   return "originstname+%s" % instance.name
2738
2739
2740 def _CreateDisks(cfg, instance):
2741   """Create all disks for an instance.
2742
2743   This abstracts away some work from AddInstance.
2744
2745   Args:
2746     instance: the instance object
2747
2748   Returns:
2749     True or False showing the success of the creation process
2750
2751   """
2752   info = _GetInstanceInfoText(instance)
2753
2754   for device in instance.disks:
2755     logger.Info("creating volume %s for instance %s" %
2756               (device.iv_name, instance.name))
2757     #HARDCODE
2758     for secondary_node in instance.secondary_nodes:
2759       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2760                                         device, False, info):
2761         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2762                      (device.iv_name, device, secondary_node))
2763         return False
2764     #HARDCODE
2765     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2766                                     instance, device, info):
2767       logger.Error("failed to create volume %s on primary!" %
2768                    device.iv_name)
2769       return False
2770   return True
2771
2772
2773 def _RemoveDisks(instance, cfg):
2774   """Remove all disks for an instance.
2775
2776   This abstracts away some work from `AddInstance()` and
2777   `RemoveInstance()`. Note that in case some of the devices couldn't
2778   be removed, the removal will continue with the other ones (compare
2779   with `_CreateDisks()`).
2780
2781   Args:
2782     instance: the instance object
2783
2784   Returns:
2785     True or False showing the success of the removal proces
2786
2787   """
2788   logger.Info("removing block devices for instance %s" % instance.name)
2789
2790   result = True
2791   for device in instance.disks:
2792     for node, disk in device.ComputeNodeTree(instance.primary_node):
2793       cfg.SetDiskID(disk, node)
2794       if not rpc.call_blockdev_remove(node, disk):
2795         logger.Error("could not remove block device %s on node %s,"
2796                      " continuing anyway" %
2797                      (device.iv_name, node))
2798         result = False
2799   return result
2800
2801
2802 class LUCreateInstance(LogicalUnit):
2803   """Create an instance.
2804
2805   """
2806   HPATH = "instance-add"
2807   HTYPE = constants.HTYPE_INSTANCE
2808   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2809               "disk_template", "swap_size", "mode", "start", "vcpus",
2810               "wait_for_sync", "ip_check"]
2811
2812   def BuildHooksEnv(self):
2813     """Build hooks env.
2814
2815     This runs on master, primary and secondary nodes of the instance.
2816
2817     """
2818     env = {
2819       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2820       "INSTANCE_DISK_SIZE": self.op.disk_size,
2821       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2822       "INSTANCE_ADD_MODE": self.op.mode,
2823       }
2824     if self.op.mode == constants.INSTANCE_IMPORT:
2825       env["INSTANCE_SRC_NODE"] = self.op.src_node
2826       env["INSTANCE_SRC_PATH"] = self.op.src_path
2827       env["INSTANCE_SRC_IMAGE"] = self.src_image
2828
2829     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2830       primary_node=self.op.pnode,
2831       secondary_nodes=self.secondaries,
2832       status=self.instance_status,
2833       os_type=self.op.os_type,
2834       memory=self.op.mem_size,
2835       vcpus=self.op.vcpus,
2836       nics=[(self.inst_ip, self.op.bridge)],
2837     ))
2838
2839     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2840           self.secondaries)
2841     return env, nl, nl
2842
2843
2844   def CheckPrereq(self):
2845     """Check prerequisites.
2846
2847     """
2848     if self.op.mode not in (constants.INSTANCE_CREATE,
2849                             constants.INSTANCE_IMPORT):
2850       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2851                                  self.op.mode)
2852
2853     if self.op.mode == constants.INSTANCE_IMPORT:
2854       src_node = getattr(self.op, "src_node", None)
2855       src_path = getattr(self.op, "src_path", None)
2856       if src_node is None or src_path is None:
2857         raise errors.OpPrereqError("Importing an instance requires source"
2858                                    " node and path options")
2859       src_node_full = self.cfg.ExpandNodeName(src_node)
2860       if src_node_full is None:
2861         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2862       self.op.src_node = src_node = src_node_full
2863
2864       if not os.path.isabs(src_path):
2865         raise errors.OpPrereqError("The source path must be absolute")
2866
2867       export_info = rpc.call_export_info(src_node, src_path)
2868
2869       if not export_info:
2870         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2871
2872       if not export_info.has_section(constants.INISECT_EXP):
2873         raise errors.ProgrammerError("Corrupted export config")
2874
2875       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2876       if (int(ei_version) != constants.EXPORT_VERSION):
2877         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2878                                    (ei_version, constants.EXPORT_VERSION))
2879
2880       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2881         raise errors.OpPrereqError("Can't import instance with more than"
2882                                    " one data disk")
2883
2884       # FIXME: are the old os-es, disk sizes, etc. useful?
2885       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2886       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2887                                                          'disk0_dump'))
2888       self.src_image = diskimage
2889     else: # INSTANCE_CREATE
2890       if getattr(self.op, "os_type", None) is None:
2891         raise errors.OpPrereqError("No guest OS specified")
2892
2893     # check primary node
2894     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2895     if pnode is None:
2896       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2897                                  self.op.pnode)
2898     self.op.pnode = pnode.name
2899     self.pnode = pnode
2900     self.secondaries = []
2901     # disk template and mirror node verification
2902     if self.op.disk_template not in constants.DISK_TEMPLATES:
2903       raise errors.OpPrereqError("Invalid disk template name")
2904
2905     if self.op.disk_template in constants.DTS_NET_MIRROR:
2906       if getattr(self.op, "snode", None) is None:
2907         raise errors.OpPrereqError("The networked disk templates need"
2908                                    " a mirror node")
2909
2910       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2911       if snode_name is None:
2912         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2913                                    self.op.snode)
2914       elif snode_name == pnode.name:
2915         raise errors.OpPrereqError("The secondary node cannot be"
2916                                    " the primary node.")
2917       self.secondaries.append(snode_name)
2918
2919     # Check lv size requirements
2920     nodenames = [pnode.name] + self.secondaries
2921     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2922
2923     # Required free disk space as a function of disk and swap space
2924     req_size_dict = {
2925       constants.DT_DISKLESS: 0,
2926       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2927       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2928       # 256 MB are added for drbd metadata, 128MB for each drbd device
2929       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2930       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2931     }
2932
2933     if self.op.disk_template not in req_size_dict:
2934       raise errors.ProgrammerError("Disk template '%s' size requirement"
2935                                    " is unknown" %  self.op.disk_template)
2936
2937     req_size = req_size_dict[self.op.disk_template]
2938
2939     for node in nodenames:
2940       info = nodeinfo.get(node, None)
2941       if not info:
2942         raise errors.OpPrereqError("Cannot get current information"
2943                                    " from node '%s'" % nodeinfo)
2944       if req_size > info['vg_free']:
2945         raise errors.OpPrereqError("Not enough disk space on target node %s."
2946                                    " %d MB available, %d MB required" %
2947                                    (node, info['vg_free'], req_size))
2948
2949     # os verification
2950     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2951     if not os_obj:
2952       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2953                                  " primary node"  % self.op.os_type)
2954
2955     # instance verification
2956     hostname1 = utils.HostInfo(self.op.instance_name)
2957
2958     self.op.instance_name = instance_name = hostname1.name
2959     instance_list = self.cfg.GetInstanceList()
2960     if instance_name in instance_list:
2961       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2962                                  instance_name)
2963
2964     ip = getattr(self.op, "ip", None)
2965     if ip is None or ip.lower() == "none":
2966       inst_ip = None
2967     elif ip.lower() == "auto":
2968       inst_ip = hostname1.ip
2969     else:
2970       if not utils.IsValidIP(ip):
2971         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2972                                    " like a valid IP" % ip)
2973       inst_ip = ip
2974     self.inst_ip = inst_ip
2975
2976     if self.op.start and not self.op.ip_check:
2977       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2978                                  " adding an instance in start mode")
2979
2980     if self.op.ip_check:
2981       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2982                        constants.DEFAULT_NODED_PORT):
2983         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2984                                    (hostname1.ip, instance_name))
2985
2986     # bridge verification
2987     bridge = getattr(self.op, "bridge", None)
2988     if bridge is None:
2989       self.op.bridge = self.cfg.GetDefBridge()
2990     else:
2991       self.op.bridge = bridge
2992
2993     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2994       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2995                                  " destination node '%s'" %
2996                                  (self.op.bridge, pnode.name))
2997
2998     if self.op.start:
2999       self.instance_status = 'up'
3000     else:
3001       self.instance_status = 'down'
3002
3003   def Exec(self, feedback_fn):
3004     """Create and add the instance to the cluster.
3005
3006     """
3007     instance = self.op.instance_name
3008     pnode_name = self.pnode.name
3009
3010     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3011     if self.inst_ip is not None:
3012       nic.ip = self.inst_ip
3013
3014     disks = _GenerateDiskTemplate(self.cfg,
3015                                   self.op.disk_template,
3016                                   instance, pnode_name,
3017                                   self.secondaries, self.op.disk_size,
3018                                   self.op.swap_size)
3019
3020     iobj = objects.Instance(name=instance, os=self.op.os_type,
3021                             primary_node=pnode_name,
3022                             memory=self.op.mem_size,
3023                             vcpus=self.op.vcpus,
3024                             nics=[nic], disks=disks,
3025                             disk_template=self.op.disk_template,
3026                             status=self.instance_status,
3027                             )
3028
3029     feedback_fn("* creating instance disks...")
3030     if not _CreateDisks(self.cfg, iobj):
3031       _RemoveDisks(iobj, self.cfg)
3032       raise errors.OpExecError("Device creation failed, reverting...")
3033
3034     feedback_fn("adding instance %s to cluster config" % instance)
3035
3036     self.cfg.AddInstance(iobj)
3037
3038     if self.op.wait_for_sync:
3039       disk_abort = not _WaitForSync(self.cfg, iobj)
3040     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3041       # make sure the disks are not degraded (still sync-ing is ok)
3042       time.sleep(15)
3043       feedback_fn("* checking mirrors status")
3044       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3045     else:
3046       disk_abort = False
3047
3048     if disk_abort:
3049       _RemoveDisks(iobj, self.cfg)
3050       self.cfg.RemoveInstance(iobj.name)
3051       raise errors.OpExecError("There are some degraded disks for"
3052                                " this instance")
3053
3054     feedback_fn("creating os for instance %s on node %s" %
3055                 (instance, pnode_name))
3056
3057     if iobj.disk_template != constants.DT_DISKLESS:
3058       if self.op.mode == constants.INSTANCE_CREATE:
3059         feedback_fn("* running the instance OS create scripts...")
3060         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3061           raise errors.OpExecError("could not add os for instance %s"
3062                                    " on node %s" %
3063                                    (instance, pnode_name))
3064
3065       elif self.op.mode == constants.INSTANCE_IMPORT:
3066         feedback_fn("* running the instance OS import scripts...")
3067         src_node = self.op.src_node
3068         src_image = self.src_image
3069         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3070                                                 src_node, src_image):
3071           raise errors.OpExecError("Could not import os for instance"
3072                                    " %s on node %s" %
3073                                    (instance, pnode_name))
3074       else:
3075         # also checked in the prereq part
3076         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3077                                      % self.op.mode)
3078
3079     if self.op.start:
3080       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3081       feedback_fn("* starting instance...")
3082       if not rpc.call_instance_start(pnode_name, iobj, None):
3083         raise errors.OpExecError("Could not start instance")
3084
3085
3086 class LUConnectConsole(NoHooksLU):
3087   """Connect to an instance's console.
3088
3089   This is somewhat special in that it returns the command line that
3090   you need to run on the master node in order to connect to the
3091   console.
3092
3093   """
3094   _OP_REQP = ["instance_name"]
3095
3096   def CheckPrereq(self):
3097     """Check prerequisites.
3098
3099     This checks that the instance is in the cluster.
3100
3101     """
3102     instance = self.cfg.GetInstanceInfo(
3103       self.cfg.ExpandInstanceName(self.op.instance_name))
3104     if instance is None:
3105       raise errors.OpPrereqError("Instance '%s' not known" %
3106                                  self.op.instance_name)
3107     self.instance = instance
3108
3109   def Exec(self, feedback_fn):
3110     """Connect to the console of an instance
3111
3112     """
3113     instance = self.instance
3114     node = instance.primary_node
3115
3116     node_insts = rpc.call_instance_list([node])[node]
3117     if node_insts is False:
3118       raise errors.OpExecError("Can't connect to node %s." % node)
3119
3120     if instance.name not in node_insts:
3121       raise errors.OpExecError("Instance %s is not running." % instance.name)
3122
3123     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3124
3125     hyper = hypervisor.GetHypervisor()
3126     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3127     # build ssh cmdline
3128     argv = ["ssh", "-q", "-t"]
3129     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3130     argv.extend(ssh.BATCH_MODE_OPTS)
3131     argv.append(node)
3132     argv.append(console_cmd)
3133     return "ssh", argv
3134
3135
3136 class LUAddMDDRBDComponent(LogicalUnit):
3137   """Adda new mirror member to an instance's disk.
3138
3139   """
3140   HPATH = "mirror-add"
3141   HTYPE = constants.HTYPE_INSTANCE
3142   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3143
3144   def BuildHooksEnv(self):
3145     """Build hooks env.
3146
3147     This runs on the master, the primary and all the secondaries.
3148
3149     """
3150     env = {
3151       "NEW_SECONDARY": self.op.remote_node,
3152       "DISK_NAME": self.op.disk_name,
3153       }
3154     env.update(_BuildInstanceHookEnvByObject(self.instance))
3155     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3156           self.op.remote_node,] + list(self.instance.secondary_nodes)
3157     return env, nl, nl
3158
3159   def CheckPrereq(self):
3160     """Check prerequisites.
3161
3162     This checks that the instance is in the cluster.
3163
3164     """
3165     instance = self.cfg.GetInstanceInfo(
3166       self.cfg.ExpandInstanceName(self.op.instance_name))
3167     if instance is None:
3168       raise errors.OpPrereqError("Instance '%s' not known" %
3169                                  self.op.instance_name)
3170     self.instance = instance
3171
3172     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3173     if remote_node is None:
3174       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3175     self.remote_node = remote_node
3176
3177     if remote_node == instance.primary_node:
3178       raise errors.OpPrereqError("The specified node is the primary node of"
3179                                  " the instance.")
3180
3181     if instance.disk_template != constants.DT_REMOTE_RAID1:
3182       raise errors.OpPrereqError("Instance's disk layout is not"
3183                                  " remote_raid1.")
3184     for disk in instance.disks:
3185       if disk.iv_name == self.op.disk_name:
3186         break
3187     else:
3188       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3189                                  " instance." % self.op.disk_name)
3190     if len(disk.children) > 1:
3191       raise errors.OpPrereqError("The device already has two slave"
3192                                  " devices.\n"
3193                                  "This would create a 3-disk raid1"
3194                                  " which we don't allow.")
3195     self.disk = disk
3196
3197   def Exec(self, feedback_fn):
3198     """Add the mirror component
3199
3200     """
3201     disk = self.disk
3202     instance = self.instance
3203
3204     remote_node = self.remote_node
3205     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3206     names = _GenerateUniqueNames(self.cfg, lv_names)
3207     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3208                                      remote_node, disk.size, names)
3209
3210     logger.Info("adding new mirror component on secondary")
3211     #HARDCODE
3212     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3213                                       new_drbd, False,
3214                                       _GetInstanceInfoText(instance)):
3215       raise errors.OpExecError("Failed to create new component on secondary"
3216                                " node %s" % remote_node)
3217
3218     logger.Info("adding new mirror component on primary")
3219     #HARDCODE
3220     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3221                                     instance, new_drbd,
3222                                     _GetInstanceInfoText(instance)):
3223       # remove secondary dev
3224       self.cfg.SetDiskID(new_drbd, remote_node)
3225       rpc.call_blockdev_remove(remote_node, new_drbd)
3226       raise errors.OpExecError("Failed to create volume on primary")
3227
3228     # the device exists now
3229     # call the primary node to add the mirror to md
3230     logger.Info("adding new mirror component to md")
3231     if not rpc.call_blockdev_addchildren(instance.primary_node,
3232                                          disk, [new_drbd]):
3233       logger.Error("Can't add mirror compoment to md!")
3234       self.cfg.SetDiskID(new_drbd, remote_node)
3235       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3236         logger.Error("Can't rollback on secondary")
3237       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3238       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3239         logger.Error("Can't rollback on primary")
3240       raise errors.OpExecError("Can't add mirror component to md array")
3241
3242     disk.children.append(new_drbd)
3243
3244     self.cfg.AddInstance(instance)
3245
3246     _WaitForSync(self.cfg, instance)
3247
3248     return 0
3249
3250
3251 class LURemoveMDDRBDComponent(LogicalUnit):
3252   """Remove a component from a remote_raid1 disk.
3253
3254   """
3255   HPATH = "mirror-remove"
3256   HTYPE = constants.HTYPE_INSTANCE
3257   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3258
3259   def BuildHooksEnv(self):
3260     """Build hooks env.
3261
3262     This runs on the master, the primary and all the secondaries.
3263
3264     """
3265     env = {
3266       "DISK_NAME": self.op.disk_name,
3267       "DISK_ID": self.op.disk_id,
3268       "OLD_SECONDARY": self.old_secondary,
3269       }
3270     env.update(_BuildInstanceHookEnvByObject(self.instance))
3271     nl = [self.sstore.GetMasterNode(),
3272           self.instance.primary_node] + list(self.instance.secondary_nodes)
3273     return env, nl, nl
3274
3275   def CheckPrereq(self):
3276     """Check prerequisites.
3277
3278     This checks that the instance is in the cluster.
3279
3280     """
3281     instance = self.cfg.GetInstanceInfo(
3282       self.cfg.ExpandInstanceName(self.op.instance_name))
3283     if instance is None:
3284       raise errors.OpPrereqError("Instance '%s' not known" %
3285                                  self.op.instance_name)
3286     self.instance = instance
3287
3288     if instance.disk_template != constants.DT_REMOTE_RAID1:
3289       raise errors.OpPrereqError("Instance's disk layout is not"
3290                                  " remote_raid1.")
3291     for disk in instance.disks:
3292       if disk.iv_name == self.op.disk_name:
3293         break
3294     else:
3295       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3296                                  " instance." % self.op.disk_name)
3297     for child in disk.children:
3298       if (child.dev_type == constants.LD_DRBD7 and
3299           child.logical_id[2] == self.op.disk_id):
3300         break
3301     else:
3302       raise errors.OpPrereqError("Can't find the device with this port.")
3303
3304     if len(disk.children) < 2:
3305       raise errors.OpPrereqError("Cannot remove the last component from"
3306                                  " a mirror.")
3307     self.disk = disk
3308     self.child = child
3309     if self.child.logical_id[0] == instance.primary_node:
3310       oid = 1
3311     else:
3312       oid = 0
3313     self.old_secondary = self.child.logical_id[oid]
3314
3315   def Exec(self, feedback_fn):
3316     """Remove the mirror component
3317
3318     """
3319     instance = self.instance
3320     disk = self.disk
3321     child = self.child
3322     logger.Info("remove mirror component")
3323     self.cfg.SetDiskID(disk, instance.primary_node)
3324     if not rpc.call_blockdev_removechildren(instance.primary_node,
3325                                             disk, [child]):
3326       raise errors.OpExecError("Can't remove child from mirror.")
3327
3328     for node in child.logical_id[:2]:
3329       self.cfg.SetDiskID(child, node)
3330       if not rpc.call_blockdev_remove(node, child):
3331         logger.Error("Warning: failed to remove device from node %s,"
3332                      " continuing operation." % node)
3333
3334     disk.children.remove(child)
3335     self.cfg.AddInstance(instance)
3336
3337
3338 class LUReplaceDisks(LogicalUnit):
3339   """Replace the disks of an instance.
3340
3341   """
3342   HPATH = "mirrors-replace"
3343   HTYPE = constants.HTYPE_INSTANCE
3344   _OP_REQP = ["instance_name", "mode", "disks"]
3345
3346   def BuildHooksEnv(self):
3347     """Build hooks env.
3348
3349     This runs on the master, the primary and all the secondaries.
3350
3351     """
3352     env = {
3353       "MODE": self.op.mode,
3354       "NEW_SECONDARY": self.op.remote_node,
3355       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3356       }
3357     env.update(_BuildInstanceHookEnvByObject(self.instance))
3358     nl = [self.sstore.GetMasterNode(),
3359           self.instance.primary_node] + list(self.instance.secondary_nodes)
3360     return env, nl, nl
3361
3362   def CheckPrereq(self):
3363     """Check prerequisites.
3364
3365     This checks that the instance is in the cluster.
3366
3367     """
3368     instance = self.cfg.GetInstanceInfo(
3369       self.cfg.ExpandInstanceName(self.op.instance_name))
3370     if instance is None:
3371       raise errors.OpPrereqError("Instance '%s' not known" %
3372                                  self.op.instance_name)
3373     self.instance = instance
3374
3375     if instance.disk_template not in constants.DTS_NET_MIRROR:
3376       raise errors.OpPrereqError("Instance's disk layout is not"
3377                                  " network mirrored.")
3378
3379     if len(instance.secondary_nodes) != 1:
3380       raise errors.OpPrereqError("The instance has a strange layout,"
3381                                  " expected one secondary but found %d" %
3382                                  len(instance.secondary_nodes))
3383
3384     self.sec_node = instance.secondary_nodes[0]
3385
3386     remote_node = getattr(self.op, "remote_node", None)
3387     if remote_node is not None:
3388       remote_node = self.cfg.ExpandNodeName(remote_node)
3389       if remote_node is None:
3390         raise errors.OpPrereqError("Node '%s' not known" %
3391                                    self.op.remote_node)
3392       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3393     else:
3394       self.remote_node_info = None
3395     if remote_node == instance.primary_node:
3396       raise errors.OpPrereqError("The specified node is the primary node of"
3397                                  " the instance.")
3398     elif remote_node == self.sec_node:
3399       # the user gave the current secondary, switch to
3400       # 'no-replace-secondary' mode
3401       remote_node = None
3402     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3403         self.op.mode != constants.REPLACE_DISK_ALL):
3404       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3405                                  " disks replacement, not individual ones")
3406     if instance.disk_template == constants.DT_DRBD8:
3407       if self.op.mode == constants.REPLACE_DISK_ALL:
3408         raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3409                                    " secondary disk replacement, not"
3410                                    " both at once")
3411       elif self.op.mode == constants.REPLACE_DISK_PRI:
3412         if remote_node is not None:
3413           raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3414                                      " the secondary while doing a primary"
3415                                      " node disk replacement")
3416         self.tgt_node = instance.primary_node
3417       elif self.op.mode == constants.REPLACE_DISK_SEC:
3418         self.new_node = remote_node # this can be None, in which case
3419                                     # we don't change the secondary
3420         self.tgt_node = instance.secondary_nodes[0]
3421       else:
3422         raise errors.ProgrammerError("Unhandled disk replace mode")
3423
3424     for name in self.op.disks:
3425       if instance.FindDisk(name) is None:
3426         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3427                                    (name, instance.name))
3428     self.op.remote_node = remote_node
3429
3430   def _ExecRR1(self, feedback_fn):
3431     """Replace the disks of an instance.
3432
3433     """
3434     instance = self.instance
3435     iv_names = {}
3436     # start of work
3437     if self.op.remote_node is None:
3438       remote_node = self.sec_node
3439     else:
3440       remote_node = self.op.remote_node
3441     cfg = self.cfg
3442     for dev in instance.disks:
3443       size = dev.size
3444       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3445       names = _GenerateUniqueNames(cfg, lv_names)
3446       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3447                                        remote_node, size, names)
3448       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3449       logger.Info("adding new mirror component on secondary for %s" %
3450                   dev.iv_name)
3451       #HARDCODE
3452       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3453                                         new_drbd, False,
3454                                         _GetInstanceInfoText(instance)):
3455         raise errors.OpExecError("Failed to create new component on"
3456                                  " secondary node %s\n"
3457                                  "Full abort, cleanup manually!" %
3458                                  remote_node)
3459
3460       logger.Info("adding new mirror component on primary")
3461       #HARDCODE
3462       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3463                                       instance, new_drbd,
3464                                       _GetInstanceInfoText(instance)):
3465         # remove secondary dev
3466         cfg.SetDiskID(new_drbd, remote_node)
3467         rpc.call_blockdev_remove(remote_node, new_drbd)
3468         raise errors.OpExecError("Failed to create volume on primary!\n"
3469                                  "Full abort, cleanup manually!!")
3470
3471       # the device exists now
3472       # call the primary node to add the mirror to md
3473       logger.Info("adding new mirror component to md")
3474       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3475                                            [new_drbd]):
3476         logger.Error("Can't add mirror compoment to md!")
3477         cfg.SetDiskID(new_drbd, remote_node)
3478         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3479           logger.Error("Can't rollback on secondary")
3480         cfg.SetDiskID(new_drbd, instance.primary_node)
3481         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3482           logger.Error("Can't rollback on primary")
3483         raise errors.OpExecError("Full abort, cleanup manually!!")
3484
3485       dev.children.append(new_drbd)
3486       cfg.AddInstance(instance)
3487
3488     # this can fail as the old devices are degraded and _WaitForSync
3489     # does a combined result over all disks, so we don't check its
3490     # return value
3491     _WaitForSync(cfg, instance, unlock=True)
3492
3493     # so check manually all the devices
3494     for name in iv_names:
3495       dev, child, new_drbd = iv_names[name]
3496       cfg.SetDiskID(dev, instance.primary_node)
3497       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3498       if is_degr:
3499         raise errors.OpExecError("MD device %s is degraded!" % name)
3500       cfg.SetDiskID(new_drbd, instance.primary_node)
3501       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3502       if is_degr:
3503         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3504
3505     for name in iv_names:
3506       dev, child, new_drbd = iv_names[name]
3507       logger.Info("remove mirror %s component" % name)
3508       cfg.SetDiskID(dev, instance.primary_node)
3509       if not rpc.call_blockdev_removechildren(instance.primary_node,
3510                                               dev, [child]):
3511         logger.Error("Can't remove child from mirror, aborting"
3512                      " *this device cleanup*.\nYou need to cleanup manually!!")
3513         continue
3514
3515       for node in child.logical_id[:2]:
3516         logger.Info("remove child device on %s" % node)
3517         cfg.SetDiskID(child, node)
3518         if not rpc.call_blockdev_remove(node, child):
3519           logger.Error("Warning: failed to remove device from node %s,"
3520                        " continuing operation." % node)
3521
3522       dev.children.remove(child)
3523
3524       cfg.AddInstance(instance)
3525
3526   def _ExecD8DiskOnly(self, feedback_fn):
3527     """Replace a disk on the primary or secondary for dbrd8.
3528
3529     The algorithm for replace is quite complicated:
3530       - for each disk to be replaced:
3531         - create new LVs on the target node with unique names
3532         - detach old LVs from the drbd device
3533         - rename old LVs to name_replaced.<time_t>
3534         - rename new LVs to old LVs
3535         - attach the new LVs (with the old names now) to the drbd device
3536       - wait for sync across all devices
3537       - for each modified disk:
3538         - remove old LVs (which have the name name_replaces.<time_t>)
3539
3540     Failures are not very well handled.
3541     """
3542     instance = self.instance
3543     iv_names = {}
3544     vgname = self.cfg.GetVGName()
3545     # start of work
3546     cfg = self.cfg
3547     tgt_node = self.tgt_node
3548     for dev in instance.disks:
3549       if not dev.iv_name in self.op.disks:
3550         continue
3551       size = dev.size
3552       cfg.SetDiskID(dev, tgt_node)
3553       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3554       names = _GenerateUniqueNames(cfg, lv_names)
3555       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3556                              logical_id=(vgname, names[0]))
3557       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3558                              logical_id=(vgname, names[1]))
3559       new_lvs = [lv_data, lv_meta]
3560       old_lvs = dev.children
3561       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3562       logger.Info("adding new local storage on %s for %s" %
3563                   (tgt_node, dev.iv_name))
3564       # since we *always* want to create this LV, we use the
3565       # _Create...OnPrimary (which forces the creation), even if we
3566       # are talking about the secondary node
3567       for new_lv in new_lvs:
3568         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3569                                         _GetInstanceInfoText(instance)):
3570           raise errors.OpExecError("Failed to create new LV named '%s' on"
3571                                    " node '%s'" %
3572                                    (new_lv.logical_id[1], tgt_node))
3573
3574       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3575         raise errors.OpExecError("Can't detach drbd from local storage on node"
3576                                  " %s for device %s" % (tgt_node, dev.iv_name))
3577       dev.children = []
3578       cfg.Update(instance)
3579
3580       # ok, we created the new LVs, so now we know we have the needed
3581       # storage; as such, we proceed on the target node to rename
3582       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3583       # using the assumption than logical_id == physical_id (which in
3584       # turn is the unique_id on that node)
3585       temp_suffix = int(time.time())
3586       logger.Info("renaming the old LVs on the target node")
3587       ren_fn = lambda d, suff: (d.physical_id[0],
3588                                 d.physical_id[1] + "_replaced-%s" % suff)
3589       rlist = [(disk, ren_fn(disk, temp_suffix)) for disk in old_lvs]
3590       if not rpc.call_blockdev_rename(tgt_node, rlist):
3591         logger.Error("Can't rename old LVs on node %s" % tgt_node)
3592         do_change_old = False
3593       else:
3594         do_change_old = True
3595       # now we rename the new LVs to the old LVs
3596       logger.Info("renaming the new LVs on the target node")
3597       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3598       if not rpc.call_blockdev_rename(tgt_node, rlist):
3599         logger.Error("Can't rename new LVs on node %s" % tgt_node)
3600       else:
3601         for old, new in zip(old_lvs, new_lvs):
3602           new.logical_id = old.logical_id
3603           cfg.SetDiskID(new, tgt_node)
3604
3605       if do_change_old:
3606         for disk in old_lvs:
3607           disk.logical_id = ren_fn(disk, temp_suffix)
3608           cfg.SetDiskID(disk, tgt_node)
3609
3610       # now that the new lvs have the old name, we can add them to the device
3611       logger.Info("adding new mirror component on %s" % tgt_node)
3612       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3613         logger.Error("Can't add local storage to drbd!")
3614         for new_lv in new_lvs:
3615           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3616             logger.Error("Can't rollback device %s")
3617         return
3618
3619       dev.children = new_lvs
3620       cfg.Update(instance)
3621
3622
3623     # this can fail as the old devices are degraded and _WaitForSync
3624     # does a combined result over all disks, so we don't check its
3625     # return value
3626     logger.Info("Done changing drbd configs, waiting for sync")
3627     _WaitForSync(cfg, instance, unlock=True)
3628
3629     # so check manually all the devices
3630     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3631       cfg.SetDiskID(dev, instance.primary_node)
3632       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3633       if is_degr:
3634         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3635
3636     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3637       logger.Info("remove logical volumes for %s" % name)
3638       for lv in old_lvs:
3639         cfg.SetDiskID(lv, tgt_node)
3640         if not rpc.call_blockdev_remove(tgt_node, lv):
3641           logger.Error("Can't cleanup child device, skipping. You need to"
3642                        " fix manually!")
3643           continue
3644
3645   def _ExecD8Secondary(self, feedback_fn):
3646     """Replace the secondary node for drbd8.
3647
3648     The algorithm for replace is quite complicated:
3649       - for all disks of the instance:
3650         - create new LVs on the new node with same names
3651         - shutdown the drbd device on the old secondary
3652         - disconnect the drbd network on the primary
3653         - create the drbd device on the new secondary
3654         - network attach the drbd on the primary, using an artifice:
3655           the drbd code for Attach() will connect to the network if it
3656           finds a device which is connected to the good local disks but
3657           not network enabled
3658       - wait for sync across all devices
3659       - remove all disks from the old secondary
3660
3661     Failures are not very well handled.
3662     """
3663     instance = self.instance
3664     iv_names = {}
3665     vgname = self.cfg.GetVGName()
3666     # start of work
3667     cfg = self.cfg
3668     old_node = self.tgt_node
3669     new_node = self.new_node
3670     pri_node = instance.primary_node
3671     for dev in instance.disks:
3672       size = dev.size
3673       logger.Info("adding new local storage on %s for %s" %
3674                   (new_node, dev.iv_name))
3675       # since we *always* want to create this LV, we use the
3676       # _Create...OnPrimary (which forces the creation), even if we
3677       # are talking about the secondary node
3678       for new_lv in dev.children:
3679         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3680                                         _GetInstanceInfoText(instance)):
3681           raise errors.OpExecError("Failed to create new LV named '%s' on"
3682                                    " node '%s'" %
3683                                    (new_lv.logical_id[1], new_node))
3684
3685       # create new devices on new_node
3686       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3687                               logical_id=(pri_node, new_node,
3688                                           dev.logical_id[2]),
3689                               children=dev.children)
3690       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3691                                         new_drbd, False,
3692                                       _GetInstanceInfoText(instance)):
3693         raise errors.OpExecError("Failed to create new DRBD on"
3694                                  " node '%s'" % new_node)
3695
3696       # we have new devices, shutdown the drbd on the old secondary
3697       cfg.SetDiskID(dev, old_node)
3698       if not rpc.call_blockdev_shutdown(old_node, dev):
3699         raise errors.OpExecError("Failed to shutdown DRBD on old node")
3700
3701       # we have new storage, we 'rename' the network on the primary
3702       cfg.SetDiskID(dev, pri_node)
3703       # rename to the ip of the new node
3704       new_uid = list(dev.physical_id)
3705       new_uid[2] = self.remote_node_info.secondary_ip
3706       rlist = [(dev, tuple(new_uid))]
3707       if not rpc.call_blockdev_rename(pri_node, rlist):
3708         raise errors.OpExecError("Can't detach re-attach drbd %s on node"
3709                                  " %s from %s to %s" %
3710                                  (dev.iv_name, pri_node, old_node, new_node))
3711       dev.logical_id = (pri_node, new_node, dev.logical_id[2])
3712       cfg.SetDiskID(dev, pri_node)
3713       cfg.Update(instance)
3714
3715       iv_names[dev.iv_name] = (dev, dev.children)
3716
3717     # this can fail as the old devices are degraded and _WaitForSync
3718     # does a combined result over all disks, so we don't check its
3719     # return value
3720     logger.Info("Done changing drbd configs, waiting for sync")
3721     _WaitForSync(cfg, instance, unlock=True)
3722
3723     # so check manually all the devices
3724     for name, (dev, old_lvs) in iv_names.iteritems():
3725       cfg.SetDiskID(dev, pri_node)
3726       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3727       if is_degr:
3728         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3729
3730     for name, (dev, old_lvs) in iv_names.iteritems():
3731       logger.Info("remove logical volumes for %s" % name)
3732       for lv in old_lvs:
3733         cfg.SetDiskID(lv, old_node)
3734         if not rpc.call_blockdev_remove(old_node, lv):
3735           logger.Error("Can't cleanup child device, skipping. You need to"
3736                        " fix manually!")
3737           continue
3738
3739   def Exec(self, feedback_fn):
3740     """Execute disk replacement.
3741
3742     This dispatches the disk replacement to the appropriate handler.
3743
3744     """
3745     instance = self.instance
3746     if instance.disk_template == constants.DT_REMOTE_RAID1:
3747       fn = self._ExecRR1
3748     elif instance.disk_template == constants.DT_DRBD8:
3749       if self.op.remote_node is None:
3750         fn = self._ExecD8DiskOnly
3751       else:
3752         fn = self._ExecD8Secondary
3753     else:
3754       raise errors.ProgrammerError("Unhandled disk replacement case")
3755     return fn(feedback_fn)
3756
3757
3758 class LUQueryInstanceData(NoHooksLU):
3759   """Query runtime instance data.
3760
3761   """
3762   _OP_REQP = ["instances"]
3763
3764   def CheckPrereq(self):
3765     """Check prerequisites.
3766
3767     This only checks the optional instance list against the existing names.
3768
3769     """
3770     if not isinstance(self.op.instances, list):
3771       raise errors.OpPrereqError("Invalid argument type 'instances'")
3772     if self.op.instances:
3773       self.wanted_instances = []
3774       names = self.op.instances
3775       for name in names:
3776         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3777         if instance is None:
3778           raise errors.OpPrereqError("No such instance name '%s'" % name)
3779       self.wanted_instances.append(instance)
3780     else:
3781       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3782                                in self.cfg.GetInstanceList()]
3783     return
3784
3785
3786   def _ComputeDiskStatus(self, instance, snode, dev):
3787     """Compute block device status.
3788
3789     """
3790     self.cfg.SetDiskID(dev, instance.primary_node)
3791     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3792     if dev.dev_type in constants.LDS_DRBD:
3793       # we change the snode then (otherwise we use the one passed in)
3794       if dev.logical_id[0] == instance.primary_node:
3795         snode = dev.logical_id[1]
3796       else:
3797         snode = dev.logical_id[0]
3798
3799     if snode:
3800       self.cfg.SetDiskID(dev, snode)
3801       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3802     else:
3803       dev_sstatus = None
3804
3805     if dev.children:
3806       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3807                       for child in dev.children]
3808     else:
3809       dev_children = []
3810
3811     data = {
3812       "iv_name": dev.iv_name,
3813       "dev_type": dev.dev_type,
3814       "logical_id": dev.logical_id,
3815       "physical_id": dev.physical_id,
3816       "pstatus": dev_pstatus,
3817       "sstatus": dev_sstatus,
3818       "children": dev_children,
3819       }
3820
3821     return data
3822
3823   def Exec(self, feedback_fn):
3824     """Gather and return data"""
3825     result = {}
3826     for instance in self.wanted_instances:
3827       remote_info = rpc.call_instance_info(instance.primary_node,
3828                                                 instance.name)
3829       if remote_info and "state" in remote_info:
3830         remote_state = "up"
3831       else:
3832         remote_state = "down"
3833       if instance.status == "down":
3834         config_state = "down"
3835       else:
3836         config_state = "up"
3837
3838       disks = [self._ComputeDiskStatus(instance, None, device)
3839                for device in instance.disks]
3840
3841       idict = {
3842         "name": instance.name,
3843         "config_state": config_state,
3844         "run_state": remote_state,
3845         "pnode": instance.primary_node,
3846         "snodes": instance.secondary_nodes,
3847         "os": instance.os,
3848         "memory": instance.memory,
3849         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3850         "disks": disks,
3851         "vcpus": instance.vcpus,
3852         }
3853
3854       result[instance.name] = idict
3855
3856     return result
3857
3858
3859 class LUSetInstanceParms(LogicalUnit):
3860   """Modifies an instances's parameters.
3861
3862   """
3863   HPATH = "instance-modify"
3864   HTYPE = constants.HTYPE_INSTANCE
3865   _OP_REQP = ["instance_name"]
3866
3867   def BuildHooksEnv(self):
3868     """Build hooks env.
3869
3870     This runs on the master, primary and secondaries.
3871
3872     """
3873     args = dict()
3874     if self.mem:
3875       args['memory'] = self.mem
3876     if self.vcpus:
3877       args['vcpus'] = self.vcpus
3878     if self.do_ip or self.do_bridge:
3879       if self.do_ip:
3880         ip = self.ip
3881       else:
3882         ip = self.instance.nics[0].ip
3883       if self.bridge:
3884         bridge = self.bridge
3885       else:
3886         bridge = self.instance.nics[0].bridge
3887       args['nics'] = [(ip, bridge)]
3888     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3889     nl = [self.sstore.GetMasterNode(),
3890           self.instance.primary_node] + list(self.instance.secondary_nodes)
3891     return env, nl, nl
3892
3893   def CheckPrereq(self):
3894     """Check prerequisites.
3895
3896     This only checks the instance list against the existing names.
3897
3898     """
3899     self.mem = getattr(self.op, "mem", None)
3900     self.vcpus = getattr(self.op, "vcpus", None)
3901     self.ip = getattr(self.op, "ip", None)
3902     self.bridge = getattr(self.op, "bridge", None)
3903     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3904       raise errors.OpPrereqError("No changes submitted")
3905     if self.mem is not None:
3906       try:
3907         self.mem = int(self.mem)
3908       except ValueError, err:
3909         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3910     if self.vcpus is not None:
3911       try:
3912         self.vcpus = int(self.vcpus)
3913       except ValueError, err:
3914         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3915     if self.ip is not None:
3916       self.do_ip = True
3917       if self.ip.lower() == "none":
3918         self.ip = None
3919       else:
3920         if not utils.IsValidIP(self.ip):
3921           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3922     else:
3923       self.do_ip = False
3924     self.do_bridge = (self.bridge is not None)
3925
3926     instance = self.cfg.GetInstanceInfo(
3927       self.cfg.ExpandInstanceName(self.op.instance_name))
3928     if instance is None:
3929       raise errors.OpPrereqError("No such instance name '%s'" %
3930                                  self.op.instance_name)
3931     self.op.instance_name = instance.name
3932     self.instance = instance
3933     return
3934
3935   def Exec(self, feedback_fn):
3936     """Modifies an instance.
3937
3938     All parameters take effect only at the next restart of the instance.
3939     """
3940     result = []
3941     instance = self.instance
3942     if self.mem:
3943       instance.memory = self.mem
3944       result.append(("mem", self.mem))
3945     if self.vcpus:
3946       instance.vcpus = self.vcpus
3947       result.append(("vcpus",  self.vcpus))
3948     if self.do_ip:
3949       instance.nics[0].ip = self.ip
3950       result.append(("ip", self.ip))
3951     if self.bridge:
3952       instance.nics[0].bridge = self.bridge
3953       result.append(("bridge", self.bridge))
3954
3955     self.cfg.AddInstance(instance)
3956
3957     return result
3958
3959
3960 class LUQueryExports(NoHooksLU):
3961   """Query the exports list
3962
3963   """
3964   _OP_REQP = []
3965
3966   def CheckPrereq(self):
3967     """Check that the nodelist contains only existing nodes.
3968
3969     """
3970     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3971
3972   def Exec(self, feedback_fn):
3973     """Compute the list of all the exported system images.
3974
3975     Returns:
3976       a dictionary with the structure node->(export-list)
3977       where export-list is a list of the instances exported on
3978       that node.
3979
3980     """
3981     return rpc.call_export_list(self.nodes)
3982
3983
3984 class LUExportInstance(LogicalUnit):
3985   """Export an instance to an image in the cluster.
3986
3987   """
3988   HPATH = "instance-export"
3989   HTYPE = constants.HTYPE_INSTANCE
3990   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3991
3992   def BuildHooksEnv(self):
3993     """Build hooks env.
3994
3995     This will run on the master, primary node and target node.
3996
3997     """
3998     env = {
3999       "EXPORT_NODE": self.op.target_node,
4000       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4001       }
4002     env.update(_BuildInstanceHookEnvByObject(self.instance))
4003     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4004           self.op.target_node]
4005     return env, nl, nl
4006
4007   def CheckPrereq(self):
4008     """Check prerequisites.
4009
4010     This checks that the instance name is a valid one.
4011
4012     """
4013     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4014     self.instance = self.cfg.GetInstanceInfo(instance_name)
4015     if self.instance is None:
4016       raise errors.OpPrereqError("Instance '%s' not found" %
4017                                  self.op.instance_name)
4018
4019     # node verification
4020     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4021     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4022
4023     if self.dst_node is None:
4024       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4025                                  self.op.target_node)
4026     self.op.target_node = self.dst_node.name
4027
4028   def Exec(self, feedback_fn):
4029     """Export an instance to an image in the cluster.
4030
4031     """
4032     instance = self.instance
4033     dst_node = self.dst_node
4034     src_node = instance.primary_node
4035     # shutdown the instance, unless requested not to do so
4036     if self.op.shutdown:
4037       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4038       self.processor.ChainOpCode(op)
4039
4040     vgname = self.cfg.GetVGName()
4041
4042     snap_disks = []
4043
4044     try:
4045       for disk in instance.disks:
4046         if disk.iv_name == "sda":
4047           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4048           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4049
4050           if not new_dev_name:
4051             logger.Error("could not snapshot block device %s on node %s" %
4052                          (disk.logical_id[1], src_node))
4053           else:
4054             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4055                                       logical_id=(vgname, new_dev_name),
4056                                       physical_id=(vgname, new_dev_name),
4057                                       iv_name=disk.iv_name)
4058             snap_disks.append(new_dev)
4059
4060     finally:
4061       if self.op.shutdown:
4062         op = opcodes.OpStartupInstance(instance_name=instance.name,
4063                                        force=False)
4064         self.processor.ChainOpCode(op)
4065
4066     # TODO: check for size
4067
4068     for dev in snap_disks:
4069       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4070                                            instance):
4071         logger.Error("could not export block device %s from node"
4072                      " %s to node %s" %
4073                      (dev.logical_id[1], src_node, dst_node.name))
4074       if not rpc.call_blockdev_remove(src_node, dev):
4075         logger.Error("could not remove snapshot block device %s from"
4076                      " node %s" % (dev.logical_id[1], src_node))
4077
4078     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4079       logger.Error("could not finalize export for instance %s on node %s" %
4080                    (instance.name, dst_node.name))
4081
4082     nodelist = self.cfg.GetNodeList()
4083     nodelist.remove(dst_node.name)
4084
4085     # on one-node clusters nodelist will be empty after the removal
4086     # if we proceed the backup would be removed because OpQueryExports
4087     # substitutes an empty list with the full cluster node list.
4088     if nodelist:
4089       op = opcodes.OpQueryExports(nodes=nodelist)
4090       exportlist = self.processor.ChainOpCode(op)
4091       for node in exportlist:
4092         if instance.name in exportlist[node]:
4093           if not rpc.call_export_remove(node, instance.name):
4094             logger.Error("could not remove older export for instance %s"
4095                          " on node %s" % (instance.name, node))
4096
4097
4098 class TagsLU(NoHooksLU):
4099   """Generic tags LU.
4100
4101   This is an abstract class which is the parent of all the other tags LUs.
4102
4103   """
4104   def CheckPrereq(self):
4105     """Check prerequisites.
4106
4107     """
4108     if self.op.kind == constants.TAG_CLUSTER:
4109       self.target = self.cfg.GetClusterInfo()
4110     elif self.op.kind == constants.TAG_NODE:
4111       name = self.cfg.ExpandNodeName(self.op.name)
4112       if name is None:
4113         raise errors.OpPrereqError("Invalid node name (%s)" %
4114                                    (self.op.name,))
4115       self.op.name = name
4116       self.target = self.cfg.GetNodeInfo(name)
4117     elif self.op.kind == constants.TAG_INSTANCE:
4118       name = self.cfg.ExpandInstanceName(self.op.name)
4119       if name is None:
4120         raise errors.OpPrereqError("Invalid instance name (%s)" %
4121                                    (self.op.name,))
4122       self.op.name = name
4123       self.target = self.cfg.GetInstanceInfo(name)
4124     else:
4125       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4126                                  str(self.op.kind))
4127
4128
4129 class LUGetTags(TagsLU):
4130   """Returns the tags of a given object.
4131
4132   """
4133   _OP_REQP = ["kind", "name"]
4134
4135   def Exec(self, feedback_fn):
4136     """Returns the tag list.
4137
4138     """
4139     return self.target.GetTags()
4140
4141
4142 class LUSearchTags(NoHooksLU):
4143   """Searches the tags for a given pattern.
4144
4145   """
4146   _OP_REQP = ["pattern"]
4147
4148   def CheckPrereq(self):
4149     """Check prerequisites.
4150
4151     This checks the pattern passed for validity by compiling it.
4152
4153     """
4154     try:
4155       self.re = re.compile(self.op.pattern)
4156     except re.error, err:
4157       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4158                                  (self.op.pattern, err))
4159
4160   def Exec(self, feedback_fn):
4161     """Returns the tag list.
4162
4163     """
4164     cfg = self.cfg
4165     tgts = [("/cluster", cfg.GetClusterInfo())]
4166     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4167     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4168     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4169     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4170     results = []
4171     for path, target in tgts:
4172       for tag in target.GetTags():
4173         if self.re.search(tag):
4174           results.append((path, tag))
4175     return results
4176
4177
4178 class LUAddTags(TagsLU):
4179   """Sets a tag on a given object.
4180
4181   """
4182   _OP_REQP = ["kind", "name", "tags"]
4183
4184   def CheckPrereq(self):
4185     """Check prerequisites.
4186
4187     This checks the type and length of the tag name and value.
4188
4189     """
4190     TagsLU.CheckPrereq(self)
4191     for tag in self.op.tags:
4192       objects.TaggableObject.ValidateTag(tag)
4193
4194   def Exec(self, feedback_fn):
4195     """Sets the tag.
4196
4197     """
4198     try:
4199       for tag in self.op.tags:
4200         self.target.AddTag(tag)
4201     except errors.TagError, err:
4202       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4203     try:
4204       self.cfg.Update(self.target)
4205     except errors.ConfigurationError:
4206       raise errors.OpRetryError("There has been a modification to the"
4207                                 " config file and the operation has been"
4208                                 " aborted. Please retry.")
4209
4210
4211 class LUDelTags(TagsLU):
4212   """Delete a list of tags from a given object.
4213
4214   """
4215   _OP_REQP = ["kind", "name", "tags"]
4216
4217   def CheckPrereq(self):
4218     """Check prerequisites.
4219
4220     This checks that we have the given tag.
4221
4222     """
4223     TagsLU.CheckPrereq(self)
4224     for tag in self.op.tags:
4225       objects.TaggableObject.ValidateTag(tag)
4226     del_tags = frozenset(self.op.tags)
4227     cur_tags = self.target.GetTags()
4228     if not del_tags <= cur_tags:
4229       diff_tags = del_tags - cur_tags
4230       diff_names = ["'%s'" % tag for tag in diff_tags]
4231       diff_names.sort()
4232       raise errors.OpPrereqError("Tag(s) %s not found" %
4233                                  (",".join(diff_names)))
4234
4235   def Exec(self, feedback_fn):
4236     """Remove the tag from the object.
4237
4238     """
4239     for tag in self.op.tags:
4240       self.target.RemoveTag(tag)
4241     try:
4242       self.cfg.Update(self.target)
4243     except errors.ConfigurationError:
4244       raise errors.OpRetryError("There has been a modification to the"
4245                                 " config file and the operation has been"
4246                                 " aborted. Please retry.")