Refactor burnin to improve disk replacement
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.processor = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _GetWantedNodes(lu, nodes):
167   """Returns list of checked and expanded node names.
168
169   Args:
170     nodes: List of nodes (strings) or None for all
171
172   """
173   if not isinstance(nodes, list):
174     raise errors.OpPrereqError("Invalid argument type 'nodes'")
175
176   if nodes:
177     wanted = []
178
179     for name in nodes:
180       node = lu.cfg.ExpandNodeName(name)
181       if node is None:
182         raise errors.OpPrereqError("No such node name '%s'" % name)
183       wanted.append(node)
184
185   else:
186     wanted = lu.cfg.GetNodeList()
187   return utils.NiceSort(wanted)
188
189
190 def _GetWantedInstances(lu, instances):
191   """Returns list of checked and expanded instance names.
192
193   Args:
194     instances: List of instances (strings) or None for all
195
196   """
197   if not isinstance(instances, list):
198     raise errors.OpPrereqError("Invalid argument type 'instances'")
199
200   if instances:
201     wanted = []
202
203     for name in instances:
204       instance = lu.cfg.ExpandInstanceName(name)
205       if instance is None:
206         raise errors.OpPrereqError("No such instance name '%s'" % name)
207       wanted.append(instance)
208
209   else:
210     wanted = lu.cfg.GetInstanceList()
211   return utils.NiceSort(wanted)
212
213
214 def _CheckOutputFields(static, dynamic, selected):
215   """Checks whether all selected fields are valid.
216
217   Args:
218     static: Static fields
219     dynamic: Dynamic fields
220
221   """
222   static_fields = frozenset(static)
223   dynamic_fields = frozenset(dynamic)
224
225   all_fields = static_fields | dynamic_fields
226
227   if not all_fields.issuperset(selected):
228     raise errors.OpPrereqError("Unknown output fields selected: %s"
229                                % ",".join(frozenset(selected).
230                                           difference(all_fields)))
231
232
233 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
234                           memory, vcpus, nics):
235   """Builds instance related env variables for hooks from single variables.
236
237   Args:
238     secondary_nodes: List of secondary nodes as strings
239   """
240   env = {
241     "OP_TARGET": name,
242     "INSTANCE_NAME": name,
243     "INSTANCE_PRIMARY": primary_node,
244     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245     "INSTANCE_OS_TYPE": os_type,
246     "INSTANCE_STATUS": status,
247     "INSTANCE_MEMORY": memory,
248     "INSTANCE_VCPUS": vcpus,
249   }
250
251   if nics:
252     nic_count = len(nics)
253     for idx, (ip, bridge) in enumerate(nics):
254       if ip is None:
255         ip = ""
256       env["INSTANCE_NIC%d_IP" % idx] = ip
257       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258   else:
259     nic_count = 0
260
261   env["INSTANCE_NIC_COUNT"] = nic_count
262
263   return env
264
265
266 def _BuildInstanceHookEnvByObject(instance, override=None):
267   """Builds instance related env variables for hooks from an object.
268
269   Args:
270     instance: objects.Instance object of instance
271     override: dict of values to override
272   """
273   args = {
274     'name': instance.name,
275     'primary_node': instance.primary_node,
276     'secondary_nodes': instance.secondary_nodes,
277     'os_type': instance.os,
278     'status': instance.os,
279     'memory': instance.memory,
280     'vcpus': instance.vcpus,
281     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282   }
283   if override:
284     args.update(override)
285   return _BuildInstanceHookEnv(**args)
286
287
288 def _UpdateEtcHosts(fullnode, ip):
289   """Ensure a node has a correct entry in /etc/hosts.
290
291   Args:
292     fullnode - Fully qualified domain name of host. (str)
293     ip       - IPv4 address of host (str)
294
295   """
296   node = fullnode.split(".", 1)[0]
297
298   f = open('/etc/hosts', 'r+')
299
300   inthere = False
301
302   save_lines = []
303   add_lines = []
304   removed = False
305
306   while True:
307     rawline = f.readline()
308
309     if not rawline:
310       # End of file
311       break
312
313     line = rawline.split('\n')[0]
314
315     # Strip off comments
316     line = line.split('#')[0]
317
318     if not line:
319       # Entire line was comment, skip
320       save_lines.append(rawline)
321       continue
322
323     fields = line.split()
324
325     haveall = True
326     havesome = False
327     for spec in [ ip, fullnode, node ]:
328       if spec not in fields:
329         haveall = False
330       if spec in fields:
331         havesome = True
332
333     if haveall:
334       inthere = True
335       save_lines.append(rawline)
336       continue
337
338     if havesome and not haveall:
339       # Line (old, or manual?) which is missing some.  Remove.
340       removed = True
341       continue
342
343     save_lines.append(rawline)
344
345   if not inthere:
346     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347
348   if removed:
349     if add_lines:
350       save_lines = save_lines + add_lines
351
352     # We removed a line, write a new file and replace old.
353     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354     newfile = os.fdopen(fd, 'w')
355     newfile.write(''.join(save_lines))
356     newfile.close()
357     os.rename(tmpname, '/etc/hosts')
358
359   elif add_lines:
360     # Simply appending a new line will do the trick.
361     f.seek(0, 2)
362     for add in add_lines:
363       f.write(add)
364
365   f.close()
366
367
368 def _UpdateKnownHosts(fullnode, ip, pubkey):
369   """Ensure a node has a correct known_hosts entry.
370
371   Args:
372     fullnode - Fully qualified domain name of host. (str)
373     ip       - IPv4 address of host (str)
374     pubkey   - the public key of the cluster
375
376   """
377   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379   else:
380     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381
382   inthere = False
383
384   save_lines = []
385   add_lines = []
386   removed = False
387
388   for rawline in f:
389     logger.Debug('read %s' % (repr(rawline),))
390
391     parts = rawline.rstrip('\r\n').split()
392
393     # Ignore unwanted lines
394     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
395       fields = parts[0].split(',')
396       key = parts[2]
397
398       haveall = True
399       havesome = False
400       for spec in [ ip, fullnode ]:
401         if spec not in fields:
402           haveall = False
403         if spec in fields:
404           havesome = True
405
406       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
407       if haveall and key == pubkey:
408         inthere = True
409         save_lines.append(rawline)
410         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
411         continue
412
413       if havesome and (not haveall or key != pubkey):
414         removed = True
415         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
416         continue
417
418     save_lines.append(rawline)
419
420   if not inthere:
421     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
422     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
423
424   if removed:
425     save_lines = save_lines + add_lines
426
427     # Write a new file and replace old.
428     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
429                                    constants.DATA_DIR)
430     newfile = os.fdopen(fd, 'w')
431     try:
432       newfile.write(''.join(save_lines))
433     finally:
434       newfile.close()
435     logger.Debug("Wrote new known_hosts.")
436     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
437
438   elif add_lines:
439     # Simply appending a new line will do the trick.
440     f.seek(0, 2)
441     for add in add_lines:
442       f.write(add)
443
444   f.close()
445
446
447 def _HasValidVG(vglist, vgname):
448   """Checks if the volume group list is valid.
449
450   A non-None return value means there's an error, and the return value
451   is the error message.
452
453   """
454   vgsize = vglist.get(vgname, None)
455   if vgsize is None:
456     return "volume group '%s' missing" % vgname
457   elif vgsize < 20480:
458     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
459             (vgname, vgsize))
460   return None
461
462
463 def _InitSSHSetup(node):
464   """Setup the SSH configuration for the cluster.
465
466
467   This generates a dsa keypair for root, adds the pub key to the
468   permitted hosts and adds the hostkey to its own known hosts.
469
470   Args:
471     node: the name of this host as a fqdn
472
473   """
474   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
475
476   for name in priv_key, pub_key:
477     if os.path.exists(name):
478       utils.CreateBackup(name)
479     utils.RemoveFile(name)
480
481   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
482                          "-f", priv_key,
483                          "-q", "-N", ""])
484   if result.failed:
485     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
486                              result.output)
487
488   f = open(pub_key, 'r')
489   try:
490     utils.AddAuthorizedKey(auth_keys, f.read(8192))
491   finally:
492     f.close()
493
494
495 def _InitGanetiServerSetup(ss):
496   """Setup the necessary configuration for the initial node daemon.
497
498   This creates the nodepass file containing the shared password for
499   the cluster and also generates the SSL certificate.
500
501   """
502   # Create pseudo random password
503   randpass = sha.new(os.urandom(64)).hexdigest()
504   # and write it into sstore
505   ss.SetKey(ss.SS_NODED_PASS, randpass)
506
507   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
508                          "-days", str(365*5), "-nodes", "-x509",
509                          "-keyout", constants.SSL_CERT_FILE,
510                          "-out", constants.SSL_CERT_FILE, "-batch"])
511   if result.failed:
512     raise errors.OpExecError("could not generate server ssl cert, command"
513                              " %s had exitcode %s and error message %s" %
514                              (result.cmd, result.exit_code, result.output))
515
516   os.chmod(constants.SSL_CERT_FILE, 0400)
517
518   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
519
520   if result.failed:
521     raise errors.OpExecError("Could not start the node daemon, command %s"
522                              " had exitcode %s and error %s" %
523                              (result.cmd, result.exit_code, result.output))
524
525
526 def _CheckInstanceBridgesExist(instance):
527   """Check that the brigdes needed by an instance exist.
528
529   """
530   # check bridges existance
531   brlist = [nic.bridge for nic in instance.nics]
532   if not rpc.call_bridges_exist(instance.primary_node, brlist):
533     raise errors.OpPrereqError("one or more target bridges %s does not"
534                                " exist on destination node '%s'" %
535                                (brlist, instance.primary_node))
536
537
538 class LUInitCluster(LogicalUnit):
539   """Initialise the cluster.
540
541   """
542   HPATH = "cluster-init"
543   HTYPE = constants.HTYPE_CLUSTER
544   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
545               "def_bridge", "master_netdev"]
546   REQ_CLUSTER = False
547
548   def BuildHooksEnv(self):
549     """Build hooks env.
550
551     Notes: Since we don't require a cluster, we must manually add
552     ourselves in the post-run node list.
553
554     """
555     env = {"OP_TARGET": self.op.cluster_name}
556     return env, [], [self.hostname.name]
557
558   def CheckPrereq(self):
559     """Verify that the passed name is a valid one.
560
561     """
562     if config.ConfigWriter.IsCluster():
563       raise errors.OpPrereqError("Cluster is already initialised")
564
565     self.hostname = hostname = utils.HostInfo()
566
567     if hostname.ip.startswith("127."):
568       raise errors.OpPrereqError("This host's IP resolves to the private"
569                                  " range (%s). Please fix DNS or /etc/hosts." %
570                                  (hostname.ip,))
571
572     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
573
574     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
575                          constants.DEFAULT_NODED_PORT):
576       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
577                                  " to %s,\nbut this ip address does not"
578                                  " belong to this host."
579                                  " Aborting." % hostname.ip)
580
581     secondary_ip = getattr(self.op, "secondary_ip", None)
582     if secondary_ip and not utils.IsValidIP(secondary_ip):
583       raise errors.OpPrereqError("Invalid secondary ip given")
584     if (secondary_ip and
585         secondary_ip != hostname.ip and
586         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
587                            constants.DEFAULT_NODED_PORT))):
588       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
589                                  "but it does not belong to this host." %
590                                  secondary_ip)
591     self.secondary_ip = secondary_ip
592
593     # checks presence of the volume group given
594     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
595
596     if vgstatus:
597       raise errors.OpPrereqError("Error: %s" % vgstatus)
598
599     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
600                     self.op.mac_prefix):
601       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
602                                  self.op.mac_prefix)
603
604     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
605       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
606                                  self.op.hypervisor_type)
607
608     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
609     if result.failed:
610       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
611                                  (self.op.master_netdev,
612                                   result.output.strip()))
613
614     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
615             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
616       raise errors.OpPrereqError("Init.d script '%s' missing or not "
617                                  "executable." % constants.NODE_INITD_SCRIPT)
618
619   def Exec(self, feedback_fn):
620     """Initialize the cluster.
621
622     """
623     clustername = self.clustername
624     hostname = self.hostname
625
626     # set up the simple store
627     self.sstore = ss = ssconf.SimpleStore()
628     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
629     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
630     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
631     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
632     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
633
634     # set up the inter-node password and certificate
635     _InitGanetiServerSetup(ss)
636
637     # start the master ip
638     rpc.call_node_start_master(hostname.name)
639
640     # set up ssh config and /etc/hosts
641     f = open(constants.SSH_HOST_RSA_PUB, 'r')
642     try:
643       sshline = f.read()
644     finally:
645       f.close()
646     sshkey = sshline.split(" ")[1]
647
648     _UpdateEtcHosts(hostname.name, hostname.ip)
649
650     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
651
652     _InitSSHSetup(hostname.name)
653
654     # init of cluster config file
655     self.cfg = cfgw = config.ConfigWriter()
656     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
657                     sshkey, self.op.mac_prefix,
658                     self.op.vg_name, self.op.def_bridge)
659
660
661 class LUDestroyCluster(NoHooksLU):
662   """Logical unit for destroying the cluster.
663
664   """
665   _OP_REQP = []
666
667   def CheckPrereq(self):
668     """Check prerequisites.
669
670     This checks whether the cluster is empty.
671
672     Any errors are signalled by raising errors.OpPrereqError.
673
674     """
675     master = self.sstore.GetMasterNode()
676
677     nodelist = self.cfg.GetNodeList()
678     if len(nodelist) != 1 or nodelist[0] != master:
679       raise errors.OpPrereqError("There are still %d node(s) in"
680                                  " this cluster." % (len(nodelist) - 1))
681     instancelist = self.cfg.GetInstanceList()
682     if instancelist:
683       raise errors.OpPrereqError("There are still %d instance(s) in"
684                                  " this cluster." % len(instancelist))
685
686   def Exec(self, feedback_fn):
687     """Destroys the cluster.
688
689     """
690     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
691     utils.CreateBackup(priv_key)
692     utils.CreateBackup(pub_key)
693     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
694
695
696 class LUVerifyCluster(NoHooksLU):
697   """Verifies the cluster status.
698
699   """
700   _OP_REQP = []
701
702   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
703                   remote_version, feedback_fn):
704     """Run multiple tests against a node.
705
706     Test list:
707       - compares ganeti version
708       - checks vg existance and size > 20G
709       - checks config file checksum
710       - checks ssh to other nodes
711
712     Args:
713       node: name of the node to check
714       file_list: required list of files
715       local_cksum: dictionary of local files and their checksums
716
717     """
718     # compares ganeti version
719     local_version = constants.PROTOCOL_VERSION
720     if not remote_version:
721       feedback_fn(" - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version:
725       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
726                       (local_version, node, remote_version))
727       return True
728
729     # checks vg existance and size > 20G
730
731     bad = False
732     if not vglist:
733       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
734                       (node,))
735       bad = True
736     else:
737       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
738       if vgstatus:
739         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
740         bad = True
741
742     # checks config file checksum
743     # checks ssh to any
744
745     if 'filelist' not in node_result:
746       bad = True
747       feedback_fn("  - ERROR: node hasn't returned file checksum data")
748     else:
749       remote_cksum = node_result['filelist']
750       for file_name in file_list:
751         if file_name not in remote_cksum:
752           bad = True
753           feedback_fn("  - ERROR: file '%s' missing" % file_name)
754         elif remote_cksum[file_name] != local_cksum[file_name]:
755           bad = True
756           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
757
758     if 'nodelist' not in node_result:
759       bad = True
760       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
761     else:
762       if node_result['nodelist']:
763         bad = True
764         for node in node_result['nodelist']:
765           feedback_fn("  - ERROR: communication with node '%s': %s" %
766                           (node, node_result['nodelist'][node]))
767     hyp_result = node_result.get('hypervisor', None)
768     if hyp_result is not None:
769       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
770     return bad
771
772   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
773     """Verify an instance.
774
775     This function checks to see if the required block devices are
776     available on the instance's node.
777
778     """
779     bad = False
780
781     instancelist = self.cfg.GetInstanceList()
782     if not instance in instancelist:
783       feedback_fn("  - ERROR: instance %s not in instance list %s" %
784                       (instance, instancelist))
785       bad = True
786
787     instanceconfig = self.cfg.GetInstanceInfo(instance)
788     node_current = instanceconfig.primary_node
789
790     node_vol_should = {}
791     instanceconfig.MapLVsByNode(node_vol_should)
792
793     for node in node_vol_should:
794       for volume in node_vol_should[node]:
795         if node not in node_vol_is or volume not in node_vol_is[node]:
796           feedback_fn("  - ERROR: volume %s missing on node %s" %
797                           (volume, node))
798           bad = True
799
800     if not instanceconfig.status == 'down':
801       if not instance in node_instance[node_current]:
802         feedback_fn("  - ERROR: instance %s not running on node %s" %
803                         (instance, node_current))
804         bad = True
805
806     for node in node_instance:
807       if (not node == node_current):
808         if instance in node_instance[node]:
809           feedback_fn("  - ERROR: instance %s should not run on node %s" %
810                           (instance, node))
811           bad = True
812
813     return bad
814
815   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
816     """Verify if there are any unknown volumes in the cluster.
817
818     The .os, .swap and backup volumes are ignored. All other volumes are
819     reported as unknown.
820
821     """
822     bad = False
823
824     for node in node_vol_is:
825       for volume in node_vol_is[node]:
826         if node not in node_vol_should or volume not in node_vol_should[node]:
827           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
828                       (volume, node))
829           bad = True
830     return bad
831
832   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
833     """Verify the list of running instances.
834
835     This checks what instances are running but unknown to the cluster.
836
837     """
838     bad = False
839     for node in node_instance:
840       for runninginstance in node_instance[node]:
841         if runninginstance not in instancelist:
842           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
843                           (runninginstance, node))
844           bad = True
845     return bad
846
847   def CheckPrereq(self):
848     """Check prerequisites.
849
850     This has no prerequisites.
851
852     """
853     pass
854
855   def Exec(self, feedback_fn):
856     """Verify integrity of cluster, performing various test on nodes.
857
858     """
859     bad = False
860     feedback_fn("* Verifying global settings")
861     self.cfg.VerifyConfig()
862
863     vg_name = self.cfg.GetVGName()
864     nodelist = utils.NiceSort(self.cfg.GetNodeList())
865     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
866     node_volume = {}
867     node_instance = {}
868
869     # FIXME: verify OS list
870     # do local checksums
871     file_names = list(self.sstore.GetFileList())
872     file_names.append(constants.SSL_CERT_FILE)
873     file_names.append(constants.CLUSTER_CONF_FILE)
874     local_checksums = utils.FingerprintFiles(file_names)
875
876     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
877     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
878     all_instanceinfo = rpc.call_instance_list(nodelist)
879     all_vglist = rpc.call_vg_list(nodelist)
880     node_verify_param = {
881       'filelist': file_names,
882       'nodelist': nodelist,
883       'hypervisor': None,
884       }
885     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
886     all_rversion = rpc.call_version(nodelist)
887
888     for node in nodelist:
889       feedback_fn("* Verifying node %s" % node)
890       result = self._VerifyNode(node, file_names, local_checksums,
891                                 all_vglist[node], all_nvinfo[node],
892                                 all_rversion[node], feedback_fn)
893       bad = bad or result
894
895       # node_volume
896       volumeinfo = all_volumeinfo[node]
897
898       if type(volumeinfo) != dict:
899         feedback_fn("  - ERROR: connection to %s failed" % (node,))
900         bad = True
901         continue
902
903       node_volume[node] = volumeinfo
904
905       # node_instance
906       nodeinstance = all_instanceinfo[node]
907       if type(nodeinstance) != list:
908         feedback_fn("  - ERROR: connection to %s failed" % (node,))
909         bad = True
910         continue
911
912       node_instance[node] = nodeinstance
913
914     node_vol_should = {}
915
916     for instance in instancelist:
917       feedback_fn("* Verifying instance %s" % instance)
918       result =  self._VerifyInstance(instance, node_volume, node_instance,
919                                      feedback_fn)
920       bad = bad or result
921
922       inst_config = self.cfg.GetInstanceInfo(instance)
923
924       inst_config.MapLVsByNode(node_vol_should)
925
926     feedback_fn("* Verifying orphan volumes")
927     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
928                                        feedback_fn)
929     bad = bad or result
930
931     feedback_fn("* Verifying remaining instances")
932     result = self._VerifyOrphanInstances(instancelist, node_instance,
933                                          feedback_fn)
934     bad = bad or result
935
936     return int(bad)
937
938
939 class LURenameCluster(LogicalUnit):
940   """Rename the cluster.
941
942   """
943   HPATH = "cluster-rename"
944   HTYPE = constants.HTYPE_CLUSTER
945   _OP_REQP = ["name"]
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     """
951     env = {
952       "OP_TARGET": self.op.sstore.GetClusterName(),
953       "NEW_NAME": self.op.name,
954       }
955     mn = self.sstore.GetMasterNode()
956     return env, [mn], [mn]
957
958   def CheckPrereq(self):
959     """Verify that the passed name is a valid one.
960
961     """
962     hostname = utils.HostInfo(self.op.name)
963
964     new_name = hostname.name
965     self.ip = new_ip = hostname.ip
966     old_name = self.sstore.GetClusterName()
967     old_ip = self.sstore.GetMasterIP()
968     if new_name == old_name and new_ip == old_ip:
969       raise errors.OpPrereqError("Neither the name nor the IP address of the"
970                                  " cluster has changed")
971     if new_ip != old_ip:
972       result = utils.RunCmd(["fping", "-q", new_ip])
973       if not result.failed:
974         raise errors.OpPrereqError("The given cluster IP address (%s) is"
975                                    " reachable on the network. Aborting." %
976                                    new_ip)
977
978     self.op.name = new_name
979
980   def Exec(self, feedback_fn):
981     """Rename the cluster.
982
983     """
984     clustername = self.op.name
985     ip = self.ip
986     ss = self.sstore
987
988     # shutdown the master IP
989     master = ss.GetMasterNode()
990     if not rpc.call_node_stop_master(master):
991       raise errors.OpExecError("Could not disable the master role")
992
993     try:
994       # modify the sstore
995       ss.SetKey(ss.SS_MASTER_IP, ip)
996       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
997
998       # Distribute updated ss config to all nodes
999       myself = self.cfg.GetNodeInfo(master)
1000       dist_nodes = self.cfg.GetNodeList()
1001       if myself.name in dist_nodes:
1002         dist_nodes.remove(myself.name)
1003
1004       logger.Debug("Copying updated ssconf data to all nodes")
1005       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1006         fname = ss.KeyToFilename(keyname)
1007         result = rpc.call_upload_file(dist_nodes, fname)
1008         for to_node in dist_nodes:
1009           if not result[to_node]:
1010             logger.Error("copy of file %s to node %s failed" %
1011                          (fname, to_node))
1012     finally:
1013       if not rpc.call_node_start_master(master):
1014         logger.Error("Could not re-enable the master role on the master,\n"
1015                      "please restart manually.")
1016
1017
1018 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1019   """Sleep and poll for an instance's disk to sync.
1020
1021   """
1022   if not instance.disks:
1023     return True
1024
1025   if not oneshot:
1026     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1027
1028   node = instance.primary_node
1029
1030   for dev in instance.disks:
1031     cfgw.SetDiskID(dev, node)
1032
1033   retries = 0
1034   while True:
1035     max_time = 0
1036     done = True
1037     cumul_degraded = False
1038     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1039     if not rstats:
1040       logger.ToStderr("Can't get any data from node %s" % node)
1041       retries += 1
1042       if retries >= 10:
1043         raise errors.RemoteError("Can't contact node %s for mirror data,"
1044                                  " aborting." % node)
1045       time.sleep(6)
1046       continue
1047     retries = 0
1048     for i in range(len(rstats)):
1049       mstat = rstats[i]
1050       if mstat is None:
1051         logger.ToStderr("Can't compute data for node %s/%s" %
1052                         (node, instance.disks[i].iv_name))
1053         continue
1054       # we ignore the ldisk parameter
1055       perc_done, est_time, is_degraded, _ = mstat
1056       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1057       if perc_done is not None:
1058         done = False
1059         if est_time is not None:
1060           rem_time = "%d estimated seconds remaining" % est_time
1061           max_time = est_time
1062         else:
1063           rem_time = "no time estimate"
1064         logger.ToStdout("- device %s: %5.2f%% done, %s" %
1065                         (instance.disks[i].iv_name, perc_done, rem_time))
1066     if done or oneshot:
1067       break
1068
1069     if unlock:
1070       utils.Unlock('cmd')
1071     try:
1072       time.sleep(min(60, max_time))
1073     finally:
1074       if unlock:
1075         utils.Lock('cmd')
1076
1077   if done:
1078     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1079   return not cumul_degraded
1080
1081
1082 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1083   """Check that mirrors are not degraded.
1084
1085   The ldisk parameter, if True, will change the test from the
1086   is_degraded attribute (which represents overall non-ok status for
1087   the device(s)) to the ldisk (representing the local storage status).
1088
1089   """
1090   cfgw.SetDiskID(dev, node)
1091   if ldisk:
1092     idx = 6
1093   else:
1094     idx = 5
1095
1096   result = True
1097   if on_primary or dev.AssembleOnSecondary():
1098     rstats = rpc.call_blockdev_find(node, dev)
1099     if not rstats:
1100       logger.ToStderr("Can't get any data from node %s" % node)
1101       result = False
1102     else:
1103       result = result and (not rstats[idx])
1104   if dev.children:
1105     for child in dev.children:
1106       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1107
1108   return result
1109
1110
1111 class LUDiagnoseOS(NoHooksLU):
1112   """Logical unit for OS diagnose/query.
1113
1114   """
1115   _OP_REQP = []
1116
1117   def CheckPrereq(self):
1118     """Check prerequisites.
1119
1120     This always succeeds, since this is a pure query LU.
1121
1122     """
1123     return
1124
1125   def Exec(self, feedback_fn):
1126     """Compute the list of OSes.
1127
1128     """
1129     node_list = self.cfg.GetNodeList()
1130     node_data = rpc.call_os_diagnose(node_list)
1131     if node_data == False:
1132       raise errors.OpExecError("Can't gather the list of OSes")
1133     return node_data
1134
1135
1136 class LURemoveNode(LogicalUnit):
1137   """Logical unit for removing a node.
1138
1139   """
1140   HPATH = "node-remove"
1141   HTYPE = constants.HTYPE_NODE
1142   _OP_REQP = ["node_name"]
1143
1144   def BuildHooksEnv(self):
1145     """Build hooks env.
1146
1147     This doesn't run on the target node in the pre phase as a failed
1148     node would not allows itself to run.
1149
1150     """
1151     env = {
1152       "OP_TARGET": self.op.node_name,
1153       "NODE_NAME": self.op.node_name,
1154       }
1155     all_nodes = self.cfg.GetNodeList()
1156     all_nodes.remove(self.op.node_name)
1157     return env, all_nodes, all_nodes
1158
1159   def CheckPrereq(self):
1160     """Check prerequisites.
1161
1162     This checks:
1163      - the node exists in the configuration
1164      - it does not have primary or secondary instances
1165      - it's not the master
1166
1167     Any errors are signalled by raising errors.OpPrereqError.
1168
1169     """
1170     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1171     if node is None:
1172       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1173
1174     instance_list = self.cfg.GetInstanceList()
1175
1176     masternode = self.sstore.GetMasterNode()
1177     if node.name == masternode:
1178       raise errors.OpPrereqError("Node is the master node,"
1179                                  " you need to failover first.")
1180
1181     for instance_name in instance_list:
1182       instance = self.cfg.GetInstanceInfo(instance_name)
1183       if node.name == instance.primary_node:
1184         raise errors.OpPrereqError("Instance %s still running on the node,"
1185                                    " please remove first." % instance_name)
1186       if node.name in instance.secondary_nodes:
1187         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1188                                    " please remove first." % instance_name)
1189     self.op.node_name = node.name
1190     self.node = node
1191
1192   def Exec(self, feedback_fn):
1193     """Removes the node from the cluster.
1194
1195     """
1196     node = self.node
1197     logger.Info("stopping the node daemon and removing configs from node %s" %
1198                 node.name)
1199
1200     rpc.call_node_leave_cluster(node.name)
1201
1202     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1203
1204     logger.Info("Removing node %s from config" % node.name)
1205
1206     self.cfg.RemoveNode(node.name)
1207
1208
1209 class LUQueryNodes(NoHooksLU):
1210   """Logical unit for querying nodes.
1211
1212   """
1213   _OP_REQP = ["output_fields", "names"]
1214
1215   def CheckPrereq(self):
1216     """Check prerequisites.
1217
1218     This checks that the fields required are valid output fields.
1219
1220     """
1221     self.dynamic_fields = frozenset(["dtotal", "dfree",
1222                                      "mtotal", "mnode", "mfree",
1223                                      "bootid"])
1224
1225     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1226                                "pinst_list", "sinst_list",
1227                                "pip", "sip"],
1228                        dynamic=self.dynamic_fields,
1229                        selected=self.op.output_fields)
1230
1231     self.wanted = _GetWantedNodes(self, self.op.names)
1232
1233   def Exec(self, feedback_fn):
1234     """Computes the list of nodes and their attributes.
1235
1236     """
1237     nodenames = self.wanted
1238     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1239
1240     # begin data gathering
1241
1242     if self.dynamic_fields.intersection(self.op.output_fields):
1243       live_data = {}
1244       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1245       for name in nodenames:
1246         nodeinfo = node_data.get(name, None)
1247         if nodeinfo:
1248           live_data[name] = {
1249             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1250             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1251             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1252             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1253             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1254             "bootid": nodeinfo['bootid'],
1255             }
1256         else:
1257           live_data[name] = {}
1258     else:
1259       live_data = dict.fromkeys(nodenames, {})
1260
1261     node_to_primary = dict([(name, set()) for name in nodenames])
1262     node_to_secondary = dict([(name, set()) for name in nodenames])
1263
1264     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1265                              "sinst_cnt", "sinst_list"))
1266     if inst_fields & frozenset(self.op.output_fields):
1267       instancelist = self.cfg.GetInstanceList()
1268
1269       for instance_name in instancelist:
1270         inst = self.cfg.GetInstanceInfo(instance_name)
1271         if inst.primary_node in node_to_primary:
1272           node_to_primary[inst.primary_node].add(inst.name)
1273         for secnode in inst.secondary_nodes:
1274           if secnode in node_to_secondary:
1275             node_to_secondary[secnode].add(inst.name)
1276
1277     # end data gathering
1278
1279     output = []
1280     for node in nodelist:
1281       node_output = []
1282       for field in self.op.output_fields:
1283         if field == "name":
1284           val = node.name
1285         elif field == "pinst_list":
1286           val = list(node_to_primary[node.name])
1287         elif field == "sinst_list":
1288           val = list(node_to_secondary[node.name])
1289         elif field == "pinst_cnt":
1290           val = len(node_to_primary[node.name])
1291         elif field == "sinst_cnt":
1292           val = len(node_to_secondary[node.name])
1293         elif field == "pip":
1294           val = node.primary_ip
1295         elif field == "sip":
1296           val = node.secondary_ip
1297         elif field in self.dynamic_fields:
1298           val = live_data[node.name].get(field, None)
1299         else:
1300           raise errors.ParameterError(field)
1301         node_output.append(val)
1302       output.append(node_output)
1303
1304     return output
1305
1306
1307 class LUQueryNodeVolumes(NoHooksLU):
1308   """Logical unit for getting volumes on node(s).
1309
1310   """
1311   _OP_REQP = ["nodes", "output_fields"]
1312
1313   def CheckPrereq(self):
1314     """Check prerequisites.
1315
1316     This checks that the fields required are valid output fields.
1317
1318     """
1319     self.nodes = _GetWantedNodes(self, self.op.nodes)
1320
1321     _CheckOutputFields(static=["node"],
1322                        dynamic=["phys", "vg", "name", "size", "instance"],
1323                        selected=self.op.output_fields)
1324
1325
1326   def Exec(self, feedback_fn):
1327     """Computes the list of nodes and their attributes.
1328
1329     """
1330     nodenames = self.nodes
1331     volumes = rpc.call_node_volumes(nodenames)
1332
1333     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1334              in self.cfg.GetInstanceList()]
1335
1336     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1337
1338     output = []
1339     for node in nodenames:
1340       if node not in volumes or not volumes[node]:
1341         continue
1342
1343       node_vols = volumes[node][:]
1344       node_vols.sort(key=lambda vol: vol['dev'])
1345
1346       for vol in node_vols:
1347         node_output = []
1348         for field in self.op.output_fields:
1349           if field == "node":
1350             val = node
1351           elif field == "phys":
1352             val = vol['dev']
1353           elif field == "vg":
1354             val = vol['vg']
1355           elif field == "name":
1356             val = vol['name']
1357           elif field == "size":
1358             val = int(float(vol['size']))
1359           elif field == "instance":
1360             for inst in ilist:
1361               if node not in lv_by_node[inst]:
1362                 continue
1363               if vol['name'] in lv_by_node[inst][node]:
1364                 val = inst.name
1365                 break
1366             else:
1367               val = '-'
1368           else:
1369             raise errors.ParameterError(field)
1370           node_output.append(str(val))
1371
1372         output.append(node_output)
1373
1374     return output
1375
1376
1377 class LUAddNode(LogicalUnit):
1378   """Logical unit for adding node to the cluster.
1379
1380   """
1381   HPATH = "node-add"
1382   HTYPE = constants.HTYPE_NODE
1383   _OP_REQP = ["node_name"]
1384
1385   def BuildHooksEnv(self):
1386     """Build hooks env.
1387
1388     This will run on all nodes before, and on all nodes + the new node after.
1389
1390     """
1391     env = {
1392       "OP_TARGET": self.op.node_name,
1393       "NODE_NAME": self.op.node_name,
1394       "NODE_PIP": self.op.primary_ip,
1395       "NODE_SIP": self.op.secondary_ip,
1396       }
1397     nodes_0 = self.cfg.GetNodeList()
1398     nodes_1 = nodes_0 + [self.op.node_name, ]
1399     return env, nodes_0, nodes_1
1400
1401   def CheckPrereq(self):
1402     """Check prerequisites.
1403
1404     This checks:
1405      - the new node is not already in the config
1406      - it is resolvable
1407      - its parameters (single/dual homed) matches the cluster
1408
1409     Any errors are signalled by raising errors.OpPrereqError.
1410
1411     """
1412     node_name = self.op.node_name
1413     cfg = self.cfg
1414
1415     dns_data = utils.HostInfo(node_name)
1416
1417     node = dns_data.name
1418     primary_ip = self.op.primary_ip = dns_data.ip
1419     secondary_ip = getattr(self.op, "secondary_ip", None)
1420     if secondary_ip is None:
1421       secondary_ip = primary_ip
1422     if not utils.IsValidIP(secondary_ip):
1423       raise errors.OpPrereqError("Invalid secondary IP given")
1424     self.op.secondary_ip = secondary_ip
1425     node_list = cfg.GetNodeList()
1426     if node in node_list:
1427       raise errors.OpPrereqError("Node %s is already in the configuration"
1428                                  % node)
1429
1430     for existing_node_name in node_list:
1431       existing_node = cfg.GetNodeInfo(existing_node_name)
1432       if (existing_node.primary_ip == primary_ip or
1433           existing_node.secondary_ip == primary_ip or
1434           existing_node.primary_ip == secondary_ip or
1435           existing_node.secondary_ip == secondary_ip):
1436         raise errors.OpPrereqError("New node ip address(es) conflict with"
1437                                    " existing node %s" % existing_node.name)
1438
1439     # check that the type of the node (single versus dual homed) is the
1440     # same as for the master
1441     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1442     master_singlehomed = myself.secondary_ip == myself.primary_ip
1443     newbie_singlehomed = secondary_ip == primary_ip
1444     if master_singlehomed != newbie_singlehomed:
1445       if master_singlehomed:
1446         raise errors.OpPrereqError("The master has no private ip but the"
1447                                    " new node has one")
1448       else:
1449         raise errors.OpPrereqError("The master has a private ip but the"
1450                                    " new node doesn't have one")
1451
1452     # checks reachablity
1453     if not utils.TcpPing(utils.HostInfo().name,
1454                          primary_ip,
1455                          constants.DEFAULT_NODED_PORT):
1456       raise errors.OpPrereqError("Node not reachable by ping")
1457
1458     if not newbie_singlehomed:
1459       # check reachability from my secondary ip to newbie's secondary ip
1460       if not utils.TcpPing(myself.secondary_ip,
1461                            secondary_ip,
1462                            constants.DEFAULT_NODED_PORT):
1463         raise errors.OpPrereqError(
1464           "Node secondary ip not reachable by TCP based ping to noded port")
1465
1466     self.new_node = objects.Node(name=node,
1467                                  primary_ip=primary_ip,
1468                                  secondary_ip=secondary_ip)
1469
1470   def Exec(self, feedback_fn):
1471     """Adds the new node to the cluster.
1472
1473     """
1474     new_node = self.new_node
1475     node = new_node.name
1476
1477     # set up inter-node password and certificate and restarts the node daemon
1478     gntpass = self.sstore.GetNodeDaemonPassword()
1479     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1480       raise errors.OpExecError("ganeti password corruption detected")
1481     f = open(constants.SSL_CERT_FILE)
1482     try:
1483       gntpem = f.read(8192)
1484     finally:
1485       f.close()
1486     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1487     # so we use this to detect an invalid certificate; as long as the
1488     # cert doesn't contain this, the here-document will be correctly
1489     # parsed by the shell sequence below
1490     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1491       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1492     if not gntpem.endswith("\n"):
1493       raise errors.OpExecError("PEM must end with newline")
1494     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1495
1496     # and then connect with ssh to set password and start ganeti-noded
1497     # note that all the below variables are sanitized at this point,
1498     # either by being constants or by the checks above
1499     ss = self.sstore
1500     mycommand = ("umask 077 && "
1501                  "echo '%s' > '%s' && "
1502                  "cat > '%s' << '!EOF.' && \n"
1503                  "%s!EOF.\n%s restart" %
1504                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1505                   constants.SSL_CERT_FILE, gntpem,
1506                   constants.NODE_INITD_SCRIPT))
1507
1508     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1509     if result.failed:
1510       raise errors.OpExecError("Remote command on node %s, error: %s,"
1511                                " output: %s" %
1512                                (node, result.fail_reason, result.output))
1513
1514     # check connectivity
1515     time.sleep(4)
1516
1517     result = rpc.call_version([node])[node]
1518     if result:
1519       if constants.PROTOCOL_VERSION == result:
1520         logger.Info("communication to node %s fine, sw version %s match" %
1521                     (node, result))
1522       else:
1523         raise errors.OpExecError("Version mismatch master version %s,"
1524                                  " node version %s" %
1525                                  (constants.PROTOCOL_VERSION, result))
1526     else:
1527       raise errors.OpExecError("Cannot get version from the new node")
1528
1529     # setup ssh on node
1530     logger.Info("copy ssh key to node %s" % node)
1531     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1532     keyarray = []
1533     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1534                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1535                 priv_key, pub_key]
1536
1537     for i in keyfiles:
1538       f = open(i, 'r')
1539       try:
1540         keyarray.append(f.read())
1541       finally:
1542         f.close()
1543
1544     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1545                                keyarray[3], keyarray[4], keyarray[5])
1546
1547     if not result:
1548       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1549
1550     # Add node to our /etc/hosts, and add key to known_hosts
1551     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1552     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1553                       self.cfg.GetHostKey())
1554
1555     if new_node.secondary_ip != new_node.primary_ip:
1556       if not rpc.call_node_tcp_ping(new_node.name,
1557                                     constants.LOCALHOST_IP_ADDRESS,
1558                                     new_node.secondary_ip,
1559                                     constants.DEFAULT_NODED_PORT,
1560                                     10, False):
1561         raise errors.OpExecError("Node claims it doesn't have the"
1562                                  " secondary ip you gave (%s).\n"
1563                                  "Please fix and re-run this command." %
1564                                  new_node.secondary_ip)
1565
1566     success, msg = ssh.VerifyNodeHostname(node)
1567     if not success:
1568       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1569                                " than the one the resolver gives: %s.\n"
1570                                "Please fix and re-run this command." %
1571                                (node, msg))
1572
1573     # Distribute updated /etc/hosts and known_hosts to all nodes,
1574     # including the node just added
1575     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1576     dist_nodes = self.cfg.GetNodeList() + [node]
1577     if myself.name in dist_nodes:
1578       dist_nodes.remove(myself.name)
1579
1580     logger.Debug("Copying hosts and known_hosts to all nodes")
1581     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1582       result = rpc.call_upload_file(dist_nodes, fname)
1583       for to_node in dist_nodes:
1584         if not result[to_node]:
1585           logger.Error("copy of file %s to node %s failed" %
1586                        (fname, to_node))
1587
1588     to_copy = ss.GetFileList()
1589     for fname in to_copy:
1590       if not ssh.CopyFileToNode(node, fname):
1591         logger.Error("could not copy file %s to node %s" % (fname, node))
1592
1593     logger.Info("adding node %s to cluster.conf" % node)
1594     self.cfg.AddNode(new_node)
1595
1596
1597 class LUMasterFailover(LogicalUnit):
1598   """Failover the master node to the current node.
1599
1600   This is a special LU in that it must run on a non-master node.
1601
1602   """
1603   HPATH = "master-failover"
1604   HTYPE = constants.HTYPE_CLUSTER
1605   REQ_MASTER = False
1606   _OP_REQP = []
1607
1608   def BuildHooksEnv(self):
1609     """Build hooks env.
1610
1611     This will run on the new master only in the pre phase, and on all
1612     the nodes in the post phase.
1613
1614     """
1615     env = {
1616       "OP_TARGET": self.new_master,
1617       "NEW_MASTER": self.new_master,
1618       "OLD_MASTER": self.old_master,
1619       }
1620     return env, [self.new_master], self.cfg.GetNodeList()
1621
1622   def CheckPrereq(self):
1623     """Check prerequisites.
1624
1625     This checks that we are not already the master.
1626
1627     """
1628     self.new_master = utils.HostInfo().name
1629     self.old_master = self.sstore.GetMasterNode()
1630
1631     if self.old_master == self.new_master:
1632       raise errors.OpPrereqError("This commands must be run on the node"
1633                                  " where you want the new master to be.\n"
1634                                  "%s is already the master" %
1635                                  self.old_master)
1636
1637   def Exec(self, feedback_fn):
1638     """Failover the master node.
1639
1640     This command, when run on a non-master node, will cause the current
1641     master to cease being master, and the non-master to become new
1642     master.
1643
1644     """
1645     #TODO: do not rely on gethostname returning the FQDN
1646     logger.Info("setting master to %s, old master: %s" %
1647                 (self.new_master, self.old_master))
1648
1649     if not rpc.call_node_stop_master(self.old_master):
1650       logger.Error("could disable the master role on the old master"
1651                    " %s, please disable manually" % self.old_master)
1652
1653     ss = self.sstore
1654     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1655     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1656                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1657       logger.Error("could not distribute the new simple store master file"
1658                    " to the other nodes, please check.")
1659
1660     if not rpc.call_node_start_master(self.new_master):
1661       logger.Error("could not start the master role on the new master"
1662                    " %s, please check" % self.new_master)
1663       feedback_fn("Error in activating the master IP on the new master,\n"
1664                   "please fix manually.")
1665
1666
1667
1668 class LUQueryClusterInfo(NoHooksLU):
1669   """Query cluster configuration.
1670
1671   """
1672   _OP_REQP = []
1673   REQ_MASTER = False
1674
1675   def CheckPrereq(self):
1676     """No prerequsites needed for this LU.
1677
1678     """
1679     pass
1680
1681   def Exec(self, feedback_fn):
1682     """Return cluster config.
1683
1684     """
1685     result = {
1686       "name": self.sstore.GetClusterName(),
1687       "software_version": constants.RELEASE_VERSION,
1688       "protocol_version": constants.PROTOCOL_VERSION,
1689       "config_version": constants.CONFIG_VERSION,
1690       "os_api_version": constants.OS_API_VERSION,
1691       "export_version": constants.EXPORT_VERSION,
1692       "master": self.sstore.GetMasterNode(),
1693       "architecture": (platform.architecture()[0], platform.machine()),
1694       }
1695
1696     return result
1697
1698
1699 class LUClusterCopyFile(NoHooksLU):
1700   """Copy file to cluster.
1701
1702   """
1703   _OP_REQP = ["nodes", "filename"]
1704
1705   def CheckPrereq(self):
1706     """Check prerequisites.
1707
1708     It should check that the named file exists and that the given list
1709     of nodes is valid.
1710
1711     """
1712     if not os.path.exists(self.op.filename):
1713       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1714
1715     self.nodes = _GetWantedNodes(self, self.op.nodes)
1716
1717   def Exec(self, feedback_fn):
1718     """Copy a file from master to some nodes.
1719
1720     Args:
1721       opts - class with options as members
1722       args - list containing a single element, the file name
1723     Opts used:
1724       nodes - list containing the name of target nodes; if empty, all nodes
1725
1726     """
1727     filename = self.op.filename
1728
1729     myname = utils.HostInfo().name
1730
1731     for node in self.nodes:
1732       if node == myname:
1733         continue
1734       if not ssh.CopyFileToNode(node, filename):
1735         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1736
1737
1738 class LUDumpClusterConfig(NoHooksLU):
1739   """Return a text-representation of the cluster-config.
1740
1741   """
1742   _OP_REQP = []
1743
1744   def CheckPrereq(self):
1745     """No prerequisites.
1746
1747     """
1748     pass
1749
1750   def Exec(self, feedback_fn):
1751     """Dump a representation of the cluster config to the standard output.
1752
1753     """
1754     return self.cfg.DumpConfig()
1755
1756
1757 class LURunClusterCommand(NoHooksLU):
1758   """Run a command on some nodes.
1759
1760   """
1761   _OP_REQP = ["command", "nodes"]
1762
1763   def CheckPrereq(self):
1764     """Check prerequisites.
1765
1766     It checks that the given list of nodes is valid.
1767
1768     """
1769     self.nodes = _GetWantedNodes(self, self.op.nodes)
1770
1771   def Exec(self, feedback_fn):
1772     """Run a command on some nodes.
1773
1774     """
1775     data = []
1776     for node in self.nodes:
1777       result = ssh.SSHCall(node, "root", self.op.command)
1778       data.append((node, result.output, result.exit_code))
1779
1780     return data
1781
1782
1783 class LUActivateInstanceDisks(NoHooksLU):
1784   """Bring up an instance's disks.
1785
1786   """
1787   _OP_REQP = ["instance_name"]
1788
1789   def CheckPrereq(self):
1790     """Check prerequisites.
1791
1792     This checks that the instance is in the cluster.
1793
1794     """
1795     instance = self.cfg.GetInstanceInfo(
1796       self.cfg.ExpandInstanceName(self.op.instance_name))
1797     if instance is None:
1798       raise errors.OpPrereqError("Instance '%s' not known" %
1799                                  self.op.instance_name)
1800     self.instance = instance
1801
1802
1803   def Exec(self, feedback_fn):
1804     """Activate the disks.
1805
1806     """
1807     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1808     if not disks_ok:
1809       raise errors.OpExecError("Cannot activate block devices")
1810
1811     return disks_info
1812
1813
1814 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1815   """Prepare the block devices for an instance.
1816
1817   This sets up the block devices on all nodes.
1818
1819   Args:
1820     instance: a ganeti.objects.Instance object
1821     ignore_secondaries: if true, errors on secondary nodes won't result
1822                         in an error return from the function
1823
1824   Returns:
1825     false if the operation failed
1826     list of (host, instance_visible_name, node_visible_name) if the operation
1827          suceeded with the mapping from node devices to instance devices
1828   """
1829   device_info = []
1830   disks_ok = True
1831   for inst_disk in instance.disks:
1832     master_result = None
1833     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1834       cfg.SetDiskID(node_disk, node)
1835       is_primary = node == instance.primary_node
1836       result = rpc.call_blockdev_assemble(node, node_disk,
1837                                           instance.name, is_primary)
1838       if not result:
1839         logger.Error("could not prepare block device %s on node %s (is_pri"
1840                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1841         if is_primary or not ignore_secondaries:
1842           disks_ok = False
1843       if is_primary:
1844         master_result = result
1845     device_info.append((instance.primary_node, inst_disk.iv_name,
1846                         master_result))
1847
1848   # leave the disks configured for the primary node
1849   # this is a workaround that would be fixed better by
1850   # improving the logical/physical id handling
1851   for disk in instance.disks:
1852     cfg.SetDiskID(disk, instance.primary_node)
1853
1854   return disks_ok, device_info
1855
1856
1857 def _StartInstanceDisks(cfg, instance, force):
1858   """Start the disks of an instance.
1859
1860   """
1861   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1862                                            ignore_secondaries=force)
1863   if not disks_ok:
1864     _ShutdownInstanceDisks(instance, cfg)
1865     if force is not None and not force:
1866       logger.Error("If the message above refers to a secondary node,"
1867                    " you can retry the operation using '--force'.")
1868     raise errors.OpExecError("Disk consistency error")
1869
1870
1871 class LUDeactivateInstanceDisks(NoHooksLU):
1872   """Shutdown an instance's disks.
1873
1874   """
1875   _OP_REQP = ["instance_name"]
1876
1877   def CheckPrereq(self):
1878     """Check prerequisites.
1879
1880     This checks that the instance is in the cluster.
1881
1882     """
1883     instance = self.cfg.GetInstanceInfo(
1884       self.cfg.ExpandInstanceName(self.op.instance_name))
1885     if instance is None:
1886       raise errors.OpPrereqError("Instance '%s' not known" %
1887                                  self.op.instance_name)
1888     self.instance = instance
1889
1890   def Exec(self, feedback_fn):
1891     """Deactivate the disks
1892
1893     """
1894     instance = self.instance
1895     ins_l = rpc.call_instance_list([instance.primary_node])
1896     ins_l = ins_l[instance.primary_node]
1897     if not type(ins_l) is list:
1898       raise errors.OpExecError("Can't contact node '%s'" %
1899                                instance.primary_node)
1900
1901     if self.instance.name in ins_l:
1902       raise errors.OpExecError("Instance is running, can't shutdown"
1903                                " block devices.")
1904
1905     _ShutdownInstanceDisks(instance, self.cfg)
1906
1907
1908 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1909   """Shutdown block devices of an instance.
1910
1911   This does the shutdown on all nodes of the instance.
1912
1913   If the ignore_primary is false, errors on the primary node are
1914   ignored.
1915
1916   """
1917   result = True
1918   for disk in instance.disks:
1919     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1920       cfg.SetDiskID(top_disk, node)
1921       if not rpc.call_blockdev_shutdown(node, top_disk):
1922         logger.Error("could not shutdown block device %s on node %s" %
1923                      (disk.iv_name, node))
1924         if not ignore_primary or node != instance.primary_node:
1925           result = False
1926   return result
1927
1928
1929 class LUStartupInstance(LogicalUnit):
1930   """Starts an instance.
1931
1932   """
1933   HPATH = "instance-start"
1934   HTYPE = constants.HTYPE_INSTANCE
1935   _OP_REQP = ["instance_name", "force"]
1936
1937   def BuildHooksEnv(self):
1938     """Build hooks env.
1939
1940     This runs on master, primary and secondary nodes of the instance.
1941
1942     """
1943     env = {
1944       "FORCE": self.op.force,
1945       }
1946     env.update(_BuildInstanceHookEnvByObject(self.instance))
1947     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1948           list(self.instance.secondary_nodes))
1949     return env, nl, nl
1950
1951   def CheckPrereq(self):
1952     """Check prerequisites.
1953
1954     This checks that the instance is in the cluster.
1955
1956     """
1957     instance = self.cfg.GetInstanceInfo(
1958       self.cfg.ExpandInstanceName(self.op.instance_name))
1959     if instance is None:
1960       raise errors.OpPrereqError("Instance '%s' not known" %
1961                                  self.op.instance_name)
1962
1963     # check bridges existance
1964     _CheckInstanceBridgesExist(instance)
1965
1966     self.instance = instance
1967     self.op.instance_name = instance.name
1968
1969   def Exec(self, feedback_fn):
1970     """Start the instance.
1971
1972     """
1973     instance = self.instance
1974     force = self.op.force
1975     extra_args = getattr(self.op, "extra_args", "")
1976
1977     node_current = instance.primary_node
1978
1979     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1980     if not nodeinfo:
1981       raise errors.OpExecError("Could not contact node %s for infos" %
1982                                (node_current))
1983
1984     freememory = nodeinfo[node_current]['memory_free']
1985     memory = instance.memory
1986     if memory > freememory:
1987       raise errors.OpExecError("Not enough memory to start instance"
1988                                " %s on node %s"
1989                                " needed %s MiB, available %s MiB" %
1990                                (instance.name, node_current, memory,
1991                                 freememory))
1992
1993     _StartInstanceDisks(self.cfg, instance, force)
1994
1995     if not rpc.call_instance_start(node_current, instance, extra_args):
1996       _ShutdownInstanceDisks(instance, self.cfg)
1997       raise errors.OpExecError("Could not start instance")
1998
1999     self.cfg.MarkInstanceUp(instance.name)
2000
2001
2002 class LURebootInstance(LogicalUnit):
2003   """Reboot an instance.
2004
2005   """
2006   HPATH = "instance-reboot"
2007   HTYPE = constants.HTYPE_INSTANCE
2008   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2009
2010   def BuildHooksEnv(self):
2011     """Build hooks env.
2012
2013     This runs on master, primary and secondary nodes of the instance.
2014
2015     """
2016     env = {
2017       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2018       }
2019     env.update(_BuildInstanceHookEnvByObject(self.instance))
2020     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2021           list(self.instance.secondary_nodes))
2022     return env, nl, nl
2023
2024   def CheckPrereq(self):
2025     """Check prerequisites.
2026
2027     This checks that the instance is in the cluster.
2028
2029     """
2030     instance = self.cfg.GetInstanceInfo(
2031       self.cfg.ExpandInstanceName(self.op.instance_name))
2032     if instance is None:
2033       raise errors.OpPrereqError("Instance '%s' not known" %
2034                                  self.op.instance_name)
2035
2036     # check bridges existance
2037     _CheckInstanceBridgesExist(instance)
2038
2039     self.instance = instance
2040     self.op.instance_name = instance.name
2041
2042   def Exec(self, feedback_fn):
2043     """Reboot the instance.
2044
2045     """
2046     instance = self.instance
2047     ignore_secondaries = self.op.ignore_secondaries
2048     reboot_type = self.op.reboot_type
2049     extra_args = getattr(self.op, "extra_args", "")
2050
2051     node_current = instance.primary_node
2052
2053     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2054                            constants.INSTANCE_REBOOT_HARD,
2055                            constants.INSTANCE_REBOOT_FULL]:
2056       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2057                                   (constants.INSTANCE_REBOOT_SOFT,
2058                                    constants.INSTANCE_REBOOT_HARD,
2059                                    constants.INSTANCE_REBOOT_FULL))
2060
2061     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2062                        constants.INSTANCE_REBOOT_HARD]:
2063       if not rpc.call_instance_reboot(node_current, instance,
2064                                       reboot_type, extra_args):
2065         raise errors.OpExecError("Could not reboot instance")
2066     else:
2067       if not rpc.call_instance_shutdown(node_current, instance):
2068         raise errors.OpExecError("could not shutdown instance for full reboot")
2069       _ShutdownInstanceDisks(instance, self.cfg)
2070       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2071       if not rpc.call_instance_start(node_current, instance, extra_args):
2072         _ShutdownInstanceDisks(instance, self.cfg)
2073         raise errors.OpExecError("Could not start instance for full reboot")
2074
2075     self.cfg.MarkInstanceUp(instance.name)
2076
2077
2078 class LUShutdownInstance(LogicalUnit):
2079   """Shutdown an instance.
2080
2081   """
2082   HPATH = "instance-stop"
2083   HTYPE = constants.HTYPE_INSTANCE
2084   _OP_REQP = ["instance_name"]
2085
2086   def BuildHooksEnv(self):
2087     """Build hooks env.
2088
2089     This runs on master, primary and secondary nodes of the instance.
2090
2091     """
2092     env = _BuildInstanceHookEnvByObject(self.instance)
2093     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2094           list(self.instance.secondary_nodes))
2095     return env, nl, nl
2096
2097   def CheckPrereq(self):
2098     """Check prerequisites.
2099
2100     This checks that the instance is in the cluster.
2101
2102     """
2103     instance = self.cfg.GetInstanceInfo(
2104       self.cfg.ExpandInstanceName(self.op.instance_name))
2105     if instance is None:
2106       raise errors.OpPrereqError("Instance '%s' not known" %
2107                                  self.op.instance_name)
2108     self.instance = instance
2109
2110   def Exec(self, feedback_fn):
2111     """Shutdown the instance.
2112
2113     """
2114     instance = self.instance
2115     node_current = instance.primary_node
2116     if not rpc.call_instance_shutdown(node_current, instance):
2117       logger.Error("could not shutdown instance")
2118
2119     self.cfg.MarkInstanceDown(instance.name)
2120     _ShutdownInstanceDisks(instance, self.cfg)
2121
2122
2123 class LUReinstallInstance(LogicalUnit):
2124   """Reinstall an instance.
2125
2126   """
2127   HPATH = "instance-reinstall"
2128   HTYPE = constants.HTYPE_INSTANCE
2129   _OP_REQP = ["instance_name"]
2130
2131   def BuildHooksEnv(self):
2132     """Build hooks env.
2133
2134     This runs on master, primary and secondary nodes of the instance.
2135
2136     """
2137     env = _BuildInstanceHookEnvByObject(self.instance)
2138     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2139           list(self.instance.secondary_nodes))
2140     return env, nl, nl
2141
2142   def CheckPrereq(self):
2143     """Check prerequisites.
2144
2145     This checks that the instance is in the cluster and is not running.
2146
2147     """
2148     instance = self.cfg.GetInstanceInfo(
2149       self.cfg.ExpandInstanceName(self.op.instance_name))
2150     if instance is None:
2151       raise errors.OpPrereqError("Instance '%s' not known" %
2152                                  self.op.instance_name)
2153     if instance.disk_template == constants.DT_DISKLESS:
2154       raise errors.OpPrereqError("Instance '%s' has no disks" %
2155                                  self.op.instance_name)
2156     if instance.status != "down":
2157       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2158                                  self.op.instance_name)
2159     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2160     if remote_info:
2161       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2162                                  (self.op.instance_name,
2163                                   instance.primary_node))
2164
2165     self.op.os_type = getattr(self.op, "os_type", None)
2166     if self.op.os_type is not None:
2167       # OS verification
2168       pnode = self.cfg.GetNodeInfo(
2169         self.cfg.ExpandNodeName(instance.primary_node))
2170       if pnode is None:
2171         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2172                                    self.op.pnode)
2173       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2174       if not os_obj:
2175         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2176                                    " primary node"  % self.op.os_type)
2177
2178     self.instance = instance
2179
2180   def Exec(self, feedback_fn):
2181     """Reinstall the instance.
2182
2183     """
2184     inst = self.instance
2185
2186     if self.op.os_type is not None:
2187       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2188       inst.os = self.op.os_type
2189       self.cfg.AddInstance(inst)
2190
2191     _StartInstanceDisks(self.cfg, inst, None)
2192     try:
2193       feedback_fn("Running the instance OS create scripts...")
2194       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2195         raise errors.OpExecError("Could not install OS for instance %s "
2196                                  "on node %s" %
2197                                  (inst.name, inst.primary_node))
2198     finally:
2199       _ShutdownInstanceDisks(inst, self.cfg)
2200
2201
2202 class LURenameInstance(LogicalUnit):
2203   """Rename an instance.
2204
2205   """
2206   HPATH = "instance-rename"
2207   HTYPE = constants.HTYPE_INSTANCE
2208   _OP_REQP = ["instance_name", "new_name"]
2209
2210   def BuildHooksEnv(self):
2211     """Build hooks env.
2212
2213     This runs on master, primary and secondary nodes of the instance.
2214
2215     """
2216     env = _BuildInstanceHookEnvByObject(self.instance)
2217     env["INSTANCE_NEW_NAME"] = self.op.new_name
2218     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2219           list(self.instance.secondary_nodes))
2220     return env, nl, nl
2221
2222   def CheckPrereq(self):
2223     """Check prerequisites.
2224
2225     This checks that the instance is in the cluster and is not running.
2226
2227     """
2228     instance = self.cfg.GetInstanceInfo(
2229       self.cfg.ExpandInstanceName(self.op.instance_name))
2230     if instance is None:
2231       raise errors.OpPrereqError("Instance '%s' not known" %
2232                                  self.op.instance_name)
2233     if instance.status != "down":
2234       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2235                                  self.op.instance_name)
2236     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2237     if remote_info:
2238       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2239                                  (self.op.instance_name,
2240                                   instance.primary_node))
2241     self.instance = instance
2242
2243     # new name verification
2244     name_info = utils.HostInfo(self.op.new_name)
2245
2246     self.op.new_name = new_name = name_info.name
2247     if not getattr(self.op, "ignore_ip", False):
2248       command = ["fping", "-q", name_info.ip]
2249       result = utils.RunCmd(command)
2250       if not result.failed:
2251         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2252                                    (name_info.ip, new_name))
2253
2254
2255   def Exec(self, feedback_fn):
2256     """Reinstall the instance.
2257
2258     """
2259     inst = self.instance
2260     old_name = inst.name
2261
2262     self.cfg.RenameInstance(inst.name, self.op.new_name)
2263
2264     # re-read the instance from the configuration after rename
2265     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2266
2267     _StartInstanceDisks(self.cfg, inst, None)
2268     try:
2269       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2270                                           "sda", "sdb"):
2271         msg = ("Could run OS rename script for instance %s\n"
2272                "on node %s\n"
2273                "(but the instance has been renamed in Ganeti)" %
2274                (inst.name, inst.primary_node))
2275         logger.Error(msg)
2276     finally:
2277       _ShutdownInstanceDisks(inst, self.cfg)
2278
2279
2280 class LURemoveInstance(LogicalUnit):
2281   """Remove an instance.
2282
2283   """
2284   HPATH = "instance-remove"
2285   HTYPE = constants.HTYPE_INSTANCE
2286   _OP_REQP = ["instance_name"]
2287
2288   def BuildHooksEnv(self):
2289     """Build hooks env.
2290
2291     This runs on master, primary and secondary nodes of the instance.
2292
2293     """
2294     env = _BuildInstanceHookEnvByObject(self.instance)
2295     nl = [self.sstore.GetMasterNode()]
2296     return env, nl, nl
2297
2298   def CheckPrereq(self):
2299     """Check prerequisites.
2300
2301     This checks that the instance is in the cluster.
2302
2303     """
2304     instance = self.cfg.GetInstanceInfo(
2305       self.cfg.ExpandInstanceName(self.op.instance_name))
2306     if instance is None:
2307       raise errors.OpPrereqError("Instance '%s' not known" %
2308                                  self.op.instance_name)
2309     self.instance = instance
2310
2311   def Exec(self, feedback_fn):
2312     """Remove the instance.
2313
2314     """
2315     instance = self.instance
2316     logger.Info("shutting down instance %s on node %s" %
2317                 (instance.name, instance.primary_node))
2318
2319     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2320       if self.op.ignore_failures:
2321         feedback_fn("Warning: can't shutdown instance")
2322       else:
2323         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2324                                  (instance.name, instance.primary_node))
2325
2326     logger.Info("removing block devices for instance %s" % instance.name)
2327
2328     if not _RemoveDisks(instance, self.cfg):
2329       if self.op.ignore_failures:
2330         feedback_fn("Warning: can't remove instance's disks")
2331       else:
2332         raise errors.OpExecError("Can't remove instance's disks")
2333
2334     logger.Info("removing instance %s out of cluster config" % instance.name)
2335
2336     self.cfg.RemoveInstance(instance.name)
2337
2338
2339 class LUQueryInstances(NoHooksLU):
2340   """Logical unit for querying instances.
2341
2342   """
2343   _OP_REQP = ["output_fields", "names"]
2344
2345   def CheckPrereq(self):
2346     """Check prerequisites.
2347
2348     This checks that the fields required are valid output fields.
2349
2350     """
2351     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2352     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2353                                "admin_state", "admin_ram",
2354                                "disk_template", "ip", "mac", "bridge",
2355                                "sda_size", "sdb_size"],
2356                        dynamic=self.dynamic_fields,
2357                        selected=self.op.output_fields)
2358
2359     self.wanted = _GetWantedInstances(self, self.op.names)
2360
2361   def Exec(self, feedback_fn):
2362     """Computes the list of nodes and their attributes.
2363
2364     """
2365     instance_names = self.wanted
2366     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2367                      in instance_names]
2368
2369     # begin data gathering
2370
2371     nodes = frozenset([inst.primary_node for inst in instance_list])
2372
2373     bad_nodes = []
2374     if self.dynamic_fields.intersection(self.op.output_fields):
2375       live_data = {}
2376       node_data = rpc.call_all_instances_info(nodes)
2377       for name in nodes:
2378         result = node_data[name]
2379         if result:
2380           live_data.update(result)
2381         elif result == False:
2382           bad_nodes.append(name)
2383         # else no instance is alive
2384     else:
2385       live_data = dict([(name, {}) for name in instance_names])
2386
2387     # end data gathering
2388
2389     output = []
2390     for instance in instance_list:
2391       iout = []
2392       for field in self.op.output_fields:
2393         if field == "name":
2394           val = instance.name
2395         elif field == "os":
2396           val = instance.os
2397         elif field == "pnode":
2398           val = instance.primary_node
2399         elif field == "snodes":
2400           val = list(instance.secondary_nodes)
2401         elif field == "admin_state":
2402           val = (instance.status != "down")
2403         elif field == "oper_state":
2404           if instance.primary_node in bad_nodes:
2405             val = None
2406           else:
2407             val = bool(live_data.get(instance.name))
2408         elif field == "admin_ram":
2409           val = instance.memory
2410         elif field == "oper_ram":
2411           if instance.primary_node in bad_nodes:
2412             val = None
2413           elif instance.name in live_data:
2414             val = live_data[instance.name].get("memory", "?")
2415           else:
2416             val = "-"
2417         elif field == "disk_template":
2418           val = instance.disk_template
2419         elif field == "ip":
2420           val = instance.nics[0].ip
2421         elif field == "bridge":
2422           val = instance.nics[0].bridge
2423         elif field == "mac":
2424           val = instance.nics[0].mac
2425         elif field == "sda_size" or field == "sdb_size":
2426           disk = instance.FindDisk(field[:3])
2427           if disk is None:
2428             val = None
2429           else:
2430             val = disk.size
2431         else:
2432           raise errors.ParameterError(field)
2433         iout.append(val)
2434       output.append(iout)
2435
2436     return output
2437
2438
2439 class LUFailoverInstance(LogicalUnit):
2440   """Failover an instance.
2441
2442   """
2443   HPATH = "instance-failover"
2444   HTYPE = constants.HTYPE_INSTANCE
2445   _OP_REQP = ["instance_name", "ignore_consistency"]
2446
2447   def BuildHooksEnv(self):
2448     """Build hooks env.
2449
2450     This runs on master, primary and secondary nodes of the instance.
2451
2452     """
2453     env = {
2454       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2455       }
2456     env.update(_BuildInstanceHookEnvByObject(self.instance))
2457     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2458     return env, nl, nl
2459
2460   def CheckPrereq(self):
2461     """Check prerequisites.
2462
2463     This checks that the instance is in the cluster.
2464
2465     """
2466     instance = self.cfg.GetInstanceInfo(
2467       self.cfg.ExpandInstanceName(self.op.instance_name))
2468     if instance is None:
2469       raise errors.OpPrereqError("Instance '%s' not known" %
2470                                  self.op.instance_name)
2471
2472     if instance.disk_template not in constants.DTS_NET_MIRROR:
2473       raise errors.OpPrereqError("Instance's disk layout is not"
2474                                  " network mirrored, cannot failover.")
2475
2476     secondary_nodes = instance.secondary_nodes
2477     if not secondary_nodes:
2478       raise errors.ProgrammerError("no secondary node but using "
2479                                    "DT_REMOTE_RAID1 template")
2480
2481     # check memory requirements on the secondary node
2482     target_node = secondary_nodes[0]
2483     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2484     info = nodeinfo.get(target_node, None)
2485     if not info:
2486       raise errors.OpPrereqError("Cannot get current information"
2487                                  " from node '%s'" % nodeinfo)
2488     if instance.memory > info['memory_free']:
2489       raise errors.OpPrereqError("Not enough memory on target node %s."
2490                                  " %d MB available, %d MB required" %
2491                                  (target_node, info['memory_free'],
2492                                   instance.memory))
2493
2494     # check bridge existance
2495     brlist = [nic.bridge for nic in instance.nics]
2496     if not rpc.call_bridges_exist(target_node, brlist):
2497       raise errors.OpPrereqError("One or more target bridges %s does not"
2498                                  " exist on destination node '%s'" %
2499                                  (brlist, target_node))
2500
2501     self.instance = instance
2502
2503   def Exec(self, feedback_fn):
2504     """Failover an instance.
2505
2506     The failover is done by shutting it down on its present node and
2507     starting it on the secondary.
2508
2509     """
2510     instance = self.instance
2511
2512     source_node = instance.primary_node
2513     target_node = instance.secondary_nodes[0]
2514
2515     feedback_fn("* checking disk consistency between source and target")
2516     for dev in instance.disks:
2517       # for remote_raid1, these are md over drbd
2518       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2519         if not self.op.ignore_consistency:
2520           raise errors.OpExecError("Disk %s is degraded on target node,"
2521                                    " aborting failover." % dev.iv_name)
2522
2523     feedback_fn("* checking target node resource availability")
2524     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2525
2526     if not nodeinfo:
2527       raise errors.OpExecError("Could not contact target node %s." %
2528                                target_node)
2529
2530     free_memory = int(nodeinfo[target_node]['memory_free'])
2531     memory = instance.memory
2532     if memory > free_memory:
2533       raise errors.OpExecError("Not enough memory to create instance %s on"
2534                                " node %s. needed %s MiB, available %s MiB" %
2535                                (instance.name, target_node, memory,
2536                                 free_memory))
2537
2538     feedback_fn("* shutting down instance on source node")
2539     logger.Info("Shutting down instance %s on node %s" %
2540                 (instance.name, source_node))
2541
2542     if not rpc.call_instance_shutdown(source_node, instance):
2543       if self.op.ignore_consistency:
2544         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2545                      " anyway. Please make sure node %s is down"  %
2546                      (instance.name, source_node, source_node))
2547       else:
2548         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2549                                  (instance.name, source_node))
2550
2551     feedback_fn("* deactivating the instance's disks on source node")
2552     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2553       raise errors.OpExecError("Can't shut down the instance's disks.")
2554
2555     instance.primary_node = target_node
2556     # distribute new instance config to the other nodes
2557     self.cfg.AddInstance(instance)
2558
2559     feedback_fn("* activating the instance's disks on target node")
2560     logger.Info("Starting instance %s on node %s" %
2561                 (instance.name, target_node))
2562
2563     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2564                                              ignore_secondaries=True)
2565     if not disks_ok:
2566       _ShutdownInstanceDisks(instance, self.cfg)
2567       raise errors.OpExecError("Can't activate the instance's disks")
2568
2569     feedback_fn("* starting the instance on the target node")
2570     if not rpc.call_instance_start(target_node, instance, None):
2571       _ShutdownInstanceDisks(instance, self.cfg)
2572       raise errors.OpExecError("Could not start instance %s on node %s." %
2573                                (instance.name, target_node))
2574
2575
2576 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2577   """Create a tree of block devices on the primary node.
2578
2579   This always creates all devices.
2580
2581   """
2582   if device.children:
2583     for child in device.children:
2584       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2585         return False
2586
2587   cfg.SetDiskID(device, node)
2588   new_id = rpc.call_blockdev_create(node, device, device.size,
2589                                     instance.name, True, info)
2590   if not new_id:
2591     return False
2592   if device.physical_id is None:
2593     device.physical_id = new_id
2594   return True
2595
2596
2597 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2598   """Create a tree of block devices on a secondary node.
2599
2600   If this device type has to be created on secondaries, create it and
2601   all its children.
2602
2603   If not, just recurse to children keeping the same 'force' value.
2604
2605   """
2606   if device.CreateOnSecondary():
2607     force = True
2608   if device.children:
2609     for child in device.children:
2610       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2611                                         child, force, info):
2612         return False
2613
2614   if not force:
2615     return True
2616   cfg.SetDiskID(device, node)
2617   new_id = rpc.call_blockdev_create(node, device, device.size,
2618                                     instance.name, False, info)
2619   if not new_id:
2620     return False
2621   if device.physical_id is None:
2622     device.physical_id = new_id
2623   return True
2624
2625
2626 def _GenerateUniqueNames(cfg, exts):
2627   """Generate a suitable LV name.
2628
2629   This will generate a logical volume name for the given instance.
2630
2631   """
2632   results = []
2633   for val in exts:
2634     new_id = cfg.GenerateUniqueID()
2635     results.append("%s%s" % (new_id, val))
2636   return results
2637
2638
2639 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2640   """Generate a drbd device complete with its children.
2641
2642   """
2643   port = cfg.AllocatePort()
2644   vgname = cfg.GetVGName()
2645   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2646                           logical_id=(vgname, names[0]))
2647   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2648                           logical_id=(vgname, names[1]))
2649   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2650                           logical_id = (primary, secondary, port),
2651                           children = [dev_data, dev_meta])
2652   return drbd_dev
2653
2654
2655 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2656   """Generate a drbd8 device complete with its children.
2657
2658   """
2659   port = cfg.AllocatePort()
2660   vgname = cfg.GetVGName()
2661   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2662                           logical_id=(vgname, names[0]))
2663   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2664                           logical_id=(vgname, names[1]))
2665   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2666                           logical_id = (primary, secondary, port),
2667                           children = [dev_data, dev_meta],
2668                           iv_name=iv_name)
2669   return drbd_dev
2670
2671 def _GenerateDiskTemplate(cfg, template_name,
2672                           instance_name, primary_node,
2673                           secondary_nodes, disk_sz, swap_sz):
2674   """Generate the entire disk layout for a given template type.
2675
2676   """
2677   #TODO: compute space requirements
2678
2679   vgname = cfg.GetVGName()
2680   if template_name == "diskless":
2681     disks = []
2682   elif template_name == "plain":
2683     if len(secondary_nodes) != 0:
2684       raise errors.ProgrammerError("Wrong template configuration")
2685
2686     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2687     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2688                            logical_id=(vgname, names[0]),
2689                            iv_name = "sda")
2690     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2691                            logical_id=(vgname, names[1]),
2692                            iv_name = "sdb")
2693     disks = [sda_dev, sdb_dev]
2694   elif template_name == "local_raid1":
2695     if len(secondary_nodes) != 0:
2696       raise errors.ProgrammerError("Wrong template configuration")
2697
2698
2699     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2700                                        ".sdb_m1", ".sdb_m2"])
2701     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2702                               logical_id=(vgname, names[0]))
2703     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2704                               logical_id=(vgname, names[1]))
2705     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2706                               size=disk_sz,
2707                               children = [sda_dev_m1, sda_dev_m2])
2708     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2709                               logical_id=(vgname, names[2]))
2710     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2711                               logical_id=(vgname, names[3]))
2712     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2713                               size=swap_sz,
2714                               children = [sdb_dev_m1, sdb_dev_m2])
2715     disks = [md_sda_dev, md_sdb_dev]
2716   elif template_name == constants.DT_REMOTE_RAID1:
2717     if len(secondary_nodes) != 1:
2718       raise errors.ProgrammerError("Wrong template configuration")
2719     remote_node = secondary_nodes[0]
2720     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2721                                        ".sdb_data", ".sdb_meta"])
2722     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2723                                          disk_sz, names[0:2])
2724     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2725                               children = [drbd_sda_dev], size=disk_sz)
2726     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2727                                          swap_sz, names[2:4])
2728     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2729                               children = [drbd_sdb_dev], size=swap_sz)
2730     disks = [md_sda_dev, md_sdb_dev]
2731   elif template_name == constants.DT_DRBD8:
2732     if len(secondary_nodes) != 1:
2733       raise errors.ProgrammerError("Wrong template configuration")
2734     remote_node = secondary_nodes[0]
2735     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2736                                        ".sdb_data", ".sdb_meta"])
2737     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2738                                          disk_sz, names[0:2], "sda")
2739     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2740                                          swap_sz, names[2:4], "sdb")
2741     disks = [drbd_sda_dev, drbd_sdb_dev]
2742   else:
2743     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2744   return disks
2745
2746
2747 def _GetInstanceInfoText(instance):
2748   """Compute that text that should be added to the disk's metadata.
2749
2750   """
2751   return "originstname+%s" % instance.name
2752
2753
2754 def _CreateDisks(cfg, instance):
2755   """Create all disks for an instance.
2756
2757   This abstracts away some work from AddInstance.
2758
2759   Args:
2760     instance: the instance object
2761
2762   Returns:
2763     True or False showing the success of the creation process
2764
2765   """
2766   info = _GetInstanceInfoText(instance)
2767
2768   for device in instance.disks:
2769     logger.Info("creating volume %s for instance %s" %
2770               (device.iv_name, instance.name))
2771     #HARDCODE
2772     for secondary_node in instance.secondary_nodes:
2773       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2774                                         device, False, info):
2775         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2776                      (device.iv_name, device, secondary_node))
2777         return False
2778     #HARDCODE
2779     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2780                                     instance, device, info):
2781       logger.Error("failed to create volume %s on primary!" %
2782                    device.iv_name)
2783       return False
2784   return True
2785
2786
2787 def _RemoveDisks(instance, cfg):
2788   """Remove all disks for an instance.
2789
2790   This abstracts away some work from `AddInstance()` and
2791   `RemoveInstance()`. Note that in case some of the devices couldn't
2792   be removed, the removal will continue with the other ones (compare
2793   with `_CreateDisks()`).
2794
2795   Args:
2796     instance: the instance object
2797
2798   Returns:
2799     True or False showing the success of the removal proces
2800
2801   """
2802   logger.Info("removing block devices for instance %s" % instance.name)
2803
2804   result = True
2805   for device in instance.disks:
2806     for node, disk in device.ComputeNodeTree(instance.primary_node):
2807       cfg.SetDiskID(disk, node)
2808       if not rpc.call_blockdev_remove(node, disk):
2809         logger.Error("could not remove block device %s on node %s,"
2810                      " continuing anyway" %
2811                      (device.iv_name, node))
2812         result = False
2813   return result
2814
2815
2816 class LUCreateInstance(LogicalUnit):
2817   """Create an instance.
2818
2819   """
2820   HPATH = "instance-add"
2821   HTYPE = constants.HTYPE_INSTANCE
2822   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2823               "disk_template", "swap_size", "mode", "start", "vcpus",
2824               "wait_for_sync", "ip_check"]
2825
2826   def BuildHooksEnv(self):
2827     """Build hooks env.
2828
2829     This runs on master, primary and secondary nodes of the instance.
2830
2831     """
2832     env = {
2833       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2834       "INSTANCE_DISK_SIZE": self.op.disk_size,
2835       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2836       "INSTANCE_ADD_MODE": self.op.mode,
2837       }
2838     if self.op.mode == constants.INSTANCE_IMPORT:
2839       env["INSTANCE_SRC_NODE"] = self.op.src_node
2840       env["INSTANCE_SRC_PATH"] = self.op.src_path
2841       env["INSTANCE_SRC_IMAGE"] = self.src_image
2842
2843     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2844       primary_node=self.op.pnode,
2845       secondary_nodes=self.secondaries,
2846       status=self.instance_status,
2847       os_type=self.op.os_type,
2848       memory=self.op.mem_size,
2849       vcpus=self.op.vcpus,
2850       nics=[(self.inst_ip, self.op.bridge)],
2851     ))
2852
2853     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2854           self.secondaries)
2855     return env, nl, nl
2856
2857
2858   def CheckPrereq(self):
2859     """Check prerequisites.
2860
2861     """
2862     if self.op.mode not in (constants.INSTANCE_CREATE,
2863                             constants.INSTANCE_IMPORT):
2864       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2865                                  self.op.mode)
2866
2867     if self.op.mode == constants.INSTANCE_IMPORT:
2868       src_node = getattr(self.op, "src_node", None)
2869       src_path = getattr(self.op, "src_path", None)
2870       if src_node is None or src_path is None:
2871         raise errors.OpPrereqError("Importing an instance requires source"
2872                                    " node and path options")
2873       src_node_full = self.cfg.ExpandNodeName(src_node)
2874       if src_node_full is None:
2875         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2876       self.op.src_node = src_node = src_node_full
2877
2878       if not os.path.isabs(src_path):
2879         raise errors.OpPrereqError("The source path must be absolute")
2880
2881       export_info = rpc.call_export_info(src_node, src_path)
2882
2883       if not export_info:
2884         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2885
2886       if not export_info.has_section(constants.INISECT_EXP):
2887         raise errors.ProgrammerError("Corrupted export config")
2888
2889       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2890       if (int(ei_version) != constants.EXPORT_VERSION):
2891         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2892                                    (ei_version, constants.EXPORT_VERSION))
2893
2894       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2895         raise errors.OpPrereqError("Can't import instance with more than"
2896                                    " one data disk")
2897
2898       # FIXME: are the old os-es, disk sizes, etc. useful?
2899       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2900       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2901                                                          'disk0_dump'))
2902       self.src_image = diskimage
2903     else: # INSTANCE_CREATE
2904       if getattr(self.op, "os_type", None) is None:
2905         raise errors.OpPrereqError("No guest OS specified")
2906
2907     # check primary node
2908     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2909     if pnode is None:
2910       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2911                                  self.op.pnode)
2912     self.op.pnode = pnode.name
2913     self.pnode = pnode
2914     self.secondaries = []
2915     # disk template and mirror node verification
2916     if self.op.disk_template not in constants.DISK_TEMPLATES:
2917       raise errors.OpPrereqError("Invalid disk template name")
2918
2919     if self.op.disk_template in constants.DTS_NET_MIRROR:
2920       if getattr(self.op, "snode", None) is None:
2921         raise errors.OpPrereqError("The networked disk templates need"
2922                                    " a mirror node")
2923
2924       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2925       if snode_name is None:
2926         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2927                                    self.op.snode)
2928       elif snode_name == pnode.name:
2929         raise errors.OpPrereqError("The secondary node cannot be"
2930                                    " the primary node.")
2931       self.secondaries.append(snode_name)
2932
2933     # Check lv size requirements
2934     nodenames = [pnode.name] + self.secondaries
2935     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2936
2937     # Required free disk space as a function of disk and swap space
2938     req_size_dict = {
2939       constants.DT_DISKLESS: 0,
2940       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2941       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2942       # 256 MB are added for drbd metadata, 128MB for each drbd device
2943       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2944       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2945     }
2946
2947     if self.op.disk_template not in req_size_dict:
2948       raise errors.ProgrammerError("Disk template '%s' size requirement"
2949                                    " is unknown" %  self.op.disk_template)
2950
2951     req_size = req_size_dict[self.op.disk_template]
2952
2953     for node in nodenames:
2954       info = nodeinfo.get(node, None)
2955       if not info:
2956         raise errors.OpPrereqError("Cannot get current information"
2957                                    " from node '%s'" % nodeinfo)
2958       if req_size > info['vg_free']:
2959         raise errors.OpPrereqError("Not enough disk space on target node %s."
2960                                    " %d MB available, %d MB required" %
2961                                    (node, info['vg_free'], req_size))
2962
2963     # os verification
2964     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2965     if not os_obj:
2966       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2967                                  " primary node"  % self.op.os_type)
2968
2969     # instance verification
2970     hostname1 = utils.HostInfo(self.op.instance_name)
2971
2972     self.op.instance_name = instance_name = hostname1.name
2973     instance_list = self.cfg.GetInstanceList()
2974     if instance_name in instance_list:
2975       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2976                                  instance_name)
2977
2978     ip = getattr(self.op, "ip", None)
2979     if ip is None or ip.lower() == "none":
2980       inst_ip = None
2981     elif ip.lower() == "auto":
2982       inst_ip = hostname1.ip
2983     else:
2984       if not utils.IsValidIP(ip):
2985         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2986                                    " like a valid IP" % ip)
2987       inst_ip = ip
2988     self.inst_ip = inst_ip
2989
2990     if self.op.start and not self.op.ip_check:
2991       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2992                                  " adding an instance in start mode")
2993
2994     if self.op.ip_check:
2995       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2996                        constants.DEFAULT_NODED_PORT):
2997         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2998                                    (hostname1.ip, instance_name))
2999
3000     # bridge verification
3001     bridge = getattr(self.op, "bridge", None)
3002     if bridge is None:
3003       self.op.bridge = self.cfg.GetDefBridge()
3004     else:
3005       self.op.bridge = bridge
3006
3007     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3008       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3009                                  " destination node '%s'" %
3010                                  (self.op.bridge, pnode.name))
3011
3012     if self.op.start:
3013       self.instance_status = 'up'
3014     else:
3015       self.instance_status = 'down'
3016
3017   def Exec(self, feedback_fn):
3018     """Create and add the instance to the cluster.
3019
3020     """
3021     instance = self.op.instance_name
3022     pnode_name = self.pnode.name
3023
3024     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3025     if self.inst_ip is not None:
3026       nic.ip = self.inst_ip
3027
3028     disks = _GenerateDiskTemplate(self.cfg,
3029                                   self.op.disk_template,
3030                                   instance, pnode_name,
3031                                   self.secondaries, self.op.disk_size,
3032                                   self.op.swap_size)
3033
3034     iobj = objects.Instance(name=instance, os=self.op.os_type,
3035                             primary_node=pnode_name,
3036                             memory=self.op.mem_size,
3037                             vcpus=self.op.vcpus,
3038                             nics=[nic], disks=disks,
3039                             disk_template=self.op.disk_template,
3040                             status=self.instance_status,
3041                             )
3042
3043     feedback_fn("* creating instance disks...")
3044     if not _CreateDisks(self.cfg, iobj):
3045       _RemoveDisks(iobj, self.cfg)
3046       raise errors.OpExecError("Device creation failed, reverting...")
3047
3048     feedback_fn("adding instance %s to cluster config" % instance)
3049
3050     self.cfg.AddInstance(iobj)
3051
3052     if self.op.wait_for_sync:
3053       disk_abort = not _WaitForSync(self.cfg, iobj)
3054     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3055       # make sure the disks are not degraded (still sync-ing is ok)
3056       time.sleep(15)
3057       feedback_fn("* checking mirrors status")
3058       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3059     else:
3060       disk_abort = False
3061
3062     if disk_abort:
3063       _RemoveDisks(iobj, self.cfg)
3064       self.cfg.RemoveInstance(iobj.name)
3065       raise errors.OpExecError("There are some degraded disks for"
3066                                " this instance")
3067
3068     feedback_fn("creating os for instance %s on node %s" %
3069                 (instance, pnode_name))
3070
3071     if iobj.disk_template != constants.DT_DISKLESS:
3072       if self.op.mode == constants.INSTANCE_CREATE:
3073         feedback_fn("* running the instance OS create scripts...")
3074         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3075           raise errors.OpExecError("could not add os for instance %s"
3076                                    " on node %s" %
3077                                    (instance, pnode_name))
3078
3079       elif self.op.mode == constants.INSTANCE_IMPORT:
3080         feedback_fn("* running the instance OS import scripts...")
3081         src_node = self.op.src_node
3082         src_image = self.src_image
3083         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3084                                                 src_node, src_image):
3085           raise errors.OpExecError("Could not import os for instance"
3086                                    " %s on node %s" %
3087                                    (instance, pnode_name))
3088       else:
3089         # also checked in the prereq part
3090         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3091                                      % self.op.mode)
3092
3093     if self.op.start:
3094       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3095       feedback_fn("* starting instance...")
3096       if not rpc.call_instance_start(pnode_name, iobj, None):
3097         raise errors.OpExecError("Could not start instance")
3098
3099
3100 class LUConnectConsole(NoHooksLU):
3101   """Connect to an instance's console.
3102
3103   This is somewhat special in that it returns the command line that
3104   you need to run on the master node in order to connect to the
3105   console.
3106
3107   """
3108   _OP_REQP = ["instance_name"]
3109
3110   def CheckPrereq(self):
3111     """Check prerequisites.
3112
3113     This checks that the instance is in the cluster.
3114
3115     """
3116     instance = self.cfg.GetInstanceInfo(
3117       self.cfg.ExpandInstanceName(self.op.instance_name))
3118     if instance is None:
3119       raise errors.OpPrereqError("Instance '%s' not known" %
3120                                  self.op.instance_name)
3121     self.instance = instance
3122
3123   def Exec(self, feedback_fn):
3124     """Connect to the console of an instance
3125
3126     """
3127     instance = self.instance
3128     node = instance.primary_node
3129
3130     node_insts = rpc.call_instance_list([node])[node]
3131     if node_insts is False:
3132       raise errors.OpExecError("Can't connect to node %s." % node)
3133
3134     if instance.name not in node_insts:
3135       raise errors.OpExecError("Instance %s is not running." % instance.name)
3136
3137     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3138
3139     hyper = hypervisor.GetHypervisor()
3140     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3141     # build ssh cmdline
3142     argv = ["ssh", "-q", "-t"]
3143     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3144     argv.extend(ssh.BATCH_MODE_OPTS)
3145     argv.append(node)
3146     argv.append(console_cmd)
3147     return "ssh", argv
3148
3149
3150 class LUAddMDDRBDComponent(LogicalUnit):
3151   """Adda new mirror member to an instance's disk.
3152
3153   """
3154   HPATH = "mirror-add"
3155   HTYPE = constants.HTYPE_INSTANCE
3156   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3157
3158   def BuildHooksEnv(self):
3159     """Build hooks env.
3160
3161     This runs on the master, the primary and all the secondaries.
3162
3163     """
3164     env = {
3165       "NEW_SECONDARY": self.op.remote_node,
3166       "DISK_NAME": self.op.disk_name,
3167       }
3168     env.update(_BuildInstanceHookEnvByObject(self.instance))
3169     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3170           self.op.remote_node,] + list(self.instance.secondary_nodes)
3171     return env, nl, nl
3172
3173   def CheckPrereq(self):
3174     """Check prerequisites.
3175
3176     This checks that the instance is in the cluster.
3177
3178     """
3179     instance = self.cfg.GetInstanceInfo(
3180       self.cfg.ExpandInstanceName(self.op.instance_name))
3181     if instance is None:
3182       raise errors.OpPrereqError("Instance '%s' not known" %
3183                                  self.op.instance_name)
3184     self.instance = instance
3185
3186     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3187     if remote_node is None:
3188       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3189     self.remote_node = remote_node
3190
3191     if remote_node == instance.primary_node:
3192       raise errors.OpPrereqError("The specified node is the primary node of"
3193                                  " the instance.")
3194
3195     if instance.disk_template != constants.DT_REMOTE_RAID1:
3196       raise errors.OpPrereqError("Instance's disk layout is not"
3197                                  " remote_raid1.")
3198     for disk in instance.disks:
3199       if disk.iv_name == self.op.disk_name:
3200         break
3201     else:
3202       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3203                                  " instance." % self.op.disk_name)
3204     if len(disk.children) > 1:
3205       raise errors.OpPrereqError("The device already has two slave"
3206                                  " devices.\n"
3207                                  "This would create a 3-disk raid1"
3208                                  " which we don't allow.")
3209     self.disk = disk
3210
3211   def Exec(self, feedback_fn):
3212     """Add the mirror component
3213
3214     """
3215     disk = self.disk
3216     instance = self.instance
3217
3218     remote_node = self.remote_node
3219     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3220     names = _GenerateUniqueNames(self.cfg, lv_names)
3221     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3222                                      remote_node, disk.size, names)
3223
3224     logger.Info("adding new mirror component on secondary")
3225     #HARDCODE
3226     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3227                                       new_drbd, False,
3228                                       _GetInstanceInfoText(instance)):
3229       raise errors.OpExecError("Failed to create new component on secondary"
3230                                " node %s" % remote_node)
3231
3232     logger.Info("adding new mirror component on primary")
3233     #HARDCODE
3234     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3235                                     instance, new_drbd,
3236                                     _GetInstanceInfoText(instance)):
3237       # remove secondary dev
3238       self.cfg.SetDiskID(new_drbd, remote_node)
3239       rpc.call_blockdev_remove(remote_node, new_drbd)
3240       raise errors.OpExecError("Failed to create volume on primary")
3241
3242     # the device exists now
3243     # call the primary node to add the mirror to md
3244     logger.Info("adding new mirror component to md")
3245     if not rpc.call_blockdev_addchildren(instance.primary_node,
3246                                          disk, [new_drbd]):
3247       logger.Error("Can't add mirror compoment to md!")
3248       self.cfg.SetDiskID(new_drbd, remote_node)
3249       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3250         logger.Error("Can't rollback on secondary")
3251       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3252       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3253         logger.Error("Can't rollback on primary")
3254       raise errors.OpExecError("Can't add mirror component to md array")
3255
3256     disk.children.append(new_drbd)
3257
3258     self.cfg.AddInstance(instance)
3259
3260     _WaitForSync(self.cfg, instance)
3261
3262     return 0
3263
3264
3265 class LURemoveMDDRBDComponent(LogicalUnit):
3266   """Remove a component from a remote_raid1 disk.
3267
3268   """
3269   HPATH = "mirror-remove"
3270   HTYPE = constants.HTYPE_INSTANCE
3271   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3272
3273   def BuildHooksEnv(self):
3274     """Build hooks env.
3275
3276     This runs on the master, the primary and all the secondaries.
3277
3278     """
3279     env = {
3280       "DISK_NAME": self.op.disk_name,
3281       "DISK_ID": self.op.disk_id,
3282       "OLD_SECONDARY": self.old_secondary,
3283       }
3284     env.update(_BuildInstanceHookEnvByObject(self.instance))
3285     nl = [self.sstore.GetMasterNode(),
3286           self.instance.primary_node] + list(self.instance.secondary_nodes)
3287     return env, nl, nl
3288
3289   def CheckPrereq(self):
3290     """Check prerequisites.
3291
3292     This checks that the instance is in the cluster.
3293
3294     """
3295     instance = self.cfg.GetInstanceInfo(
3296       self.cfg.ExpandInstanceName(self.op.instance_name))
3297     if instance is None:
3298       raise errors.OpPrereqError("Instance '%s' not known" %
3299                                  self.op.instance_name)
3300     self.instance = instance
3301
3302     if instance.disk_template != constants.DT_REMOTE_RAID1:
3303       raise errors.OpPrereqError("Instance's disk layout is not"
3304                                  " remote_raid1.")
3305     for disk in instance.disks:
3306       if disk.iv_name == self.op.disk_name:
3307         break
3308     else:
3309       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3310                                  " instance." % self.op.disk_name)
3311     for child in disk.children:
3312       if (child.dev_type == constants.LD_DRBD7 and
3313           child.logical_id[2] == self.op.disk_id):
3314         break
3315     else:
3316       raise errors.OpPrereqError("Can't find the device with this port.")
3317
3318     if len(disk.children) < 2:
3319       raise errors.OpPrereqError("Cannot remove the last component from"
3320                                  " a mirror.")
3321     self.disk = disk
3322     self.child = child
3323     if self.child.logical_id[0] == instance.primary_node:
3324       oid = 1
3325     else:
3326       oid = 0
3327     self.old_secondary = self.child.logical_id[oid]
3328
3329   def Exec(self, feedback_fn):
3330     """Remove the mirror component
3331
3332     """
3333     instance = self.instance
3334     disk = self.disk
3335     child = self.child
3336     logger.Info("remove mirror component")
3337     self.cfg.SetDiskID(disk, instance.primary_node)
3338     if not rpc.call_blockdev_removechildren(instance.primary_node,
3339                                             disk, [child]):
3340       raise errors.OpExecError("Can't remove child from mirror.")
3341
3342     for node in child.logical_id[:2]:
3343       self.cfg.SetDiskID(child, node)
3344       if not rpc.call_blockdev_remove(node, child):
3345         logger.Error("Warning: failed to remove device from node %s,"
3346                      " continuing operation." % node)
3347
3348     disk.children.remove(child)
3349     self.cfg.AddInstance(instance)
3350
3351
3352 class LUReplaceDisks(LogicalUnit):
3353   """Replace the disks of an instance.
3354
3355   """
3356   HPATH = "mirrors-replace"
3357   HTYPE = constants.HTYPE_INSTANCE
3358   _OP_REQP = ["instance_name", "mode", "disks"]
3359
3360   def BuildHooksEnv(self):
3361     """Build hooks env.
3362
3363     This runs on the master, the primary and all the secondaries.
3364
3365     """
3366     env = {
3367       "MODE": self.op.mode,
3368       "NEW_SECONDARY": self.op.remote_node,
3369       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3370       }
3371     env.update(_BuildInstanceHookEnvByObject(self.instance))
3372     nl = [
3373       self.sstore.GetMasterNode(),
3374       self.instance.primary_node,
3375       ]
3376     if self.op.remote_node is not None:
3377       nl.append(self.op.remote_node)
3378     return env, nl, nl
3379
3380   def CheckPrereq(self):
3381     """Check prerequisites.
3382
3383     This checks that the instance is in the cluster.
3384
3385     """
3386     instance = self.cfg.GetInstanceInfo(
3387       self.cfg.ExpandInstanceName(self.op.instance_name))
3388     if instance is None:
3389       raise errors.OpPrereqError("Instance '%s' not known" %
3390                                  self.op.instance_name)
3391     self.instance = instance
3392
3393     if instance.disk_template not in constants.DTS_NET_MIRROR:
3394       raise errors.OpPrereqError("Instance's disk layout is not"
3395                                  " network mirrored.")
3396
3397     if len(instance.secondary_nodes) != 1:
3398       raise errors.OpPrereqError("The instance has a strange layout,"
3399                                  " expected one secondary but found %d" %
3400                                  len(instance.secondary_nodes))
3401
3402     self.sec_node = instance.secondary_nodes[0]
3403
3404     remote_node = getattr(self.op, "remote_node", None)
3405     if remote_node is not None:
3406       remote_node = self.cfg.ExpandNodeName(remote_node)
3407       if remote_node is None:
3408         raise errors.OpPrereqError("Node '%s' not known" %
3409                                    self.op.remote_node)
3410       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3411     else:
3412       self.remote_node_info = None
3413     if remote_node == instance.primary_node:
3414       raise errors.OpPrereqError("The specified node is the primary node of"
3415                                  " the instance.")
3416     elif remote_node == self.sec_node:
3417       if self.op.mode == constants.REPLACE_DISK_SEC:
3418         # this is for DRBD8, where we can't execute the same mode of
3419         # replacement as for drbd7 (no different port allocated)
3420         raise errors.OpPrereqError("Same secondary given, cannot execute"
3421                                    " replacement")
3422       # the user gave the current secondary, switch to
3423       # 'no-replace-secondary' mode for drbd7
3424       remote_node = None
3425     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3426         self.op.mode != constants.REPLACE_DISK_ALL):
3427       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3428                                  " disks replacement, not individual ones")
3429     if instance.disk_template == constants.DT_DRBD8:
3430       if self.op.mode == constants.REPLACE_DISK_ALL:
3431         raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3432                                    " secondary disk replacement, not"
3433                                    " both at once")
3434       elif self.op.mode == constants.REPLACE_DISK_PRI:
3435         if remote_node is not None:
3436           raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3437                                      " the secondary while doing a primary"
3438                                      " node disk replacement")
3439         self.tgt_node = instance.primary_node
3440         self.oth_node = instance.secondary_nodes[0]
3441       elif self.op.mode == constants.REPLACE_DISK_SEC:
3442         self.new_node = remote_node # this can be None, in which case
3443                                     # we don't change the secondary
3444         self.tgt_node = instance.secondary_nodes[0]
3445         self.oth_node = instance.primary_node
3446       else:
3447         raise errors.ProgrammerError("Unhandled disk replace mode")
3448
3449     for name in self.op.disks:
3450       if instance.FindDisk(name) is None:
3451         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3452                                    (name, instance.name))
3453     self.op.remote_node = remote_node
3454
3455   def _ExecRR1(self, feedback_fn):
3456     """Replace the disks of an instance.
3457
3458     """
3459     instance = self.instance
3460     iv_names = {}
3461     # start of work
3462     if self.op.remote_node is None:
3463       remote_node = self.sec_node
3464     else:
3465       remote_node = self.op.remote_node
3466     cfg = self.cfg
3467     for dev in instance.disks:
3468       size = dev.size
3469       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3470       names = _GenerateUniqueNames(cfg, lv_names)
3471       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3472                                        remote_node, size, names)
3473       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3474       logger.Info("adding new mirror component on secondary for %s" %
3475                   dev.iv_name)
3476       #HARDCODE
3477       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3478                                         new_drbd, False,
3479                                         _GetInstanceInfoText(instance)):
3480         raise errors.OpExecError("Failed to create new component on"
3481                                  " secondary node %s\n"
3482                                  "Full abort, cleanup manually!" %
3483                                  remote_node)
3484
3485       logger.Info("adding new mirror component on primary")
3486       #HARDCODE
3487       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3488                                       instance, new_drbd,
3489                                       _GetInstanceInfoText(instance)):
3490         # remove secondary dev
3491         cfg.SetDiskID(new_drbd, remote_node)
3492         rpc.call_blockdev_remove(remote_node, new_drbd)
3493         raise errors.OpExecError("Failed to create volume on primary!\n"
3494                                  "Full abort, cleanup manually!!")
3495
3496       # the device exists now
3497       # call the primary node to add the mirror to md
3498       logger.Info("adding new mirror component to md")
3499       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3500                                            [new_drbd]):
3501         logger.Error("Can't add mirror compoment to md!")
3502         cfg.SetDiskID(new_drbd, remote_node)
3503         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3504           logger.Error("Can't rollback on secondary")
3505         cfg.SetDiskID(new_drbd, instance.primary_node)
3506         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3507           logger.Error("Can't rollback on primary")
3508         raise errors.OpExecError("Full abort, cleanup manually!!")
3509
3510       dev.children.append(new_drbd)
3511       cfg.AddInstance(instance)
3512
3513     # this can fail as the old devices are degraded and _WaitForSync
3514     # does a combined result over all disks, so we don't check its
3515     # return value
3516     _WaitForSync(cfg, instance, unlock=True)
3517
3518     # so check manually all the devices
3519     for name in iv_names:
3520       dev, child, new_drbd = iv_names[name]
3521       cfg.SetDiskID(dev, instance.primary_node)
3522       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3523       if is_degr:
3524         raise errors.OpExecError("MD device %s is degraded!" % name)
3525       cfg.SetDiskID(new_drbd, instance.primary_node)
3526       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3527       if is_degr:
3528         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3529
3530     for name in iv_names:
3531       dev, child, new_drbd = iv_names[name]
3532       logger.Info("remove mirror %s component" % name)
3533       cfg.SetDiskID(dev, instance.primary_node)
3534       if not rpc.call_blockdev_removechildren(instance.primary_node,
3535                                               dev, [child]):
3536         logger.Error("Can't remove child from mirror, aborting"
3537                      " *this device cleanup*.\nYou need to cleanup manually!!")
3538         continue
3539
3540       for node in child.logical_id[:2]:
3541         logger.Info("remove child device on %s" % node)
3542         cfg.SetDiskID(child, node)
3543         if not rpc.call_blockdev_remove(node, child):
3544           logger.Error("Warning: failed to remove device from node %s,"
3545                        " continuing operation." % node)
3546
3547       dev.children.remove(child)
3548
3549       cfg.AddInstance(instance)
3550
3551   def _ExecD8DiskOnly(self, feedback_fn):
3552     """Replace a disk on the primary or secondary for dbrd8.
3553
3554     The algorithm for replace is quite complicated:
3555       - for each disk to be replaced:
3556         - create new LVs on the target node with unique names
3557         - detach old LVs from the drbd device
3558         - rename old LVs to name_replaced.<time_t>
3559         - rename new LVs to old LVs
3560         - attach the new LVs (with the old names now) to the drbd device
3561       - wait for sync across all devices
3562       - for each modified disk:
3563         - remove old LVs (which have the name name_replaces.<time_t>)
3564
3565     Failures are not very well handled.
3566
3567     """
3568     steps_total = 6
3569     warning, info = (self.processor.LogWarning, self.processor.LogInfo)
3570     instance = self.instance
3571     iv_names = {}
3572     vgname = self.cfg.GetVGName()
3573     # start of work
3574     cfg = self.cfg
3575     tgt_node = self.tgt_node
3576     oth_node = self.oth_node
3577
3578     # Step: check device activation
3579     self.processor.LogStep(1, steps_total, "check device existence")
3580     info("checking volume groups")
3581     my_vg = cfg.GetVGName()
3582     results = rpc.call_vg_list([oth_node, tgt_node])
3583     if not results:
3584       raise errors.OpExecError("Can't list volume groups on the nodes")
3585     for node in oth_node, tgt_node:
3586       res = results.get(node, False)
3587       if not res or my_vg not in res:
3588         raise errors.OpExecError("Volume group '%s' not found on %s" %
3589                                  (my_vg, node))
3590     for dev in instance.disks:
3591       if not dev.iv_name in self.op.disks:
3592         continue
3593       for node in tgt_node, oth_node:
3594         info("checking %s on %s" % (dev.iv_name, node))
3595         cfg.SetDiskID(dev, node)
3596         if not rpc.call_blockdev_find(node, dev):
3597           raise errors.OpExecError("Can't find device %s on node %s" %
3598                                    (dev.iv_name, node))
3599
3600     # Step: check other node consistency
3601     self.processor.LogStep(2, steps_total, "check peer consistency")
3602     for dev in instance.disks:
3603       if not dev.iv_name in self.op.disks:
3604         continue
3605       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3606       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3607                                    oth_node==instance.primary_node):
3608         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3609                                  " to replace disks on this node (%s)" %
3610                                  (oth_node, tgt_node))
3611
3612     # Step: create new storage
3613     self.processor.LogStep(3, steps_total, "allocate new storage")
3614     for dev in instance.disks:
3615       if not dev.iv_name in self.op.disks:
3616         continue
3617       size = dev.size
3618       cfg.SetDiskID(dev, tgt_node)
3619       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3620       names = _GenerateUniqueNames(cfg, lv_names)
3621       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3622                              logical_id=(vgname, names[0]))
3623       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3624                              logical_id=(vgname, names[1]))
3625       new_lvs = [lv_data, lv_meta]
3626       old_lvs = dev.children
3627       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3628       info("creating new local storage on %s for %s" %
3629            (tgt_node, dev.iv_name))
3630       # since we *always* want to create this LV, we use the
3631       # _Create...OnPrimary (which forces the creation), even if we
3632       # are talking about the secondary node
3633       for new_lv in new_lvs:
3634         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3635                                         _GetInstanceInfoText(instance)):
3636           raise errors.OpExecError("Failed to create new LV named '%s' on"
3637                                    " node '%s'" %
3638                                    (new_lv.logical_id[1], tgt_node))
3639
3640     # Step: for each lv, detach+rename*2+attach
3641     self.processor.LogStep(4, steps_total, "change drbd configuration")
3642     for dev, old_lvs, new_lvs in iv_names.itervalues():
3643       info("detaching %s drbd from local storage" % dev.iv_name)
3644       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3645         raise errors.OpExecError("Can't detach drbd from local storage on node"
3646                                  " %s for device %s" % (tgt_node, dev.iv_name))
3647       #dev.children = []
3648       #cfg.Update(instance)
3649
3650       # ok, we created the new LVs, so now we know we have the needed
3651       # storage; as such, we proceed on the target node to rename
3652       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3653       # using the assumption than logical_id == physical_id (which in
3654       # turn is the unique_id on that node)
3655
3656       # FIXME(iustin): use a better name for the replaced LVs
3657       temp_suffix = int(time.time())
3658       ren_fn = lambda d, suff: (d.physical_id[0],
3659                                 d.physical_id[1] + "_replaced-%s" % suff)
3660       # build the rename list based on what LVs exist on the node
3661       rlist = []
3662       for to_ren in old_lvs:
3663         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3664         if find_res is not None: # device exists
3665           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3666
3667       info("renaming the old LVs on the target node")
3668       if not rpc.call_blockdev_rename(tgt_node, rlist):
3669         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3670       # now we rename the new LVs to the old LVs
3671       info("renaming the new LVs on the target node")
3672       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3673       if not rpc.call_blockdev_rename(tgt_node, rlist):
3674         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3675
3676       for old, new in zip(old_lvs, new_lvs):
3677         new.logical_id = old.logical_id
3678         cfg.SetDiskID(new, tgt_node)
3679
3680       for disk in old_lvs:
3681         disk.logical_id = ren_fn(disk, temp_suffix)
3682         cfg.SetDiskID(disk, tgt_node)
3683
3684       # now that the new lvs have the old name, we can add them to the device
3685       info("adding new mirror component on %s" % tgt_node)
3686       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3687         for new_lv in new_lvs:
3688           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3689             warning("Can't rollback device %s", "manually cleanup unused"
3690                     " logical volumes")
3691         raise errors.OpExecError("Can't add local storage to drbd")
3692
3693       dev.children = new_lvs
3694       cfg.Update(instance)
3695
3696     # Step: wait for sync
3697
3698     # this can fail as the old devices are degraded and _WaitForSync
3699     # does a combined result over all disks, so we don't check its
3700     # return value
3701     self.processor.LogStep(5, steps_total, "sync devices")
3702     _WaitForSync(cfg, instance, unlock=True)
3703
3704     # so check manually all the devices
3705     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3706       cfg.SetDiskID(dev, instance.primary_node)
3707       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3708       if is_degr:
3709         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3710
3711     # Step: remove old storage
3712     self.processor.LogStep(6, steps_total, "removing old storage")
3713     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3714       info("remove logical volumes for %s" % name)
3715       for lv in old_lvs:
3716         cfg.SetDiskID(lv, tgt_node)
3717         if not rpc.call_blockdev_remove(tgt_node, lv):
3718           warning("Can't remove old LV", "manually remove unused LVs")
3719           continue
3720
3721   def _ExecD8Secondary(self, feedback_fn):
3722     """Replace the secondary node for drbd8.
3723
3724     The algorithm for replace is quite complicated:
3725       - for all disks of the instance:
3726         - create new LVs on the new node with same names
3727         - shutdown the drbd device on the old secondary
3728         - disconnect the drbd network on the primary
3729         - create the drbd device on the new secondary
3730         - network attach the drbd on the primary, using an artifice:
3731           the drbd code for Attach() will connect to the network if it
3732           finds a device which is connected to the good local disks but
3733           not network enabled
3734       - wait for sync across all devices
3735       - remove all disks from the old secondary
3736
3737     Failures are not very well handled.
3738
3739     """
3740     steps_total = 6
3741     warning, info = (self.processor.LogWarning, self.processor.LogInfo)
3742     instance = self.instance
3743     iv_names = {}
3744     vgname = self.cfg.GetVGName()
3745     # start of work
3746     cfg = self.cfg
3747     old_node = self.tgt_node
3748     new_node = self.new_node
3749     pri_node = instance.primary_node
3750
3751     # Step: check device activation
3752     self.processor.LogStep(1, steps_total, "check device existence")
3753     info("checking volume groups")
3754     my_vg = cfg.GetVGName()
3755     results = rpc.call_vg_list([pri_node, new_node])
3756     if not results:
3757       raise errors.OpExecError("Can't list volume groups on the nodes")
3758     for node in pri_node, new_node:
3759       res = results.get(node, False)
3760       if not res or my_vg not in res:
3761         raise errors.OpExecError("Volume group '%s' not found on %s" %
3762                                  (my_vg, node))
3763     for dev in instance.disks:
3764       if not dev.iv_name in self.op.disks:
3765         continue
3766       info("checking %s on %s" % (dev.iv_name, pri_node))
3767       cfg.SetDiskID(dev, pri_node)
3768       if not rpc.call_blockdev_find(pri_node, dev):
3769         raise errors.OpExecError("Can't find device %s on node %s" %
3770                                  (dev.iv_name, pri_node))
3771
3772     # Step: check other node consistency
3773     self.processor.LogStep(2, steps_total, "check peer consistency")
3774     for dev in instance.disks:
3775       if not dev.iv_name in self.op.disks:
3776         continue
3777       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3778       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3779         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3780                                  " unsafe to replace the secondary" %
3781                                  pri_node)
3782
3783     # Step: create new storage
3784     self.processor.LogStep(3, steps_total, "allocate new storage")
3785     for dev in instance.disks:
3786       size = dev.size
3787       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3788       # since we *always* want to create this LV, we use the
3789       # _Create...OnPrimary (which forces the creation), even if we
3790       # are talking about the secondary node
3791       for new_lv in dev.children:
3792         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3793                                         _GetInstanceInfoText(instance)):
3794           raise errors.OpExecError("Failed to create new LV named '%s' on"
3795                                    " node '%s'" %
3796                                    (new_lv.logical_id[1], new_node))
3797
3798       iv_names[dev.iv_name] = (dev, dev.children)
3799
3800     self.processor.LogStep(4, steps_total, "changing drbd configuration")
3801     for dev in instance.disks:
3802       size = dev.size
3803       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3804       # create new devices on new_node
3805       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3806                               logical_id=(pri_node, new_node,
3807                                           dev.logical_id[2]),
3808                               children=dev.children)
3809       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3810                                         new_drbd, False,
3811                                       _GetInstanceInfoText(instance)):
3812         raise errors.OpExecError("Failed to create new DRBD on"
3813                                  " node '%s'" % new_node)
3814
3815     for dev in instance.disks:
3816       # we have new devices, shutdown the drbd on the old secondary
3817       info("shutting down drbd for %s on old node" % dev.iv_name)
3818       cfg.SetDiskID(dev, old_node)
3819       if not rpc.call_blockdev_shutdown(old_node, dev):
3820         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3821                 "Please cleanup this device manuall as soon as possible")
3822
3823       # we have new storage, we 'rename' the network on the primary
3824       info("switching primary drbd for %s to new secondary node" % dev.iv_name)
3825       cfg.SetDiskID(dev, pri_node)
3826       # rename to the ip of the new node
3827       new_uid = list(dev.physical_id)
3828       new_uid[2] = self.remote_node_info.secondary_ip
3829       rlist = [(dev, tuple(new_uid))]
3830       if not rpc.call_blockdev_rename(pri_node, rlist):
3831         raise errors.OpExecError("Can't detach & re-attach drbd %s on node"
3832                                  " %s from %s to %s" %
3833                                  (dev.iv_name, pri_node, old_node, new_node))
3834       dev.logical_id = (pri_node, new_node, dev.logical_id[2])
3835       cfg.SetDiskID(dev, pri_node)
3836       cfg.Update(instance)
3837
3838
3839     # this can fail as the old devices are degraded and _WaitForSync
3840     # does a combined result over all disks, so we don't check its
3841     # return value
3842     self.processor.LogStep(5, steps_total, "sync devices")
3843     _WaitForSync(cfg, instance, unlock=True)
3844
3845     # so check manually all the devices
3846     for name, (dev, old_lvs) in iv_names.iteritems():
3847       cfg.SetDiskID(dev, pri_node)
3848       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3849       if is_degr:
3850         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3851
3852     self.processor.LogStep(6, steps_total, "removing old storage")
3853     for name, (dev, old_lvs) in iv_names.iteritems():
3854       info("remove logical volumes for %s" % name)
3855       for lv in old_lvs:
3856         cfg.SetDiskID(lv, old_node)
3857         if not rpc.call_blockdev_remove(old_node, lv):
3858           warning("Can't remove LV on old secondary",
3859                   "Cleanup stale volumes by hand")
3860
3861   def Exec(self, feedback_fn):
3862     """Execute disk replacement.
3863
3864     This dispatches the disk replacement to the appropriate handler.
3865
3866     """
3867     instance = self.instance
3868     if instance.disk_template == constants.DT_REMOTE_RAID1:
3869       fn = self._ExecRR1
3870     elif instance.disk_template == constants.DT_DRBD8:
3871       if self.op.remote_node is None:
3872         fn = self._ExecD8DiskOnly
3873       else:
3874         fn = self._ExecD8Secondary
3875     else:
3876       raise errors.ProgrammerError("Unhandled disk replacement case")
3877     return fn(feedback_fn)
3878
3879
3880 class LUQueryInstanceData(NoHooksLU):
3881   """Query runtime instance data.
3882
3883   """
3884   _OP_REQP = ["instances"]
3885
3886   def CheckPrereq(self):
3887     """Check prerequisites.
3888
3889     This only checks the optional instance list against the existing names.
3890
3891     """
3892     if not isinstance(self.op.instances, list):
3893       raise errors.OpPrereqError("Invalid argument type 'instances'")
3894     if self.op.instances:
3895       self.wanted_instances = []
3896       names = self.op.instances
3897       for name in names:
3898         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3899         if instance is None:
3900           raise errors.OpPrereqError("No such instance name '%s'" % name)
3901       self.wanted_instances.append(instance)
3902     else:
3903       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3904                                in self.cfg.GetInstanceList()]
3905     return
3906
3907
3908   def _ComputeDiskStatus(self, instance, snode, dev):
3909     """Compute block device status.
3910
3911     """
3912     self.cfg.SetDiskID(dev, instance.primary_node)
3913     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3914     if dev.dev_type in constants.LDS_DRBD:
3915       # we change the snode then (otherwise we use the one passed in)
3916       if dev.logical_id[0] == instance.primary_node:
3917         snode = dev.logical_id[1]
3918       else:
3919         snode = dev.logical_id[0]
3920
3921     if snode:
3922       self.cfg.SetDiskID(dev, snode)
3923       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3924     else:
3925       dev_sstatus = None
3926
3927     if dev.children:
3928       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3929                       for child in dev.children]
3930     else:
3931       dev_children = []
3932
3933     data = {
3934       "iv_name": dev.iv_name,
3935       "dev_type": dev.dev_type,
3936       "logical_id": dev.logical_id,
3937       "physical_id": dev.physical_id,
3938       "pstatus": dev_pstatus,
3939       "sstatus": dev_sstatus,
3940       "children": dev_children,
3941       }
3942
3943     return data
3944
3945   def Exec(self, feedback_fn):
3946     """Gather and return data"""
3947     result = {}
3948     for instance in self.wanted_instances:
3949       remote_info = rpc.call_instance_info(instance.primary_node,
3950                                                 instance.name)
3951       if remote_info and "state" in remote_info:
3952         remote_state = "up"
3953       else:
3954         remote_state = "down"
3955       if instance.status == "down":
3956         config_state = "down"
3957       else:
3958         config_state = "up"
3959
3960       disks = [self._ComputeDiskStatus(instance, None, device)
3961                for device in instance.disks]
3962
3963       idict = {
3964         "name": instance.name,
3965         "config_state": config_state,
3966         "run_state": remote_state,
3967         "pnode": instance.primary_node,
3968         "snodes": instance.secondary_nodes,
3969         "os": instance.os,
3970         "memory": instance.memory,
3971         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3972         "disks": disks,
3973         "vcpus": instance.vcpus,
3974         }
3975
3976       result[instance.name] = idict
3977
3978     return result
3979
3980
3981 class LUSetInstanceParms(LogicalUnit):
3982   """Modifies an instances's parameters.
3983
3984   """
3985   HPATH = "instance-modify"
3986   HTYPE = constants.HTYPE_INSTANCE
3987   _OP_REQP = ["instance_name"]
3988
3989   def BuildHooksEnv(self):
3990     """Build hooks env.
3991
3992     This runs on the master, primary and secondaries.
3993
3994     """
3995     args = dict()
3996     if self.mem:
3997       args['memory'] = self.mem
3998     if self.vcpus:
3999       args['vcpus'] = self.vcpus
4000     if self.do_ip or self.do_bridge:
4001       if self.do_ip:
4002         ip = self.ip
4003       else:
4004         ip = self.instance.nics[0].ip
4005       if self.bridge:
4006         bridge = self.bridge
4007       else:
4008         bridge = self.instance.nics[0].bridge
4009       args['nics'] = [(ip, bridge)]
4010     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4011     nl = [self.sstore.GetMasterNode(),
4012           self.instance.primary_node] + list(self.instance.secondary_nodes)
4013     return env, nl, nl
4014
4015   def CheckPrereq(self):
4016     """Check prerequisites.
4017
4018     This only checks the instance list against the existing names.
4019
4020     """
4021     self.mem = getattr(self.op, "mem", None)
4022     self.vcpus = getattr(self.op, "vcpus", None)
4023     self.ip = getattr(self.op, "ip", None)
4024     self.bridge = getattr(self.op, "bridge", None)
4025     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
4026       raise errors.OpPrereqError("No changes submitted")
4027     if self.mem is not None:
4028       try:
4029         self.mem = int(self.mem)
4030       except ValueError, err:
4031         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4032     if self.vcpus is not None:
4033       try:
4034         self.vcpus = int(self.vcpus)
4035       except ValueError, err:
4036         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4037     if self.ip is not None:
4038       self.do_ip = True
4039       if self.ip.lower() == "none":
4040         self.ip = None
4041       else:
4042         if not utils.IsValidIP(self.ip):
4043           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4044     else:
4045       self.do_ip = False
4046     self.do_bridge = (self.bridge is not None)
4047
4048     instance = self.cfg.GetInstanceInfo(
4049       self.cfg.ExpandInstanceName(self.op.instance_name))
4050     if instance is None:
4051       raise errors.OpPrereqError("No such instance name '%s'" %
4052                                  self.op.instance_name)
4053     self.op.instance_name = instance.name
4054     self.instance = instance
4055     return
4056
4057   def Exec(self, feedback_fn):
4058     """Modifies an instance.
4059
4060     All parameters take effect only at the next restart of the instance.
4061     """
4062     result = []
4063     instance = self.instance
4064     if self.mem:
4065       instance.memory = self.mem
4066       result.append(("mem", self.mem))
4067     if self.vcpus:
4068       instance.vcpus = self.vcpus
4069       result.append(("vcpus",  self.vcpus))
4070     if self.do_ip:
4071       instance.nics[0].ip = self.ip
4072       result.append(("ip", self.ip))
4073     if self.bridge:
4074       instance.nics[0].bridge = self.bridge
4075       result.append(("bridge", self.bridge))
4076
4077     self.cfg.AddInstance(instance)
4078
4079     return result
4080
4081
4082 class LUQueryExports(NoHooksLU):
4083   """Query the exports list
4084
4085   """
4086   _OP_REQP = []
4087
4088   def CheckPrereq(self):
4089     """Check that the nodelist contains only existing nodes.
4090
4091     """
4092     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4093
4094   def Exec(self, feedback_fn):
4095     """Compute the list of all the exported system images.
4096
4097     Returns:
4098       a dictionary with the structure node->(export-list)
4099       where export-list is a list of the instances exported on
4100       that node.
4101
4102     """
4103     return rpc.call_export_list(self.nodes)
4104
4105
4106 class LUExportInstance(LogicalUnit):
4107   """Export an instance to an image in the cluster.
4108
4109   """
4110   HPATH = "instance-export"
4111   HTYPE = constants.HTYPE_INSTANCE
4112   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4113
4114   def BuildHooksEnv(self):
4115     """Build hooks env.
4116
4117     This will run on the master, primary node and target node.
4118
4119     """
4120     env = {
4121       "EXPORT_NODE": self.op.target_node,
4122       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4123       }
4124     env.update(_BuildInstanceHookEnvByObject(self.instance))
4125     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4126           self.op.target_node]
4127     return env, nl, nl
4128
4129   def CheckPrereq(self):
4130     """Check prerequisites.
4131
4132     This checks that the instance name is a valid one.
4133
4134     """
4135     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4136     self.instance = self.cfg.GetInstanceInfo(instance_name)
4137     if self.instance is None:
4138       raise errors.OpPrereqError("Instance '%s' not found" %
4139                                  self.op.instance_name)
4140
4141     # node verification
4142     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4143     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4144
4145     if self.dst_node is None:
4146       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4147                                  self.op.target_node)
4148     self.op.target_node = self.dst_node.name
4149
4150   def Exec(self, feedback_fn):
4151     """Export an instance to an image in the cluster.
4152
4153     """
4154     instance = self.instance
4155     dst_node = self.dst_node
4156     src_node = instance.primary_node
4157     # shutdown the instance, unless requested not to do so
4158     if self.op.shutdown:
4159       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4160       self.processor.ChainOpCode(op)
4161
4162     vgname = self.cfg.GetVGName()
4163
4164     snap_disks = []
4165
4166     try:
4167       for disk in instance.disks:
4168         if disk.iv_name == "sda":
4169           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4170           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4171
4172           if not new_dev_name:
4173             logger.Error("could not snapshot block device %s on node %s" %
4174                          (disk.logical_id[1], src_node))
4175           else:
4176             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4177                                       logical_id=(vgname, new_dev_name),
4178                                       physical_id=(vgname, new_dev_name),
4179                                       iv_name=disk.iv_name)
4180             snap_disks.append(new_dev)
4181
4182     finally:
4183       if self.op.shutdown:
4184         op = opcodes.OpStartupInstance(instance_name=instance.name,
4185                                        force=False)
4186         self.processor.ChainOpCode(op)
4187
4188     # TODO: check for size
4189
4190     for dev in snap_disks:
4191       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4192                                            instance):
4193         logger.Error("could not export block device %s from node"
4194                      " %s to node %s" %
4195                      (dev.logical_id[1], src_node, dst_node.name))
4196       if not rpc.call_blockdev_remove(src_node, dev):
4197         logger.Error("could not remove snapshot block device %s from"
4198                      " node %s" % (dev.logical_id[1], src_node))
4199
4200     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4201       logger.Error("could not finalize export for instance %s on node %s" %
4202                    (instance.name, dst_node.name))
4203
4204     nodelist = self.cfg.GetNodeList()
4205     nodelist.remove(dst_node.name)
4206
4207     # on one-node clusters nodelist will be empty after the removal
4208     # if we proceed the backup would be removed because OpQueryExports
4209     # substitutes an empty list with the full cluster node list.
4210     if nodelist:
4211       op = opcodes.OpQueryExports(nodes=nodelist)
4212       exportlist = self.processor.ChainOpCode(op)
4213       for node in exportlist:
4214         if instance.name in exportlist[node]:
4215           if not rpc.call_export_remove(node, instance.name):
4216             logger.Error("could not remove older export for instance %s"
4217                          " on node %s" % (instance.name, node))
4218
4219
4220 class TagsLU(NoHooksLU):
4221   """Generic tags LU.
4222
4223   This is an abstract class which is the parent of all the other tags LUs.
4224
4225   """
4226   def CheckPrereq(self):
4227     """Check prerequisites.
4228
4229     """
4230     if self.op.kind == constants.TAG_CLUSTER:
4231       self.target = self.cfg.GetClusterInfo()
4232     elif self.op.kind == constants.TAG_NODE:
4233       name = self.cfg.ExpandNodeName(self.op.name)
4234       if name is None:
4235         raise errors.OpPrereqError("Invalid node name (%s)" %
4236                                    (self.op.name,))
4237       self.op.name = name
4238       self.target = self.cfg.GetNodeInfo(name)
4239     elif self.op.kind == constants.TAG_INSTANCE:
4240       name = self.cfg.ExpandInstanceName(self.op.name)
4241       if name is None:
4242         raise errors.OpPrereqError("Invalid instance name (%s)" %
4243                                    (self.op.name,))
4244       self.op.name = name
4245       self.target = self.cfg.GetInstanceInfo(name)
4246     else:
4247       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4248                                  str(self.op.kind))
4249
4250
4251 class LUGetTags(TagsLU):
4252   """Returns the tags of a given object.
4253
4254   """
4255   _OP_REQP = ["kind", "name"]
4256
4257   def Exec(self, feedback_fn):
4258     """Returns the tag list.
4259
4260     """
4261     return self.target.GetTags()
4262
4263
4264 class LUSearchTags(NoHooksLU):
4265   """Searches the tags for a given pattern.
4266
4267   """
4268   _OP_REQP = ["pattern"]
4269
4270   def CheckPrereq(self):
4271     """Check prerequisites.
4272
4273     This checks the pattern passed for validity by compiling it.
4274
4275     """
4276     try:
4277       self.re = re.compile(self.op.pattern)
4278     except re.error, err:
4279       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4280                                  (self.op.pattern, err))
4281
4282   def Exec(self, feedback_fn):
4283     """Returns the tag list.
4284
4285     """
4286     cfg = self.cfg
4287     tgts = [("/cluster", cfg.GetClusterInfo())]
4288     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4289     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4290     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4291     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4292     results = []
4293     for path, target in tgts:
4294       for tag in target.GetTags():
4295         if self.re.search(tag):
4296           results.append((path, tag))
4297     return results
4298
4299
4300 class LUAddTags(TagsLU):
4301   """Sets a tag on a given object.
4302
4303   """
4304   _OP_REQP = ["kind", "name", "tags"]
4305
4306   def CheckPrereq(self):
4307     """Check prerequisites.
4308
4309     This checks the type and length of the tag name and value.
4310
4311     """
4312     TagsLU.CheckPrereq(self)
4313     for tag in self.op.tags:
4314       objects.TaggableObject.ValidateTag(tag)
4315
4316   def Exec(self, feedback_fn):
4317     """Sets the tag.
4318
4319     """
4320     try:
4321       for tag in self.op.tags:
4322         self.target.AddTag(tag)
4323     except errors.TagError, err:
4324       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4325     try:
4326       self.cfg.Update(self.target)
4327     except errors.ConfigurationError:
4328       raise errors.OpRetryError("There has been a modification to the"
4329                                 " config file and the operation has been"
4330                                 " aborted. Please retry.")
4331
4332
4333 class LUDelTags(TagsLU):
4334   """Delete a list of tags from a given object.
4335
4336   """
4337   _OP_REQP = ["kind", "name", "tags"]
4338
4339   def CheckPrereq(self):
4340     """Check prerequisites.
4341
4342     This checks that we have the given tag.
4343
4344     """
4345     TagsLU.CheckPrereq(self)
4346     for tag in self.op.tags:
4347       objects.TaggableObject.ValidateTag(tag)
4348     del_tags = frozenset(self.op.tags)
4349     cur_tags = self.target.GetTags()
4350     if not del_tags <= cur_tags:
4351       diff_tags = del_tags - cur_tags
4352       diff_names = ["'%s'" % tag for tag in diff_tags]
4353       diff_names.sort()
4354       raise errors.OpPrereqError("Tag(s) %s not found" %
4355                                  (",".join(diff_names)))
4356
4357   def Exec(self, feedback_fn):
4358     """Remove the tag from the object.
4359
4360     """
4361     for tag in self.op.tags:
4362       self.target.RemoveTag(tag)
4363     try:
4364       self.cfg.Update(self.target)
4365     except errors.ConfigurationError:
4366       raise errors.OpRetryError("There has been a modification to the"
4367                                 " config file and the operation has been"
4368                                 " aborted. Please retry.")