Fix some indendation issues
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge, mac) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
276   else:
277     nic_count = 0
278
279   env["INSTANCE_NIC_COUNT"] = nic_count
280
281   return env
282
283
284 def _BuildInstanceHookEnvByObject(instance, override=None):
285   """Builds instance related env variables for hooks from an object.
286
287   Args:
288     instance: objects.Instance object of instance
289     override: dict of values to override
290   """
291   args = {
292     'name': instance.name,
293     'primary_node': instance.primary_node,
294     'secondary_nodes': instance.secondary_nodes,
295     'os_type': instance.os,
296     'status': instance.os,
297     'memory': instance.memory,
298     'vcpus': instance.vcpus,
299     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
300   }
301   if override:
302     args.update(override)
303   return _BuildInstanceHookEnv(**args)
304
305
306 def _UpdateKnownHosts(fullnode, ip, pubkey):
307   """Ensure a node has a correct known_hosts entry.
308
309   Args:
310     fullnode - Fully qualified domain name of host. (str)
311     ip       - IPv4 address of host (str)
312     pubkey   - the public key of the cluster
313
314   """
315   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
316     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317   else:
318     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
319
320   inthere = False
321
322   save_lines = []
323   add_lines = []
324   removed = False
325
326   for rawline in f:
327     logger.Debug('read %s' % (repr(rawline),))
328
329     parts = rawline.rstrip('\r\n').split()
330
331     # Ignore unwanted lines
332     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
333       fields = parts[0].split(',')
334       key = parts[2]
335
336       haveall = True
337       havesome = False
338       for spec in [ ip, fullnode ]:
339         if spec not in fields:
340           haveall = False
341         if spec in fields:
342           havesome = True
343
344       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
345       if haveall and key == pubkey:
346         inthere = True
347         save_lines.append(rawline)
348         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
349         continue
350
351       if havesome and (not haveall or key != pubkey):
352         removed = True
353         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
354         continue
355
356     save_lines.append(rawline)
357
358   if not inthere:
359     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
360     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
361
362   if removed:
363     save_lines = save_lines + add_lines
364
365     # Write a new file and replace old.
366     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367                                    constants.DATA_DIR)
368     newfile = os.fdopen(fd, 'w')
369     try:
370       newfile.write(''.join(save_lines))
371     finally:
372       newfile.close()
373     logger.Debug("Wrote new known_hosts.")
374     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
375
376   elif add_lines:
377     # Simply appending a new line will do the trick.
378     f.seek(0, 2)
379     for add in add_lines:
380       f.write(add)
381
382   f.close()
383
384
385 def _HasValidVG(vglist, vgname):
386   """Checks if the volume group list is valid.
387
388   A non-None return value means there's an error, and the return value
389   is the error message.
390
391   """
392   vgsize = vglist.get(vgname, None)
393   if vgsize is None:
394     return "volume group '%s' missing" % vgname
395   elif vgsize < 20480:
396     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
397             (vgname, vgsize))
398   return None
399
400
401 def _InitSSHSetup(node):
402   """Setup the SSH configuration for the cluster.
403
404
405   This generates a dsa keypair for root, adds the pub key to the
406   permitted hosts and adds the hostkey to its own known hosts.
407
408   Args:
409     node: the name of this host as a fqdn
410
411   """
412   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413
414   for name in priv_key, pub_key:
415     if os.path.exists(name):
416       utils.CreateBackup(name)
417     utils.RemoveFile(name)
418
419   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
420                          "-f", priv_key,
421                          "-q", "-N", ""])
422   if result.failed:
423     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
424                              result.output)
425
426   f = open(pub_key, 'r')
427   try:
428     utils.AddAuthorizedKey(auth_keys, f.read(8192))
429   finally:
430     f.close()
431
432
433 def _InitGanetiServerSetup(ss):
434   """Setup the necessary configuration for the initial node daemon.
435
436   This creates the nodepass file containing the shared password for
437   the cluster and also generates the SSL certificate.
438
439   """
440   # Create pseudo random password
441   randpass = sha.new(os.urandom(64)).hexdigest()
442   # and write it into sstore
443   ss.SetKey(ss.SS_NODED_PASS, randpass)
444
445   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
446                          "-days", str(365*5), "-nodes", "-x509",
447                          "-keyout", constants.SSL_CERT_FILE,
448                          "-out", constants.SSL_CERT_FILE, "-batch"])
449   if result.failed:
450     raise errors.OpExecError("could not generate server ssl cert, command"
451                              " %s had exitcode %s and error message %s" %
452                              (result.cmd, result.exit_code, result.output))
453
454   os.chmod(constants.SSL_CERT_FILE, 0400)
455
456   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
457
458   if result.failed:
459     raise errors.OpExecError("Could not start the node daemon, command %s"
460                              " had exitcode %s and error %s" %
461                              (result.cmd, result.exit_code, result.output))
462
463
464 def _CheckInstanceBridgesExist(instance):
465   """Check that the brigdes needed by an instance exist.
466
467   """
468   # check bridges existance
469   brlist = [nic.bridge for nic in instance.nics]
470   if not rpc.call_bridges_exist(instance.primary_node, brlist):
471     raise errors.OpPrereqError("one or more target bridges %s does not"
472                                " exist on destination node '%s'" %
473                                (brlist, instance.primary_node))
474
475
476 class LUInitCluster(LogicalUnit):
477   """Initialise the cluster.
478
479   """
480   HPATH = "cluster-init"
481   HTYPE = constants.HTYPE_CLUSTER
482   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
483               "def_bridge", "master_netdev"]
484   REQ_CLUSTER = False
485
486   def BuildHooksEnv(self):
487     """Build hooks env.
488
489     Notes: Since we don't require a cluster, we must manually add
490     ourselves in the post-run node list.
491
492     """
493     env = {"OP_TARGET": self.op.cluster_name}
494     return env, [], [self.hostname.name]
495
496   def CheckPrereq(self):
497     """Verify that the passed name is a valid one.
498
499     """
500     if config.ConfigWriter.IsCluster():
501       raise errors.OpPrereqError("Cluster is already initialised")
502
503     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
504       if not os.path.exists(constants.VNC_PASSWORD_FILE):
505         raise errors.OpPrereqError("Please prepare the cluster VNC"
506                                    "password file %s" %
507                                    constants.VNC_PASSWORD_FILE)
508
509     self.hostname = hostname = utils.HostInfo()
510
511     if hostname.ip.startswith("127."):
512       raise errors.OpPrereqError("This host's IP resolves to the private"
513                                  " range (%s). Please fix DNS or /etc/hosts." %
514                                  (hostname.ip,))
515
516     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517
518     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
519                          constants.DEFAULT_NODED_PORT):
520       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
521                                  " to %s,\nbut this ip address does not"
522                                  " belong to this host."
523                                  " Aborting." % hostname.ip)
524
525     secondary_ip = getattr(self.op, "secondary_ip", None)
526     if secondary_ip and not utils.IsValidIP(secondary_ip):
527       raise errors.OpPrereqError("Invalid secondary ip given")
528     if (secondary_ip and
529         secondary_ip != hostname.ip and
530         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
531                            constants.DEFAULT_NODED_PORT))):
532       raise errors.OpPrereqError("You gave %s as secondary IP,"
533                                  " but it does not belong to this host." %
534                                  secondary_ip)
535     self.secondary_ip = secondary_ip
536
537     # checks presence of the volume group given
538     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
539
540     if vgstatus:
541       raise errors.OpPrereqError("Error: %s" % vgstatus)
542
543     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544                     self.op.mac_prefix):
545       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
546                                  self.op.mac_prefix)
547
548     if self.op.hypervisor_type not in constants.HYPER_TYPES:
549       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
550                                  self.op.hypervisor_type)
551
552     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553     if result.failed:
554       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
555                                  (self.op.master_netdev,
556                                   result.output.strip()))
557
558     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
559             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
560       raise errors.OpPrereqError("Init.d script '%s' missing or not"
561                                  " executable." % constants.NODE_INITD_SCRIPT)
562
563   def Exec(self, feedback_fn):
564     """Initialize the cluster.
565
566     """
567     clustername = self.clustername
568     hostname = self.hostname
569
570     # set up the simple store
571     self.sstore = ss = ssconf.SimpleStore()
572     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
573     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
574     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
575     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
576     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577
578     # set up the inter-node password and certificate
579     _InitGanetiServerSetup(ss)
580
581     # start the master ip
582     rpc.call_node_start_master(hostname.name)
583
584     # set up ssh config and /etc/hosts
585     f = open(constants.SSH_HOST_RSA_PUB, 'r')
586     try:
587       sshline = f.read()
588     finally:
589       f.close()
590     sshkey = sshline.split(" ")[1]
591
592     _AddHostToEtcHosts(hostname.name)
593
594     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595
596     _InitSSHSetup(hostname.name)
597
598     # init of cluster config file
599     self.cfg = cfgw = config.ConfigWriter()
600     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
601                     sshkey, self.op.mac_prefix,
602                     self.op.vg_name, self.op.def_bridge)
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signalled by raising errors.OpPrereqError.
617
618     """
619     master = self.sstore.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.sstore.GetMasterNode()
635     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
636     utils.CreateBackup(priv_key)
637     utils.CreateBackup(pub_key)
638     rpc.call_node_leave_cluster(master)
639
640
641 class LUVerifyCluster(NoHooksLU):
642   """Verifies the cluster status.
643
644   """
645   _OP_REQP = []
646
647   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
648                   remote_version, feedback_fn):
649     """Run multiple tests against a node.
650
651     Test list:
652       - compares ganeti version
653       - checks vg existance and size > 20G
654       - checks config file checksum
655       - checks ssh to other nodes
656
657     Args:
658       node: name of the node to check
659       file_list: required list of files
660       local_cksum: dictionary of local files and their checksums
661
662     """
663     # compares ganeti version
664     local_version = constants.PROTOCOL_VERSION
665     if not remote_version:
666       feedback_fn(" - ERROR: connection to %s failed" % (node))
667       return True
668
669     if local_version != remote_version:
670       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
671                       (local_version, node, remote_version))
672       return True
673
674     # checks vg existance and size > 20G
675
676     bad = False
677     if not vglist:
678       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
679                       (node,))
680       bad = True
681     else:
682       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683       if vgstatus:
684         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
685         bad = True
686
687     # checks config file checksum
688     # checks ssh to any
689
690     if 'filelist' not in node_result:
691       bad = True
692       feedback_fn("  - ERROR: node hasn't returned file checksum data")
693     else:
694       remote_cksum = node_result['filelist']
695       for file_name in file_list:
696         if file_name not in remote_cksum:
697           bad = True
698           feedback_fn("  - ERROR: file '%s' missing" % file_name)
699         elif remote_cksum[file_name] != local_cksum[file_name]:
700           bad = True
701           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
702
703     if 'nodelist' not in node_result:
704       bad = True
705       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
706     else:
707       if node_result['nodelist']:
708         bad = True
709         for node in node_result['nodelist']:
710           feedback_fn("  - ERROR: communication with node '%s': %s" %
711                           (node, node_result['nodelist'][node]))
712     hyp_result = node_result.get('hypervisor', None)
713     if hyp_result is not None:
714       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
715     return bad
716
717   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
718     """Verify an instance.
719
720     This function checks to see if the required block devices are
721     available on the instance's node.
722
723     """
724     bad = False
725
726     instancelist = self.cfg.GetInstanceList()
727     if not instance in instancelist:
728       feedback_fn("  - ERROR: instance %s not in instance list %s" %
729                       (instance, instancelist))
730       bad = True
731
732     instanceconfig = self.cfg.GetInstanceInfo(instance)
733     node_current = instanceconfig.primary_node
734
735     node_vol_should = {}
736     instanceconfig.MapLVsByNode(node_vol_should)
737
738     for node in node_vol_should:
739       for volume in node_vol_should[node]:
740         if node not in node_vol_is or volume not in node_vol_is[node]:
741           feedback_fn("  - ERROR: volume %s missing on node %s" %
742                           (volume, node))
743           bad = True
744
745     if not instanceconfig.status == 'down':
746       if not instance in node_instance[node_current]:
747         feedback_fn("  - ERROR: instance %s not running on node %s" %
748                         (instance, node_current))
749         bad = True
750
751     for node in node_instance:
752       if (not node == node_current):
753         if instance in node_instance[node]:
754           feedback_fn("  - ERROR: instance %s should not run on node %s" %
755                           (instance, node))
756           bad = True
757
758     return bad
759
760   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
761     """Verify if there are any unknown volumes in the cluster.
762
763     The .os, .swap and backup volumes are ignored. All other volumes are
764     reported as unknown.
765
766     """
767     bad = False
768
769     for node in node_vol_is:
770       for volume in node_vol_is[node]:
771         if node not in node_vol_should or volume not in node_vol_should[node]:
772           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
773                       (volume, node))
774           bad = True
775     return bad
776
777   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
778     """Verify the list of running instances.
779
780     This checks what instances are running but unknown to the cluster.
781
782     """
783     bad = False
784     for node in node_instance:
785       for runninginstance in node_instance[node]:
786         if runninginstance not in instancelist:
787           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
788                           (runninginstance, node))
789           bad = True
790     return bad
791
792   def CheckPrereq(self):
793     """Check prerequisites.
794
795     This has no prerequisites.
796
797     """
798     pass
799
800   def Exec(self, feedback_fn):
801     """Verify integrity of cluster, performing various test on nodes.
802
803     """
804     bad = False
805     feedback_fn("* Verifying global settings")
806     for msg in self.cfg.VerifyConfig():
807       feedback_fn("  - ERROR: %s" % msg)
808
809     vg_name = self.cfg.GetVGName()
810     nodelist = utils.NiceSort(self.cfg.GetNodeList())
811     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
812     node_volume = {}
813     node_instance = {}
814
815     # FIXME: verify OS list
816     # do local checksums
817     file_names = list(self.sstore.GetFileList())
818     file_names.append(constants.SSL_CERT_FILE)
819     file_names.append(constants.CLUSTER_CONF_FILE)
820     local_checksums = utils.FingerprintFiles(file_names)
821
822     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
823     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
824     all_instanceinfo = rpc.call_instance_list(nodelist)
825     all_vglist = rpc.call_vg_list(nodelist)
826     node_verify_param = {
827       'filelist': file_names,
828       'nodelist': nodelist,
829       'hypervisor': None,
830       }
831     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
832     all_rversion = rpc.call_version(nodelist)
833
834     for node in nodelist:
835       feedback_fn("* Verifying node %s" % node)
836       result = self._VerifyNode(node, file_names, local_checksums,
837                                 all_vglist[node], all_nvinfo[node],
838                                 all_rversion[node], feedback_fn)
839       bad = bad or result
840
841       # node_volume
842       volumeinfo = all_volumeinfo[node]
843
844       if isinstance(volumeinfo, basestring):
845         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
846                     (node, volumeinfo[-400:].encode('string_escape')))
847         bad = True
848         node_volume[node] = {}
849       elif not isinstance(volumeinfo, dict):
850         feedback_fn("  - ERROR: connection to %s failed" % (node,))
851         bad = True
852         continue
853       else:
854         node_volume[node] = volumeinfo
855
856       # node_instance
857       nodeinstance = all_instanceinfo[node]
858       if type(nodeinstance) != list:
859         feedback_fn("  - ERROR: connection to %s failed" % (node,))
860         bad = True
861         continue
862
863       node_instance[node] = nodeinstance
864
865     node_vol_should = {}
866
867     for instance in instancelist:
868       feedback_fn("* Verifying instance %s" % instance)
869       result =  self._VerifyInstance(instance, node_volume, node_instance,
870                                      feedback_fn)
871       bad = bad or result
872
873       inst_config = self.cfg.GetInstanceInfo(instance)
874
875       inst_config.MapLVsByNode(node_vol_should)
876
877     feedback_fn("* Verifying orphan volumes")
878     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
879                                        feedback_fn)
880     bad = bad or result
881
882     feedback_fn("* Verifying remaining instances")
883     result = self._VerifyOrphanInstances(instancelist, node_instance,
884                                          feedback_fn)
885     bad = bad or result
886
887     return int(bad)
888
889
890 class LUVerifyDisks(NoHooksLU):
891   """Verifies the cluster disks status.
892
893   """
894   _OP_REQP = []
895
896   def CheckPrereq(self):
897     """Check prerequisites.
898
899     This has no prerequisites.
900
901     """
902     pass
903
904   def Exec(self, feedback_fn):
905     """Verify integrity of cluster disks.
906
907     """
908     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
909
910     vg_name = self.cfg.GetVGName()
911     nodes = utils.NiceSort(self.cfg.GetNodeList())
912     instances = [self.cfg.GetInstanceInfo(name)
913                  for name in self.cfg.GetInstanceList()]
914
915     nv_dict = {}
916     for inst in instances:
917       inst_lvs = {}
918       if (inst.status != "up" or
919           inst.disk_template not in constants.DTS_NET_MIRROR):
920         continue
921       inst.MapLVsByNode(inst_lvs)
922       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
923       for node, vol_list in inst_lvs.iteritems():
924         for vol in vol_list:
925           nv_dict[(node, vol)] = inst
926
927     if not nv_dict:
928       return result
929
930     node_lvs = rpc.call_volume_list(nodes, vg_name)
931
932     to_act = set()
933     for node in nodes:
934       # node_volume
935       lvs = node_lvs[node]
936
937       if isinstance(lvs, basestring):
938         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939         res_nlvm[node] = lvs
940       elif not isinstance(lvs, dict):
941         logger.Info("connection to node %s failed or invalid data returned" %
942                     (node,))
943         res_nodes.append(node)
944         continue
945
946       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
947         inst = nv_dict.pop((node, lv_name), None)
948         if (not lv_online and inst is not None
949             and inst.name not in res_instances):
950           res_instances.append(inst.name)
951
952     # any leftover items in nv_dict are missing LVs, let's arrange the
953     # data better
954     for key, inst in nv_dict.iteritems():
955       if inst.name not in res_missing:
956         res_missing[inst.name] = []
957       res_missing[inst.name].append(key)
958
959     return result
960
961
962 class LURenameCluster(LogicalUnit):
963   """Rename the cluster.
964
965   """
966   HPATH = "cluster-rename"
967   HTYPE = constants.HTYPE_CLUSTER
968   _OP_REQP = ["name"]
969
970   def BuildHooksEnv(self):
971     """Build hooks env.
972
973     """
974     env = {
975       "OP_TARGET": self.op.sstore.GetClusterName(),
976       "NEW_NAME": self.op.name,
977       }
978     mn = self.sstore.GetMasterNode()
979     return env, [mn], [mn]
980
981   def CheckPrereq(self):
982     """Verify that the passed name is a valid one.
983
984     """
985     hostname = utils.HostInfo(self.op.name)
986
987     new_name = hostname.name
988     self.ip = new_ip = hostname.ip
989     old_name = self.sstore.GetClusterName()
990     old_ip = self.sstore.GetMasterIP()
991     if new_name == old_name and new_ip == old_ip:
992       raise errors.OpPrereqError("Neither the name nor the IP address of the"
993                                  " cluster has changed")
994     if new_ip != old_ip:
995       result = utils.RunCmd(["fping", "-q", new_ip])
996       if not result.failed:
997         raise errors.OpPrereqError("The given cluster IP address (%s) is"
998                                    " reachable on the network. Aborting." %
999                                    new_ip)
1000
1001     self.op.name = new_name
1002
1003   def Exec(self, feedback_fn):
1004     """Rename the cluster.
1005
1006     """
1007     clustername = self.op.name
1008     ip = self.ip
1009     ss = self.sstore
1010
1011     # shutdown the master IP
1012     master = ss.GetMasterNode()
1013     if not rpc.call_node_stop_master(master):
1014       raise errors.OpExecError("Could not disable the master role")
1015
1016     try:
1017       # modify the sstore
1018       ss.SetKey(ss.SS_MASTER_IP, ip)
1019       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020
1021       # Distribute updated ss config to all nodes
1022       myself = self.cfg.GetNodeInfo(master)
1023       dist_nodes = self.cfg.GetNodeList()
1024       if myself.name in dist_nodes:
1025         dist_nodes.remove(myself.name)
1026
1027       logger.Debug("Copying updated ssconf data to all nodes")
1028       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1029         fname = ss.KeyToFilename(keyname)
1030         result = rpc.call_upload_file(dist_nodes, fname)
1031         for to_node in dist_nodes:
1032           if not result[to_node]:
1033             logger.Error("copy of file %s to node %s failed" %
1034                          (fname, to_node))
1035     finally:
1036       if not rpc.call_node_start_master(master):
1037         logger.Error("Could not re-enable the master role on the master,"
1038                      " please restart manually.")
1039
1040
1041 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1042   """Sleep and poll for an instance's disk to sync.
1043
1044   """
1045   if not instance.disks:
1046     return True
1047
1048   if not oneshot:
1049     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050
1051   node = instance.primary_node
1052
1053   for dev in instance.disks:
1054     cfgw.SetDiskID(dev, node)
1055
1056   retries = 0
1057   while True:
1058     max_time = 0
1059     done = True
1060     cumul_degraded = False
1061     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062     if not rstats:
1063       proc.LogWarning("Can't get any data from node %s" % node)
1064       retries += 1
1065       if retries >= 10:
1066         raise errors.RemoteError("Can't contact node %s for mirror data,"
1067                                  " aborting." % node)
1068       time.sleep(6)
1069       continue
1070     retries = 0
1071     for i in range(len(rstats)):
1072       mstat = rstats[i]
1073       if mstat is None:
1074         proc.LogWarning("Can't compute data for node %s/%s" %
1075                         (node, instance.disks[i].iv_name))
1076         continue
1077       # we ignore the ldisk parameter
1078       perc_done, est_time, is_degraded, _ = mstat
1079       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1080       if perc_done is not None:
1081         done = False
1082         if est_time is not None:
1083           rem_time = "%d estimated seconds remaining" % est_time
1084           max_time = est_time
1085         else:
1086           rem_time = "no time estimate"
1087         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1088                      (instance.disks[i].iv_name, perc_done, rem_time))
1089     if done or oneshot:
1090       break
1091
1092     if unlock:
1093       utils.Unlock('cmd')
1094     try:
1095       time.sleep(min(60, max_time))
1096     finally:
1097       if unlock:
1098         utils.Lock('cmd')
1099
1100   if done:
1101     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1102   return not cumul_degraded
1103
1104
1105 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1106   """Check that mirrors are not degraded.
1107
1108   The ldisk parameter, if True, will change the test from the
1109   is_degraded attribute (which represents overall non-ok status for
1110   the device(s)) to the ldisk (representing the local storage status).
1111
1112   """
1113   cfgw.SetDiskID(dev, node)
1114   if ldisk:
1115     idx = 6
1116   else:
1117     idx = 5
1118
1119   result = True
1120   if on_primary or dev.AssembleOnSecondary():
1121     rstats = rpc.call_blockdev_find(node, dev)
1122     if not rstats:
1123       logger.ToStderr("Can't get any data from node %s" % node)
1124       result = False
1125     else:
1126       result = result and (not rstats[idx])
1127   if dev.children:
1128     for child in dev.children:
1129       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1130
1131   return result
1132
1133
1134 class LUDiagnoseOS(NoHooksLU):
1135   """Logical unit for OS diagnose/query.
1136
1137   """
1138   _OP_REQP = []
1139
1140   def CheckPrereq(self):
1141     """Check prerequisites.
1142
1143     This always succeeds, since this is a pure query LU.
1144
1145     """
1146     return
1147
1148   def Exec(self, feedback_fn):
1149     """Compute the list of OSes.
1150
1151     """
1152     node_list = self.cfg.GetNodeList()
1153     node_data = rpc.call_os_diagnose(node_list)
1154     if node_data == False:
1155       raise errors.OpExecError("Can't gather the list of OSes")
1156     return node_data
1157
1158
1159 class LURemoveNode(LogicalUnit):
1160   """Logical unit for removing a node.
1161
1162   """
1163   HPATH = "node-remove"
1164   HTYPE = constants.HTYPE_NODE
1165   _OP_REQP = ["node_name"]
1166
1167   def BuildHooksEnv(self):
1168     """Build hooks env.
1169
1170     This doesn't run on the target node in the pre phase as a failed
1171     node would not allows itself to run.
1172
1173     """
1174     env = {
1175       "OP_TARGET": self.op.node_name,
1176       "NODE_NAME": self.op.node_name,
1177       }
1178     all_nodes = self.cfg.GetNodeList()
1179     all_nodes.remove(self.op.node_name)
1180     return env, all_nodes, all_nodes
1181
1182   def CheckPrereq(self):
1183     """Check prerequisites.
1184
1185     This checks:
1186      - the node exists in the configuration
1187      - it does not have primary or secondary instances
1188      - it's not the master
1189
1190     Any errors are signalled by raising errors.OpPrereqError.
1191
1192     """
1193     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1194     if node is None:
1195       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1196
1197     instance_list = self.cfg.GetInstanceList()
1198
1199     masternode = self.sstore.GetMasterNode()
1200     if node.name == masternode:
1201       raise errors.OpPrereqError("Node is the master node,"
1202                                  " you need to failover first.")
1203
1204     for instance_name in instance_list:
1205       instance = self.cfg.GetInstanceInfo(instance_name)
1206       if node.name == instance.primary_node:
1207         raise errors.OpPrereqError("Instance %s still running on the node,"
1208                                    " please remove first." % instance_name)
1209       if node.name in instance.secondary_nodes:
1210         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1211                                    " please remove first." % instance_name)
1212     self.op.node_name = node.name
1213     self.node = node
1214
1215   def Exec(self, feedback_fn):
1216     """Removes the node from the cluster.
1217
1218     """
1219     node = self.node
1220     logger.Info("stopping the node daemon and removing configs from node %s" %
1221                 node.name)
1222
1223     rpc.call_node_leave_cluster(node.name)
1224
1225     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226
1227     logger.Info("Removing node %s from config" % node.name)
1228
1229     self.cfg.RemoveNode(node.name)
1230
1231     _RemoveHostFromEtcHosts(node.name)
1232
1233
1234 class LUQueryNodes(NoHooksLU):
1235   """Logical unit for querying nodes.
1236
1237   """
1238   _OP_REQP = ["output_fields", "names"]
1239
1240   def CheckPrereq(self):
1241     """Check prerequisites.
1242
1243     This checks that the fields required are valid output fields.
1244
1245     """
1246     self.dynamic_fields = frozenset(["dtotal", "dfree",
1247                                      "mtotal", "mnode", "mfree",
1248                                      "bootid"])
1249
1250     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1251                                "pinst_list", "sinst_list",
1252                                "pip", "sip"],
1253                        dynamic=self.dynamic_fields,
1254                        selected=self.op.output_fields)
1255
1256     self.wanted = _GetWantedNodes(self, self.op.names)
1257
1258   def Exec(self, feedback_fn):
1259     """Computes the list of nodes and their attributes.
1260
1261     """
1262     nodenames = self.wanted
1263     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1264
1265     # begin data gathering
1266
1267     if self.dynamic_fields.intersection(self.op.output_fields):
1268       live_data = {}
1269       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1270       for name in nodenames:
1271         nodeinfo = node_data.get(name, None)
1272         if nodeinfo:
1273           live_data[name] = {
1274             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1275             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1276             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1277             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1278             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1279             "bootid": nodeinfo['bootid'],
1280             }
1281         else:
1282           live_data[name] = {}
1283     else:
1284       live_data = dict.fromkeys(nodenames, {})
1285
1286     node_to_primary = dict([(name, set()) for name in nodenames])
1287     node_to_secondary = dict([(name, set()) for name in nodenames])
1288
1289     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1290                              "sinst_cnt", "sinst_list"))
1291     if inst_fields & frozenset(self.op.output_fields):
1292       instancelist = self.cfg.GetInstanceList()
1293
1294       for instance_name in instancelist:
1295         inst = self.cfg.GetInstanceInfo(instance_name)
1296         if inst.primary_node in node_to_primary:
1297           node_to_primary[inst.primary_node].add(inst.name)
1298         for secnode in inst.secondary_nodes:
1299           if secnode in node_to_secondary:
1300             node_to_secondary[secnode].add(inst.name)
1301
1302     # end data gathering
1303
1304     output = []
1305     for node in nodelist:
1306       node_output = []
1307       for field in self.op.output_fields:
1308         if field == "name":
1309           val = node.name
1310         elif field == "pinst_list":
1311           val = list(node_to_primary[node.name])
1312         elif field == "sinst_list":
1313           val = list(node_to_secondary[node.name])
1314         elif field == "pinst_cnt":
1315           val = len(node_to_primary[node.name])
1316         elif field == "sinst_cnt":
1317           val = len(node_to_secondary[node.name])
1318         elif field == "pip":
1319           val = node.primary_ip
1320         elif field == "sip":
1321           val = node.secondary_ip
1322         elif field in self.dynamic_fields:
1323           val = live_data[node.name].get(field, None)
1324         else:
1325           raise errors.ParameterError(field)
1326         node_output.append(val)
1327       output.append(node_output)
1328
1329     return output
1330
1331
1332 class LUQueryNodeVolumes(NoHooksLU):
1333   """Logical unit for getting volumes on node(s).
1334
1335   """
1336   _OP_REQP = ["nodes", "output_fields"]
1337
1338   def CheckPrereq(self):
1339     """Check prerequisites.
1340
1341     This checks that the fields required are valid output fields.
1342
1343     """
1344     self.nodes = _GetWantedNodes(self, self.op.nodes)
1345
1346     _CheckOutputFields(static=["node"],
1347                        dynamic=["phys", "vg", "name", "size", "instance"],
1348                        selected=self.op.output_fields)
1349
1350
1351   def Exec(self, feedback_fn):
1352     """Computes the list of nodes and their attributes.
1353
1354     """
1355     nodenames = self.nodes
1356     volumes = rpc.call_node_volumes(nodenames)
1357
1358     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1359              in self.cfg.GetInstanceList()]
1360
1361     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1362
1363     output = []
1364     for node in nodenames:
1365       if node not in volumes or not volumes[node]:
1366         continue
1367
1368       node_vols = volumes[node][:]
1369       node_vols.sort(key=lambda vol: vol['dev'])
1370
1371       for vol in node_vols:
1372         node_output = []
1373         for field in self.op.output_fields:
1374           if field == "node":
1375             val = node
1376           elif field == "phys":
1377             val = vol['dev']
1378           elif field == "vg":
1379             val = vol['vg']
1380           elif field == "name":
1381             val = vol['name']
1382           elif field == "size":
1383             val = int(float(vol['size']))
1384           elif field == "instance":
1385             for inst in ilist:
1386               if node not in lv_by_node[inst]:
1387                 continue
1388               if vol['name'] in lv_by_node[inst][node]:
1389                 val = inst.name
1390                 break
1391             else:
1392               val = '-'
1393           else:
1394             raise errors.ParameterError(field)
1395           node_output.append(str(val))
1396
1397         output.append(node_output)
1398
1399     return output
1400
1401
1402 class LUAddNode(LogicalUnit):
1403   """Logical unit for adding node to the cluster.
1404
1405   """
1406   HPATH = "node-add"
1407   HTYPE = constants.HTYPE_NODE
1408   _OP_REQP = ["node_name"]
1409
1410   def BuildHooksEnv(self):
1411     """Build hooks env.
1412
1413     This will run on all nodes before, and on all nodes + the new node after.
1414
1415     """
1416     env = {
1417       "OP_TARGET": self.op.node_name,
1418       "NODE_NAME": self.op.node_name,
1419       "NODE_PIP": self.op.primary_ip,
1420       "NODE_SIP": self.op.secondary_ip,
1421       }
1422     nodes_0 = self.cfg.GetNodeList()
1423     nodes_1 = nodes_0 + [self.op.node_name, ]
1424     return env, nodes_0, nodes_1
1425
1426   def CheckPrereq(self):
1427     """Check prerequisites.
1428
1429     This checks:
1430      - the new node is not already in the config
1431      - it is resolvable
1432      - its parameters (single/dual homed) matches the cluster
1433
1434     Any errors are signalled by raising errors.OpPrereqError.
1435
1436     """
1437     node_name = self.op.node_name
1438     cfg = self.cfg
1439
1440     dns_data = utils.HostInfo(node_name)
1441
1442     node = dns_data.name
1443     primary_ip = self.op.primary_ip = dns_data.ip
1444     secondary_ip = getattr(self.op, "secondary_ip", None)
1445     if secondary_ip is None:
1446       secondary_ip = primary_ip
1447     if not utils.IsValidIP(secondary_ip):
1448       raise errors.OpPrereqError("Invalid secondary IP given")
1449     self.op.secondary_ip = secondary_ip
1450     node_list = cfg.GetNodeList()
1451     if node in node_list:
1452       raise errors.OpPrereqError("Node %s is already in the configuration"
1453                                  % node)
1454
1455     for existing_node_name in node_list:
1456       existing_node = cfg.GetNodeInfo(existing_node_name)
1457       if (existing_node.primary_ip == primary_ip or
1458           existing_node.secondary_ip == primary_ip or
1459           existing_node.primary_ip == secondary_ip or
1460           existing_node.secondary_ip == secondary_ip):
1461         raise errors.OpPrereqError("New node ip address(es) conflict with"
1462                                    " existing node %s" % existing_node.name)
1463
1464     # check that the type of the node (single versus dual homed) is the
1465     # same as for the master
1466     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1467     master_singlehomed = myself.secondary_ip == myself.primary_ip
1468     newbie_singlehomed = secondary_ip == primary_ip
1469     if master_singlehomed != newbie_singlehomed:
1470       if master_singlehomed:
1471         raise errors.OpPrereqError("The master has no private ip but the"
1472                                    " new node has one")
1473       else:
1474         raise errors.OpPrereqError("The master has a private ip but the"
1475                                    " new node doesn't have one")
1476
1477     # checks reachablity
1478     if not utils.TcpPing(utils.HostInfo().name,
1479                          primary_ip,
1480                          constants.DEFAULT_NODED_PORT):
1481       raise errors.OpPrereqError("Node not reachable by ping")
1482
1483     if not newbie_singlehomed:
1484       # check reachability from my secondary ip to newbie's secondary ip
1485       if not utils.TcpPing(myself.secondary_ip,
1486                            secondary_ip,
1487                            constants.DEFAULT_NODED_PORT):
1488         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1489                                    " based ping to noded port")
1490
1491     self.new_node = objects.Node(name=node,
1492                                  primary_ip=primary_ip,
1493                                  secondary_ip=secondary_ip)
1494
1495     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1496       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1497         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1498                                    constants.VNC_PASSWORD_FILE)
1499
1500   def Exec(self, feedback_fn):
1501     """Adds the new node to the cluster.
1502
1503     """
1504     new_node = self.new_node
1505     node = new_node.name
1506
1507     # set up inter-node password and certificate and restarts the node daemon
1508     gntpass = self.sstore.GetNodeDaemonPassword()
1509     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1510       raise errors.OpExecError("ganeti password corruption detected")
1511     f = open(constants.SSL_CERT_FILE)
1512     try:
1513       gntpem = f.read(8192)
1514     finally:
1515       f.close()
1516     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1517     # so we use this to detect an invalid certificate; as long as the
1518     # cert doesn't contain this, the here-document will be correctly
1519     # parsed by the shell sequence below
1520     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1521       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1522     if not gntpem.endswith("\n"):
1523       raise errors.OpExecError("PEM must end with newline")
1524     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1525
1526     # and then connect with ssh to set password and start ganeti-noded
1527     # note that all the below variables are sanitized at this point,
1528     # either by being constants or by the checks above
1529     ss = self.sstore
1530     mycommand = ("umask 077 && "
1531                  "echo '%s' > '%s' && "
1532                  "cat > '%s' << '!EOF.' && \n"
1533                  "%s!EOF.\n%s restart" %
1534                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1535                   constants.SSL_CERT_FILE, gntpem,
1536                   constants.NODE_INITD_SCRIPT))
1537
1538     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1539     if result.failed:
1540       raise errors.OpExecError("Remote command on node %s, error: %s,"
1541                                " output: %s" %
1542                                (node, result.fail_reason, result.output))
1543
1544     # check connectivity
1545     time.sleep(4)
1546
1547     result = rpc.call_version([node])[node]
1548     if result:
1549       if constants.PROTOCOL_VERSION == result:
1550         logger.Info("communication to node %s fine, sw version %s match" %
1551                     (node, result))
1552       else:
1553         raise errors.OpExecError("Version mismatch master version %s,"
1554                                  " node version %s" %
1555                                  (constants.PROTOCOL_VERSION, result))
1556     else:
1557       raise errors.OpExecError("Cannot get version from the new node")
1558
1559     # setup ssh on node
1560     logger.Info("copy ssh key to node %s" % node)
1561     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1562     keyarray = []
1563     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1564                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1565                 priv_key, pub_key]
1566
1567     for i in keyfiles:
1568       f = open(i, 'r')
1569       try:
1570         keyarray.append(f.read())
1571       finally:
1572         f.close()
1573
1574     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1575                                keyarray[3], keyarray[4], keyarray[5])
1576
1577     if not result:
1578       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1579
1580     # Add node to our /etc/hosts, and add key to known_hosts
1581     _AddHostToEtcHosts(new_node.name)
1582
1583     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1584                       self.cfg.GetHostKey())
1585
1586     if new_node.secondary_ip != new_node.primary_ip:
1587       if not rpc.call_node_tcp_ping(new_node.name,
1588                                     constants.LOCALHOST_IP_ADDRESS,
1589                                     new_node.secondary_ip,
1590                                     constants.DEFAULT_NODED_PORT,
1591                                     10, False):
1592         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1593                                  " you gave (%s). Please fix and re-run this"
1594                                  " command." % new_node.secondary_ip)
1595
1596     success, msg = ssh.VerifyNodeHostname(node)
1597     if not success:
1598       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1599                                " than the one the resolver gives: %s."
1600                                " Please fix and re-run this command." %
1601                                (node, msg))
1602
1603     # Distribute updated /etc/hosts and known_hosts to all nodes,
1604     # including the node just added
1605     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1606     dist_nodes = self.cfg.GetNodeList() + [node]
1607     if myself.name in dist_nodes:
1608       dist_nodes.remove(myself.name)
1609
1610     logger.Debug("Copying hosts and known_hosts to all nodes")
1611     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1612       result = rpc.call_upload_file(dist_nodes, fname)
1613       for to_node in dist_nodes:
1614         if not result[to_node]:
1615           logger.Error("copy of file %s to node %s failed" %
1616                        (fname, to_node))
1617
1618     to_copy = ss.GetFileList()
1619     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1620       to_copy.append(constants.VNC_PASSWORD_FILE)
1621     for fname in to_copy:
1622       if not ssh.CopyFileToNode(node, fname):
1623         logger.Error("could not copy file %s to node %s" % (fname, node))
1624
1625     logger.Info("adding node %s to cluster.conf" % node)
1626     self.cfg.AddNode(new_node)
1627
1628
1629 class LUMasterFailover(LogicalUnit):
1630   """Failover the master node to the current node.
1631
1632   This is a special LU in that it must run on a non-master node.
1633
1634   """
1635   HPATH = "master-failover"
1636   HTYPE = constants.HTYPE_CLUSTER
1637   REQ_MASTER = False
1638   _OP_REQP = []
1639
1640   def BuildHooksEnv(self):
1641     """Build hooks env.
1642
1643     This will run on the new master only in the pre phase, and on all
1644     the nodes in the post phase.
1645
1646     """
1647     env = {
1648       "OP_TARGET": self.new_master,
1649       "NEW_MASTER": self.new_master,
1650       "OLD_MASTER": self.old_master,
1651       }
1652     return env, [self.new_master], self.cfg.GetNodeList()
1653
1654   def CheckPrereq(self):
1655     """Check prerequisites.
1656
1657     This checks that we are not already the master.
1658
1659     """
1660     self.new_master = utils.HostInfo().name
1661     self.old_master = self.sstore.GetMasterNode()
1662
1663     if self.old_master == self.new_master:
1664       raise errors.OpPrereqError("This commands must be run on the node"
1665                                  " where you want the new master to be."
1666                                  " %s is already the master" %
1667                                  self.old_master)
1668
1669   def Exec(self, feedback_fn):
1670     """Failover the master node.
1671
1672     This command, when run on a non-master node, will cause the current
1673     master to cease being master, and the non-master to become new
1674     master.
1675
1676     """
1677     #TODO: do not rely on gethostname returning the FQDN
1678     logger.Info("setting master to %s, old master: %s" %
1679                 (self.new_master, self.old_master))
1680
1681     if not rpc.call_node_stop_master(self.old_master):
1682       logger.Error("could disable the master role on the old master"
1683                    " %s, please disable manually" % self.old_master)
1684
1685     ss = self.sstore
1686     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1687     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1688                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1689       logger.Error("could not distribute the new simple store master file"
1690                    " to the other nodes, please check.")
1691
1692     if not rpc.call_node_start_master(self.new_master):
1693       logger.Error("could not start the master role on the new master"
1694                    " %s, please check" % self.new_master)
1695       feedback_fn("Error in activating the master IP on the new master,"
1696                   " please fix manually.")
1697
1698
1699
1700 class LUQueryClusterInfo(NoHooksLU):
1701   """Query cluster configuration.
1702
1703   """
1704   _OP_REQP = []
1705   REQ_MASTER = False
1706
1707   def CheckPrereq(self):
1708     """No prerequsites needed for this LU.
1709
1710     """
1711     pass
1712
1713   def Exec(self, feedback_fn):
1714     """Return cluster config.
1715
1716     """
1717     result = {
1718       "name": self.sstore.GetClusterName(),
1719       "software_version": constants.RELEASE_VERSION,
1720       "protocol_version": constants.PROTOCOL_VERSION,
1721       "config_version": constants.CONFIG_VERSION,
1722       "os_api_version": constants.OS_API_VERSION,
1723       "export_version": constants.EXPORT_VERSION,
1724       "master": self.sstore.GetMasterNode(),
1725       "architecture": (platform.architecture()[0], platform.machine()),
1726       }
1727
1728     return result
1729
1730
1731 class LUClusterCopyFile(NoHooksLU):
1732   """Copy file to cluster.
1733
1734   """
1735   _OP_REQP = ["nodes", "filename"]
1736
1737   def CheckPrereq(self):
1738     """Check prerequisites.
1739
1740     It should check that the named file exists and that the given list
1741     of nodes is valid.
1742
1743     """
1744     if not os.path.exists(self.op.filename):
1745       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746
1747     self.nodes = _GetWantedNodes(self, self.op.nodes)
1748
1749   def Exec(self, feedback_fn):
1750     """Copy a file from master to some nodes.
1751
1752     Args:
1753       opts - class with options as members
1754       args - list containing a single element, the file name
1755     Opts used:
1756       nodes - list containing the name of target nodes; if empty, all nodes
1757
1758     """
1759     filename = self.op.filename
1760
1761     myname = utils.HostInfo().name
1762
1763     for node in self.nodes:
1764       if node == myname:
1765         continue
1766       if not ssh.CopyFileToNode(node, filename):
1767         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1768
1769
1770 class LUDumpClusterConfig(NoHooksLU):
1771   """Return a text-representation of the cluster-config.
1772
1773   """
1774   _OP_REQP = []
1775
1776   def CheckPrereq(self):
1777     """No prerequisites.
1778
1779     """
1780     pass
1781
1782   def Exec(self, feedback_fn):
1783     """Dump a representation of the cluster config to the standard output.
1784
1785     """
1786     return self.cfg.DumpConfig()
1787
1788
1789 class LURunClusterCommand(NoHooksLU):
1790   """Run a command on some nodes.
1791
1792   """
1793   _OP_REQP = ["command", "nodes"]
1794
1795   def CheckPrereq(self):
1796     """Check prerequisites.
1797
1798     It checks that the given list of nodes is valid.
1799
1800     """
1801     self.nodes = _GetWantedNodes(self, self.op.nodes)
1802
1803   def Exec(self, feedback_fn):
1804     """Run a command on some nodes.
1805
1806     """
1807     data = []
1808     for node in self.nodes:
1809       result = ssh.SSHCall(node, "root", self.op.command)
1810       data.append((node, result.output, result.exit_code))
1811
1812     return data
1813
1814
1815 class LUActivateInstanceDisks(NoHooksLU):
1816   """Bring up an instance's disks.
1817
1818   """
1819   _OP_REQP = ["instance_name"]
1820
1821   def CheckPrereq(self):
1822     """Check prerequisites.
1823
1824     This checks that the instance is in the cluster.
1825
1826     """
1827     instance = self.cfg.GetInstanceInfo(
1828       self.cfg.ExpandInstanceName(self.op.instance_name))
1829     if instance is None:
1830       raise errors.OpPrereqError("Instance '%s' not known" %
1831                                  self.op.instance_name)
1832     self.instance = instance
1833
1834
1835   def Exec(self, feedback_fn):
1836     """Activate the disks.
1837
1838     """
1839     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840     if not disks_ok:
1841       raise errors.OpExecError("Cannot activate block devices")
1842
1843     return disks_info
1844
1845
1846 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1847   """Prepare the block devices for an instance.
1848
1849   This sets up the block devices on all nodes.
1850
1851   Args:
1852     instance: a ganeti.objects.Instance object
1853     ignore_secondaries: if true, errors on secondary nodes won't result
1854                         in an error return from the function
1855
1856   Returns:
1857     false if the operation failed
1858     list of (host, instance_visible_name, node_visible_name) if the operation
1859          suceeded with the mapping from node devices to instance devices
1860   """
1861   device_info = []
1862   disks_ok = True
1863   for inst_disk in instance.disks:
1864     master_result = None
1865     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1866       cfg.SetDiskID(node_disk, node)
1867       is_primary = node == instance.primary_node
1868       result = rpc.call_blockdev_assemble(node, node_disk,
1869                                           instance.name, is_primary)
1870       if not result:
1871         logger.Error("could not prepare block device %s on node %s"
1872                      " (is_primary=%s)" %
1873                      (inst_disk.iv_name, node, is_primary))
1874         if is_primary or not ignore_secondaries:
1875           disks_ok = False
1876       if is_primary:
1877         master_result = result
1878     device_info.append((instance.primary_node, inst_disk.iv_name,
1879                         master_result))
1880
1881   # leave the disks configured for the primary node
1882   # this is a workaround that would be fixed better by
1883   # improving the logical/physical id handling
1884   for disk in instance.disks:
1885     cfg.SetDiskID(disk, instance.primary_node)
1886
1887   return disks_ok, device_info
1888
1889
1890 def _StartInstanceDisks(cfg, instance, force):
1891   """Start the disks of an instance.
1892
1893   """
1894   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1895                                            ignore_secondaries=force)
1896   if not disks_ok:
1897     _ShutdownInstanceDisks(instance, cfg)
1898     if force is not None and not force:
1899       logger.Error("If the message above refers to a secondary node,"
1900                    " you can retry the operation using '--force'.")
1901     raise errors.OpExecError("Disk consistency error")
1902
1903
1904 class LUDeactivateInstanceDisks(NoHooksLU):
1905   """Shutdown an instance's disks.
1906
1907   """
1908   _OP_REQP = ["instance_name"]
1909
1910   def CheckPrereq(self):
1911     """Check prerequisites.
1912
1913     This checks that the instance is in the cluster.
1914
1915     """
1916     instance = self.cfg.GetInstanceInfo(
1917       self.cfg.ExpandInstanceName(self.op.instance_name))
1918     if instance is None:
1919       raise errors.OpPrereqError("Instance '%s' not known" %
1920                                  self.op.instance_name)
1921     self.instance = instance
1922
1923   def Exec(self, feedback_fn):
1924     """Deactivate the disks
1925
1926     """
1927     instance = self.instance
1928     ins_l = rpc.call_instance_list([instance.primary_node])
1929     ins_l = ins_l[instance.primary_node]
1930     if not type(ins_l) is list:
1931       raise errors.OpExecError("Can't contact node '%s'" %
1932                                instance.primary_node)
1933
1934     if self.instance.name in ins_l:
1935       raise errors.OpExecError("Instance is running, can't shutdown"
1936                                " block devices.")
1937
1938     _ShutdownInstanceDisks(instance, self.cfg)
1939
1940
1941 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1942   """Shutdown block devices of an instance.
1943
1944   This does the shutdown on all nodes of the instance.
1945
1946   If the ignore_primary is false, errors on the primary node are
1947   ignored.
1948
1949   """
1950   result = True
1951   for disk in instance.disks:
1952     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1953       cfg.SetDiskID(top_disk, node)
1954       if not rpc.call_blockdev_shutdown(node, top_disk):
1955         logger.Error("could not shutdown block device %s on node %s" %
1956                      (disk.iv_name, node))
1957         if not ignore_primary or node != instance.primary_node:
1958           result = False
1959   return result
1960
1961
1962 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1963   """Checks if a node has enough free memory.
1964
1965   This function check if a given node has the needed amount of free
1966   memory. In case the node has less memory or we cannot get the
1967   information from the node, this function raise an OpPrereqError
1968   exception.
1969
1970   Args:
1971     - cfg: a ConfigWriter instance
1972     - node: the node name
1973     - reason: string to use in the error message
1974     - requested: the amount of memory in MiB
1975
1976   """
1977   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1978   if not nodeinfo or not isinstance(nodeinfo, dict):
1979     raise errors.OpPrereqError("Could not contact node %s for resource"
1980                              " information" % (node,))
1981
1982   free_mem = nodeinfo[node].get('memory_free')
1983   if not isinstance(free_mem, int):
1984     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1985                              " was '%s'" % (node, free_mem))
1986   if requested > free_mem:
1987     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1988                              " needed %s MiB, available %s MiB" %
1989                              (node, reason, requested, free_mem))
1990
1991
1992 class LUStartupInstance(LogicalUnit):
1993   """Starts an instance.
1994
1995   """
1996   HPATH = "instance-start"
1997   HTYPE = constants.HTYPE_INSTANCE
1998   _OP_REQP = ["instance_name", "force"]
1999
2000   def BuildHooksEnv(self):
2001     """Build hooks env.
2002
2003     This runs on master, primary and secondary nodes of the instance.
2004
2005     """
2006     env = {
2007       "FORCE": self.op.force,
2008       }
2009     env.update(_BuildInstanceHookEnvByObject(self.instance))
2010     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2011           list(self.instance.secondary_nodes))
2012     return env, nl, nl
2013
2014   def CheckPrereq(self):
2015     """Check prerequisites.
2016
2017     This checks that the instance is in the cluster.
2018
2019     """
2020     instance = self.cfg.GetInstanceInfo(
2021       self.cfg.ExpandInstanceName(self.op.instance_name))
2022     if instance is None:
2023       raise errors.OpPrereqError("Instance '%s' not known" %
2024                                  self.op.instance_name)
2025
2026     # check bridges existance
2027     _CheckInstanceBridgesExist(instance)
2028
2029     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2030                          "starting instance %s" % instance.name,
2031                          instance.memory)
2032
2033     self.instance = instance
2034     self.op.instance_name = instance.name
2035
2036   def Exec(self, feedback_fn):
2037     """Start the instance.
2038
2039     """
2040     instance = self.instance
2041     force = self.op.force
2042     extra_args = getattr(self.op, "extra_args", "")
2043
2044     node_current = instance.primary_node
2045
2046     _StartInstanceDisks(self.cfg, instance, force)
2047
2048     if not rpc.call_instance_start(node_current, instance, extra_args):
2049       _ShutdownInstanceDisks(instance, self.cfg)
2050       raise errors.OpExecError("Could not start instance")
2051
2052     self.cfg.MarkInstanceUp(instance.name)
2053
2054
2055 class LURebootInstance(LogicalUnit):
2056   """Reboot an instance.
2057
2058   """
2059   HPATH = "instance-reboot"
2060   HTYPE = constants.HTYPE_INSTANCE
2061   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2062
2063   def BuildHooksEnv(self):
2064     """Build hooks env.
2065
2066     This runs on master, primary and secondary nodes of the instance.
2067
2068     """
2069     env = {
2070       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2071       }
2072     env.update(_BuildInstanceHookEnvByObject(self.instance))
2073     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2074           list(self.instance.secondary_nodes))
2075     return env, nl, nl
2076
2077   def CheckPrereq(self):
2078     """Check prerequisites.
2079
2080     This checks that the instance is in the cluster.
2081
2082     """
2083     instance = self.cfg.GetInstanceInfo(
2084       self.cfg.ExpandInstanceName(self.op.instance_name))
2085     if instance is None:
2086       raise errors.OpPrereqError("Instance '%s' not known" %
2087                                  self.op.instance_name)
2088
2089     # check bridges existance
2090     _CheckInstanceBridgesExist(instance)
2091
2092     self.instance = instance
2093     self.op.instance_name = instance.name
2094
2095   def Exec(self, feedback_fn):
2096     """Reboot the instance.
2097
2098     """
2099     instance = self.instance
2100     ignore_secondaries = self.op.ignore_secondaries
2101     reboot_type = self.op.reboot_type
2102     extra_args = getattr(self.op, "extra_args", "")
2103
2104     node_current = instance.primary_node
2105
2106     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2107                            constants.INSTANCE_REBOOT_HARD,
2108                            constants.INSTANCE_REBOOT_FULL]:
2109       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2110                                   (constants.INSTANCE_REBOOT_SOFT,
2111                                    constants.INSTANCE_REBOOT_HARD,
2112                                    constants.INSTANCE_REBOOT_FULL))
2113
2114     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2115                        constants.INSTANCE_REBOOT_HARD]:
2116       if not rpc.call_instance_reboot(node_current, instance,
2117                                       reboot_type, extra_args):
2118         raise errors.OpExecError("Could not reboot instance")
2119     else:
2120       if not rpc.call_instance_shutdown(node_current, instance):
2121         raise errors.OpExecError("could not shutdown instance for full reboot")
2122       _ShutdownInstanceDisks(instance, self.cfg)
2123       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2124       if not rpc.call_instance_start(node_current, instance, extra_args):
2125         _ShutdownInstanceDisks(instance, self.cfg)
2126         raise errors.OpExecError("Could not start instance for full reboot")
2127
2128     self.cfg.MarkInstanceUp(instance.name)
2129
2130
2131 class LUShutdownInstance(LogicalUnit):
2132   """Shutdown an instance.
2133
2134   """
2135   HPATH = "instance-stop"
2136   HTYPE = constants.HTYPE_INSTANCE
2137   _OP_REQP = ["instance_name"]
2138
2139   def BuildHooksEnv(self):
2140     """Build hooks env.
2141
2142     This runs on master, primary and secondary nodes of the instance.
2143
2144     """
2145     env = _BuildInstanceHookEnvByObject(self.instance)
2146     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2147           list(self.instance.secondary_nodes))
2148     return env, nl, nl
2149
2150   def CheckPrereq(self):
2151     """Check prerequisites.
2152
2153     This checks that the instance is in the cluster.
2154
2155     """
2156     instance = self.cfg.GetInstanceInfo(
2157       self.cfg.ExpandInstanceName(self.op.instance_name))
2158     if instance is None:
2159       raise errors.OpPrereqError("Instance '%s' not known" %
2160                                  self.op.instance_name)
2161     self.instance = instance
2162
2163   def Exec(self, feedback_fn):
2164     """Shutdown the instance.
2165
2166     """
2167     instance = self.instance
2168     node_current = instance.primary_node
2169     if not rpc.call_instance_shutdown(node_current, instance):
2170       logger.Error("could not shutdown instance")
2171
2172     self.cfg.MarkInstanceDown(instance.name)
2173     _ShutdownInstanceDisks(instance, self.cfg)
2174
2175
2176 class LUReinstallInstance(LogicalUnit):
2177   """Reinstall an instance.
2178
2179   """
2180   HPATH = "instance-reinstall"
2181   HTYPE = constants.HTYPE_INSTANCE
2182   _OP_REQP = ["instance_name"]
2183
2184   def BuildHooksEnv(self):
2185     """Build hooks env.
2186
2187     This runs on master, primary and secondary nodes of the instance.
2188
2189     """
2190     env = _BuildInstanceHookEnvByObject(self.instance)
2191     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2192           list(self.instance.secondary_nodes))
2193     return env, nl, nl
2194
2195   def CheckPrereq(self):
2196     """Check prerequisites.
2197
2198     This checks that the instance is in the cluster and is not running.
2199
2200     """
2201     instance = self.cfg.GetInstanceInfo(
2202       self.cfg.ExpandInstanceName(self.op.instance_name))
2203     if instance is None:
2204       raise errors.OpPrereqError("Instance '%s' not known" %
2205                                  self.op.instance_name)
2206     if instance.disk_template == constants.DT_DISKLESS:
2207       raise errors.OpPrereqError("Instance '%s' has no disks" %
2208                                  self.op.instance_name)
2209     if instance.status != "down":
2210       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2211                                  self.op.instance_name)
2212     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2213     if remote_info:
2214       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2215                                  (self.op.instance_name,
2216                                   instance.primary_node))
2217
2218     self.op.os_type = getattr(self.op, "os_type", None)
2219     if self.op.os_type is not None:
2220       # OS verification
2221       pnode = self.cfg.GetNodeInfo(
2222         self.cfg.ExpandNodeName(instance.primary_node))
2223       if pnode is None:
2224         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2225                                    self.op.pnode)
2226       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2227       if not os_obj:
2228         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2229                                    " primary node"  % self.op.os_type)
2230
2231     self.instance = instance
2232
2233   def Exec(self, feedback_fn):
2234     """Reinstall the instance.
2235
2236     """
2237     inst = self.instance
2238
2239     if self.op.os_type is not None:
2240       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2241       inst.os = self.op.os_type
2242       self.cfg.AddInstance(inst)
2243
2244     _StartInstanceDisks(self.cfg, inst, None)
2245     try:
2246       feedback_fn("Running the instance OS create scripts...")
2247       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2248         raise errors.OpExecError("Could not install OS for instance %s"
2249                                  " on node %s" %
2250                                  (inst.name, inst.primary_node))
2251     finally:
2252       _ShutdownInstanceDisks(inst, self.cfg)
2253
2254
2255 class LURenameInstance(LogicalUnit):
2256   """Rename an instance.
2257
2258   """
2259   HPATH = "instance-rename"
2260   HTYPE = constants.HTYPE_INSTANCE
2261   _OP_REQP = ["instance_name", "new_name"]
2262
2263   def BuildHooksEnv(self):
2264     """Build hooks env.
2265
2266     This runs on master, primary and secondary nodes of the instance.
2267
2268     """
2269     env = _BuildInstanceHookEnvByObject(self.instance)
2270     env["INSTANCE_NEW_NAME"] = self.op.new_name
2271     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2272           list(self.instance.secondary_nodes))
2273     return env, nl, nl
2274
2275   def CheckPrereq(self):
2276     """Check prerequisites.
2277
2278     This checks that the instance is in the cluster and is not running.
2279
2280     """
2281     instance = self.cfg.GetInstanceInfo(
2282       self.cfg.ExpandInstanceName(self.op.instance_name))
2283     if instance is None:
2284       raise errors.OpPrereqError("Instance '%s' not known" %
2285                                  self.op.instance_name)
2286     if instance.status != "down":
2287       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2288                                  self.op.instance_name)
2289     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2290     if remote_info:
2291       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2292                                  (self.op.instance_name,
2293                                   instance.primary_node))
2294     self.instance = instance
2295
2296     # new name verification
2297     name_info = utils.HostInfo(self.op.new_name)
2298
2299     self.op.new_name = new_name = name_info.name
2300     if not getattr(self.op, "ignore_ip", False):
2301       command = ["fping", "-q", name_info.ip]
2302       result = utils.RunCmd(command)
2303       if not result.failed:
2304         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2305                                    (name_info.ip, new_name))
2306
2307
2308   def Exec(self, feedback_fn):
2309     """Reinstall the instance.
2310
2311     """
2312     inst = self.instance
2313     old_name = inst.name
2314
2315     self.cfg.RenameInstance(inst.name, self.op.new_name)
2316
2317     # re-read the instance from the configuration after rename
2318     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2319
2320     _StartInstanceDisks(self.cfg, inst, None)
2321     try:
2322       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2323                                           "sda", "sdb"):
2324         msg = ("Could run OS rename script for instance %s on node %s (but the"
2325                " instance has been renamed in Ganeti)" %
2326                (inst.name, inst.primary_node))
2327         logger.Error(msg)
2328     finally:
2329       _ShutdownInstanceDisks(inst, self.cfg)
2330
2331
2332 class LURemoveInstance(LogicalUnit):
2333   """Remove an instance.
2334
2335   """
2336   HPATH = "instance-remove"
2337   HTYPE = constants.HTYPE_INSTANCE
2338   _OP_REQP = ["instance_name"]
2339
2340   def BuildHooksEnv(self):
2341     """Build hooks env.
2342
2343     This runs on master, primary and secondary nodes of the instance.
2344
2345     """
2346     env = _BuildInstanceHookEnvByObject(self.instance)
2347     nl = [self.sstore.GetMasterNode()]
2348     return env, nl, nl
2349
2350   def CheckPrereq(self):
2351     """Check prerequisites.
2352
2353     This checks that the instance is in the cluster.
2354
2355     """
2356     instance = self.cfg.GetInstanceInfo(
2357       self.cfg.ExpandInstanceName(self.op.instance_name))
2358     if instance is None:
2359       raise errors.OpPrereqError("Instance '%s' not known" %
2360                                  self.op.instance_name)
2361     self.instance = instance
2362
2363   def Exec(self, feedback_fn):
2364     """Remove the instance.
2365
2366     """
2367     instance = self.instance
2368     logger.Info("shutting down instance %s on node %s" %
2369                 (instance.name, instance.primary_node))
2370
2371     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2372       if self.op.ignore_failures:
2373         feedback_fn("Warning: can't shutdown instance")
2374       else:
2375         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2376                                  (instance.name, instance.primary_node))
2377
2378     logger.Info("removing block devices for instance %s" % instance.name)
2379
2380     if not _RemoveDisks(instance, self.cfg):
2381       if self.op.ignore_failures:
2382         feedback_fn("Warning: can't remove instance's disks")
2383       else:
2384         raise errors.OpExecError("Can't remove instance's disks")
2385
2386     logger.Info("removing instance %s out of cluster config" % instance.name)
2387
2388     self.cfg.RemoveInstance(instance.name)
2389
2390
2391 class LUQueryInstances(NoHooksLU):
2392   """Logical unit for querying instances.
2393
2394   """
2395   _OP_REQP = ["output_fields", "names"]
2396
2397   def CheckPrereq(self):
2398     """Check prerequisites.
2399
2400     This checks that the fields required are valid output fields.
2401
2402     """
2403     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2404     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2405                                "admin_state", "admin_ram",
2406                                "disk_template", "ip", "mac", "bridge",
2407                                "sda_size", "sdb_size", "vcpus"],
2408                        dynamic=self.dynamic_fields,
2409                        selected=self.op.output_fields)
2410
2411     self.wanted = _GetWantedInstances(self, self.op.names)
2412
2413   def Exec(self, feedback_fn):
2414     """Computes the list of nodes and their attributes.
2415
2416     """
2417     instance_names = self.wanted
2418     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2419                      in instance_names]
2420
2421     # begin data gathering
2422
2423     nodes = frozenset([inst.primary_node for inst in instance_list])
2424
2425     bad_nodes = []
2426     if self.dynamic_fields.intersection(self.op.output_fields):
2427       live_data = {}
2428       node_data = rpc.call_all_instances_info(nodes)
2429       for name in nodes:
2430         result = node_data[name]
2431         if result:
2432           live_data.update(result)
2433         elif result == False:
2434           bad_nodes.append(name)
2435         # else no instance is alive
2436     else:
2437       live_data = dict([(name, {}) for name in instance_names])
2438
2439     # end data gathering
2440
2441     output = []
2442     for instance in instance_list:
2443       iout = []
2444       for field in self.op.output_fields:
2445         if field == "name":
2446           val = instance.name
2447         elif field == "os":
2448           val = instance.os
2449         elif field == "pnode":
2450           val = instance.primary_node
2451         elif field == "snodes":
2452           val = list(instance.secondary_nodes)
2453         elif field == "admin_state":
2454           val = (instance.status != "down")
2455         elif field == "oper_state":
2456           if instance.primary_node in bad_nodes:
2457             val = None
2458           else:
2459             val = bool(live_data.get(instance.name))
2460         elif field == "admin_ram":
2461           val = instance.memory
2462         elif field == "oper_ram":
2463           if instance.primary_node in bad_nodes:
2464             val = None
2465           elif instance.name in live_data:
2466             val = live_data[instance.name].get("memory", "?")
2467           else:
2468             val = "-"
2469         elif field == "disk_template":
2470           val = instance.disk_template
2471         elif field == "ip":
2472           val = instance.nics[0].ip
2473         elif field == "bridge":
2474           val = instance.nics[0].bridge
2475         elif field == "mac":
2476           val = instance.nics[0].mac
2477         elif field == "sda_size" or field == "sdb_size":
2478           disk = instance.FindDisk(field[:3])
2479           if disk is None:
2480             val = None
2481           else:
2482             val = disk.size
2483         elif field == "vcpus":
2484           val = instance.vcpus
2485         else:
2486           raise errors.ParameterError(field)
2487         iout.append(val)
2488       output.append(iout)
2489
2490     return output
2491
2492
2493 class LUFailoverInstance(LogicalUnit):
2494   """Failover an instance.
2495
2496   """
2497   HPATH = "instance-failover"
2498   HTYPE = constants.HTYPE_INSTANCE
2499   _OP_REQP = ["instance_name", "ignore_consistency"]
2500
2501   def BuildHooksEnv(self):
2502     """Build hooks env.
2503
2504     This runs on master, primary and secondary nodes of the instance.
2505
2506     """
2507     env = {
2508       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2509       }
2510     env.update(_BuildInstanceHookEnvByObject(self.instance))
2511     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2512     return env, nl, nl
2513
2514   def CheckPrereq(self):
2515     """Check prerequisites.
2516
2517     This checks that the instance is in the cluster.
2518
2519     """
2520     instance = self.cfg.GetInstanceInfo(
2521       self.cfg.ExpandInstanceName(self.op.instance_name))
2522     if instance is None:
2523       raise errors.OpPrereqError("Instance '%s' not known" %
2524                                  self.op.instance_name)
2525
2526     if instance.disk_template not in constants.DTS_NET_MIRROR:
2527       raise errors.OpPrereqError("Instance's disk layout is not"
2528                                  " network mirrored, cannot failover.")
2529
2530     secondary_nodes = instance.secondary_nodes
2531     if not secondary_nodes:
2532       raise errors.ProgrammerError("no secondary node but using "
2533                                    "DT_REMOTE_RAID1 template")
2534
2535     target_node = secondary_nodes[0]
2536     # check memory requirements on the secondary node
2537     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2538                          instance.name, instance.memory)
2539
2540     # check bridge existance
2541     brlist = [nic.bridge for nic in instance.nics]
2542     if not rpc.call_bridges_exist(target_node, brlist):
2543       raise errors.OpPrereqError("One or more target bridges %s does not"
2544                                  " exist on destination node '%s'" %
2545                                  (brlist, target_node))
2546
2547     self.instance = instance
2548
2549   def Exec(self, feedback_fn):
2550     """Failover an instance.
2551
2552     The failover is done by shutting it down on its present node and
2553     starting it on the secondary.
2554
2555     """
2556     instance = self.instance
2557
2558     source_node = instance.primary_node
2559     target_node = instance.secondary_nodes[0]
2560
2561     feedback_fn("* checking disk consistency between source and target")
2562     for dev in instance.disks:
2563       # for remote_raid1, these are md over drbd
2564       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2565         if not self.op.ignore_consistency:
2566           raise errors.OpExecError("Disk %s is degraded on target node,"
2567                                    " aborting failover." % dev.iv_name)
2568
2569     feedback_fn("* shutting down instance on source node")
2570     logger.Info("Shutting down instance %s on node %s" %
2571                 (instance.name, source_node))
2572
2573     if not rpc.call_instance_shutdown(source_node, instance):
2574       if self.op.ignore_consistency:
2575         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2576                      " anyway. Please make sure node %s is down"  %
2577                      (instance.name, source_node, source_node))
2578       else:
2579         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2580                                  (instance.name, source_node))
2581
2582     feedback_fn("* deactivating the instance's disks on source node")
2583     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2584       raise errors.OpExecError("Can't shut down the instance's disks.")
2585
2586     instance.primary_node = target_node
2587     # distribute new instance config to the other nodes
2588     self.cfg.AddInstance(instance)
2589
2590     feedback_fn("* activating the instance's disks on target node")
2591     logger.Info("Starting instance %s on node %s" %
2592                 (instance.name, target_node))
2593
2594     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2595                                              ignore_secondaries=True)
2596     if not disks_ok:
2597       _ShutdownInstanceDisks(instance, self.cfg)
2598       raise errors.OpExecError("Can't activate the instance's disks")
2599
2600     feedback_fn("* starting the instance on the target node")
2601     if not rpc.call_instance_start(target_node, instance, None):
2602       _ShutdownInstanceDisks(instance, self.cfg)
2603       raise errors.OpExecError("Could not start instance %s on node %s." %
2604                                (instance.name, target_node))
2605
2606
2607 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2608   """Create a tree of block devices on the primary node.
2609
2610   This always creates all devices.
2611
2612   """
2613   if device.children:
2614     for child in device.children:
2615       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2616         return False
2617
2618   cfg.SetDiskID(device, node)
2619   new_id = rpc.call_blockdev_create(node, device, device.size,
2620                                     instance.name, True, info)
2621   if not new_id:
2622     return False
2623   if device.physical_id is None:
2624     device.physical_id = new_id
2625   return True
2626
2627
2628 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2629   """Create a tree of block devices on a secondary node.
2630
2631   If this device type has to be created on secondaries, create it and
2632   all its children.
2633
2634   If not, just recurse to children keeping the same 'force' value.
2635
2636   """
2637   if device.CreateOnSecondary():
2638     force = True
2639   if device.children:
2640     for child in device.children:
2641       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2642                                         child, force, info):
2643         return False
2644
2645   if not force:
2646     return True
2647   cfg.SetDiskID(device, node)
2648   new_id = rpc.call_blockdev_create(node, device, device.size,
2649                                     instance.name, False, info)
2650   if not new_id:
2651     return False
2652   if device.physical_id is None:
2653     device.physical_id = new_id
2654   return True
2655
2656
2657 def _GenerateUniqueNames(cfg, exts):
2658   """Generate a suitable LV name.
2659
2660   This will generate a logical volume name for the given instance.
2661
2662   """
2663   results = []
2664   for val in exts:
2665     new_id = cfg.GenerateUniqueID()
2666     results.append("%s%s" % (new_id, val))
2667   return results
2668
2669
2670 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2671   """Generate a drbd device complete with its children.
2672
2673   """
2674   port = cfg.AllocatePort()
2675   vgname = cfg.GetVGName()
2676   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2677                           logical_id=(vgname, names[0]))
2678   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2679                           logical_id=(vgname, names[1]))
2680   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2681                           logical_id = (primary, secondary, port),
2682                           children = [dev_data, dev_meta])
2683   return drbd_dev
2684
2685
2686 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2687   """Generate a drbd8 device complete with its children.
2688
2689   """
2690   port = cfg.AllocatePort()
2691   vgname = cfg.GetVGName()
2692   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2693                           logical_id=(vgname, names[0]))
2694   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2695                           logical_id=(vgname, names[1]))
2696   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2697                           logical_id = (primary, secondary, port),
2698                           children = [dev_data, dev_meta],
2699                           iv_name=iv_name)
2700   return drbd_dev
2701
2702 def _GenerateDiskTemplate(cfg, template_name,
2703                           instance_name, primary_node,
2704                           secondary_nodes, disk_sz, swap_sz):
2705   """Generate the entire disk layout for a given template type.
2706
2707   """
2708   #TODO: compute space requirements
2709
2710   vgname = cfg.GetVGName()
2711   if template_name == "diskless":
2712     disks = []
2713   elif template_name == "plain":
2714     if len(secondary_nodes) != 0:
2715       raise errors.ProgrammerError("Wrong template configuration")
2716
2717     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2718     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2719                            logical_id=(vgname, names[0]),
2720                            iv_name = "sda")
2721     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2722                            logical_id=(vgname, names[1]),
2723                            iv_name = "sdb")
2724     disks = [sda_dev, sdb_dev]
2725   elif template_name == "local_raid1":
2726     if len(secondary_nodes) != 0:
2727       raise errors.ProgrammerError("Wrong template configuration")
2728
2729
2730     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2731                                        ".sdb_m1", ".sdb_m2"])
2732     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2733                               logical_id=(vgname, names[0]))
2734     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2735                               logical_id=(vgname, names[1]))
2736     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2737                               size=disk_sz,
2738                               children = [sda_dev_m1, sda_dev_m2])
2739     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2740                               logical_id=(vgname, names[2]))
2741     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2742                               logical_id=(vgname, names[3]))
2743     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2744                               size=swap_sz,
2745                               children = [sdb_dev_m1, sdb_dev_m2])
2746     disks = [md_sda_dev, md_sdb_dev]
2747   elif template_name == constants.DT_REMOTE_RAID1:
2748     if len(secondary_nodes) != 1:
2749       raise errors.ProgrammerError("Wrong template configuration")
2750     remote_node = secondary_nodes[0]
2751     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752                                        ".sdb_data", ".sdb_meta"])
2753     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2754                                          disk_sz, names[0:2])
2755     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2756                               children = [drbd_sda_dev], size=disk_sz)
2757     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2758                                          swap_sz, names[2:4])
2759     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2760                               children = [drbd_sdb_dev], size=swap_sz)
2761     disks = [md_sda_dev, md_sdb_dev]
2762   elif template_name == constants.DT_DRBD8:
2763     if len(secondary_nodes) != 1:
2764       raise errors.ProgrammerError("Wrong template configuration")
2765     remote_node = secondary_nodes[0]
2766     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2767                                        ".sdb_data", ".sdb_meta"])
2768     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2769                                          disk_sz, names[0:2], "sda")
2770     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2771                                          swap_sz, names[2:4], "sdb")
2772     disks = [drbd_sda_dev, drbd_sdb_dev]
2773   else:
2774     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2775   return disks
2776
2777
2778 def _GetInstanceInfoText(instance):
2779   """Compute that text that should be added to the disk's metadata.
2780
2781   """
2782   return "originstname+%s" % instance.name
2783
2784
2785 def _CreateDisks(cfg, instance):
2786   """Create all disks for an instance.
2787
2788   This abstracts away some work from AddInstance.
2789
2790   Args:
2791     instance: the instance object
2792
2793   Returns:
2794     True or False showing the success of the creation process
2795
2796   """
2797   info = _GetInstanceInfoText(instance)
2798
2799   for device in instance.disks:
2800     logger.Info("creating volume %s for instance %s" %
2801               (device.iv_name, instance.name))
2802     #HARDCODE
2803     for secondary_node in instance.secondary_nodes:
2804       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2805                                         device, False, info):
2806         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2807                      (device.iv_name, device, secondary_node))
2808         return False
2809     #HARDCODE
2810     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2811                                     instance, device, info):
2812       logger.Error("failed to create volume %s on primary!" %
2813                    device.iv_name)
2814       return False
2815   return True
2816
2817
2818 def _RemoveDisks(instance, cfg):
2819   """Remove all disks for an instance.
2820
2821   This abstracts away some work from `AddInstance()` and
2822   `RemoveInstance()`. Note that in case some of the devices couldn't
2823   be removed, the removal will continue with the other ones (compare
2824   with `_CreateDisks()`).
2825
2826   Args:
2827     instance: the instance object
2828
2829   Returns:
2830     True or False showing the success of the removal proces
2831
2832   """
2833   logger.Info("removing block devices for instance %s" % instance.name)
2834
2835   result = True
2836   for device in instance.disks:
2837     for node, disk in device.ComputeNodeTree(instance.primary_node):
2838       cfg.SetDiskID(disk, node)
2839       if not rpc.call_blockdev_remove(node, disk):
2840         logger.Error("could not remove block device %s on node %s,"
2841                      " continuing anyway" %
2842                      (device.iv_name, node))
2843         result = False
2844   return result
2845
2846
2847 class LUCreateInstance(LogicalUnit):
2848   """Create an instance.
2849
2850   """
2851   HPATH = "instance-add"
2852   HTYPE = constants.HTYPE_INSTANCE
2853   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2854               "disk_template", "swap_size", "mode", "start", "vcpus",
2855               "wait_for_sync", "ip_check", "mac"]
2856
2857   def BuildHooksEnv(self):
2858     """Build hooks env.
2859
2860     This runs on master, primary and secondary nodes of the instance.
2861
2862     """
2863     env = {
2864       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2865       "INSTANCE_DISK_SIZE": self.op.disk_size,
2866       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2867       "INSTANCE_ADD_MODE": self.op.mode,
2868       }
2869     if self.op.mode == constants.INSTANCE_IMPORT:
2870       env["INSTANCE_SRC_NODE"] = self.op.src_node
2871       env["INSTANCE_SRC_PATH"] = self.op.src_path
2872       env["INSTANCE_SRC_IMAGE"] = self.src_image
2873
2874     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2875       primary_node=self.op.pnode,
2876       secondary_nodes=self.secondaries,
2877       status=self.instance_status,
2878       os_type=self.op.os_type,
2879       memory=self.op.mem_size,
2880       vcpus=self.op.vcpus,
2881       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2882     ))
2883
2884     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2885           self.secondaries)
2886     return env, nl, nl
2887
2888
2889   def CheckPrereq(self):
2890     """Check prerequisites.
2891
2892     """
2893     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2894       if not hasattr(self.op, attr):
2895         setattr(self.op, attr, None)
2896
2897     if self.op.mode not in (constants.INSTANCE_CREATE,
2898                             constants.INSTANCE_IMPORT):
2899       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2900                                  self.op.mode)
2901
2902     if self.op.mode == constants.INSTANCE_IMPORT:
2903       src_node = getattr(self.op, "src_node", None)
2904       src_path = getattr(self.op, "src_path", None)
2905       if src_node is None or src_path is None:
2906         raise errors.OpPrereqError("Importing an instance requires source"
2907                                    " node and path options")
2908       src_node_full = self.cfg.ExpandNodeName(src_node)
2909       if src_node_full is None:
2910         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2911       self.op.src_node = src_node = src_node_full
2912
2913       if not os.path.isabs(src_path):
2914         raise errors.OpPrereqError("The source path must be absolute")
2915
2916       export_info = rpc.call_export_info(src_node, src_path)
2917
2918       if not export_info:
2919         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2920
2921       if not export_info.has_section(constants.INISECT_EXP):
2922         raise errors.ProgrammerError("Corrupted export config")
2923
2924       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2925       if (int(ei_version) != constants.EXPORT_VERSION):
2926         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2927                                    (ei_version, constants.EXPORT_VERSION))
2928
2929       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2930         raise errors.OpPrereqError("Can't import instance with more than"
2931                                    " one data disk")
2932
2933       # FIXME: are the old os-es, disk sizes, etc. useful?
2934       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2935       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2936                                                          'disk0_dump'))
2937       self.src_image = diskimage
2938     else: # INSTANCE_CREATE
2939       if getattr(self.op, "os_type", None) is None:
2940         raise errors.OpPrereqError("No guest OS specified")
2941
2942     # check primary node
2943     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2944     if pnode is None:
2945       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2946                                  self.op.pnode)
2947     self.op.pnode = pnode.name
2948     self.pnode = pnode
2949     self.secondaries = []
2950     # disk template and mirror node verification
2951     if self.op.disk_template not in constants.DISK_TEMPLATES:
2952       raise errors.OpPrereqError("Invalid disk template name")
2953
2954     if self.op.disk_template in constants.DTS_NET_MIRROR:
2955       if getattr(self.op, "snode", None) is None:
2956         raise errors.OpPrereqError("The networked disk templates need"
2957                                    " a mirror node")
2958
2959       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2960       if snode_name is None:
2961         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2962                                    self.op.snode)
2963       elif snode_name == pnode.name:
2964         raise errors.OpPrereqError("The secondary node cannot be"
2965                                    " the primary node.")
2966       self.secondaries.append(snode_name)
2967
2968     # Required free disk space as a function of disk and swap space
2969     req_size_dict = {
2970       constants.DT_DISKLESS: None,
2971       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2972       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2973       # 256 MB are added for drbd metadata, 128MB for each drbd device
2974       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2975       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2976     }
2977
2978     if self.op.disk_template not in req_size_dict:
2979       raise errors.ProgrammerError("Disk template '%s' size requirement"
2980                                    " is unknown" %  self.op.disk_template)
2981
2982     req_size = req_size_dict[self.op.disk_template]
2983
2984     # Check lv size requirements
2985     if req_size is not None:
2986       nodenames = [pnode.name] + self.secondaries
2987       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2988       for node in nodenames:
2989         info = nodeinfo.get(node, None)
2990         if not info:
2991           raise errors.OpPrereqError("Cannot get current information"
2992                                      " from node '%s'" % nodeinfo)
2993         vg_free = info.get('vg_free', None)
2994         if not isinstance(vg_free, int):
2995           raise errors.OpPrereqError("Can't compute free disk space on"
2996                                      " node %s" % node)
2997         if req_size > info['vg_free']:
2998           raise errors.OpPrereqError("Not enough disk space on target node %s."
2999                                      " %d MB available, %d MB required" %
3000                                      (node, info['vg_free'], req_size))
3001
3002     # os verification
3003     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3004     if not os_obj:
3005       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3006                                  " primary node"  % self.op.os_type)
3007
3008     if self.op.kernel_path == constants.VALUE_NONE:
3009       raise errors.OpPrereqError("Can't set instance kernel to none")
3010
3011     # instance verification
3012     hostname1 = utils.HostInfo(self.op.instance_name)
3013
3014     self.op.instance_name = instance_name = hostname1.name
3015     instance_list = self.cfg.GetInstanceList()
3016     if instance_name in instance_list:
3017       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3018                                  instance_name)
3019
3020     ip = getattr(self.op, "ip", None)
3021     if ip is None or ip.lower() == "none":
3022       inst_ip = None
3023     elif ip.lower() == "auto":
3024       inst_ip = hostname1.ip
3025     else:
3026       if not utils.IsValidIP(ip):
3027         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3028                                    " like a valid IP" % ip)
3029       inst_ip = ip
3030     self.inst_ip = inst_ip
3031
3032     if self.op.start and not self.op.ip_check:
3033       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3034                                  " adding an instance in start mode")
3035
3036     if self.op.ip_check:
3037       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3038                        constants.DEFAULT_NODED_PORT):
3039         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3040                                    (hostname1.ip, instance_name))
3041
3042     # MAC address verification
3043     if self.op.mac != "auto":
3044       if not utils.IsValidMac(self.op.mac.lower()):
3045         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3046                                    self.op.mac)
3047
3048     # bridge verification
3049     bridge = getattr(self.op, "bridge", None)
3050     if bridge is None:
3051       self.op.bridge = self.cfg.GetDefBridge()
3052     else:
3053       self.op.bridge = bridge
3054
3055     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3056       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3057                                  " destination node '%s'" %
3058                                  (self.op.bridge, pnode.name))
3059
3060     # boot order verification
3061     if self.op.hvm_boot_order is not None:
3062       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3063         raise errors.OpPrereqError("invalid boot order specified,"
3064                                    " must be one or more of [acdn]")
3065
3066     if self.op.start:
3067       self.instance_status = 'up'
3068     else:
3069       self.instance_status = 'down'
3070
3071   def Exec(self, feedback_fn):
3072     """Create and add the instance to the cluster.
3073
3074     """
3075     instance = self.op.instance_name
3076     pnode_name = self.pnode.name
3077
3078     if self.op.mac == "auto":
3079       mac_address = self.cfg.GenerateMAC()
3080     else:
3081       mac_address = self.op.mac
3082
3083     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3084     if self.inst_ip is not None:
3085       nic.ip = self.inst_ip
3086
3087     ht_kind = self.sstore.GetHypervisorType()
3088     if ht_kind in constants.HTS_REQ_PORT:
3089       network_port = self.cfg.AllocatePort()
3090     else:
3091       network_port = None
3092
3093     disks = _GenerateDiskTemplate(self.cfg,
3094                                   self.op.disk_template,
3095                                   instance, pnode_name,
3096                                   self.secondaries, self.op.disk_size,
3097                                   self.op.swap_size)
3098
3099     iobj = objects.Instance(name=instance, os=self.op.os_type,
3100                             primary_node=pnode_name,
3101                             memory=self.op.mem_size,
3102                             vcpus=self.op.vcpus,
3103                             nics=[nic], disks=disks,
3104                             disk_template=self.op.disk_template,
3105                             status=self.instance_status,
3106                             network_port=network_port,
3107                             kernel_path=self.op.kernel_path,
3108                             initrd_path=self.op.initrd_path,
3109                             hvm_boot_order=self.op.hvm_boot_order,
3110                             )
3111
3112     feedback_fn("* creating instance disks...")
3113     if not _CreateDisks(self.cfg, iobj):
3114       _RemoveDisks(iobj, self.cfg)
3115       raise errors.OpExecError("Device creation failed, reverting...")
3116
3117     feedback_fn("adding instance %s to cluster config" % instance)
3118
3119     self.cfg.AddInstance(iobj)
3120
3121     if self.op.wait_for_sync:
3122       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3123     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3124       # make sure the disks are not degraded (still sync-ing is ok)
3125       time.sleep(15)
3126       feedback_fn("* checking mirrors status")
3127       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3128     else:
3129       disk_abort = False
3130
3131     if disk_abort:
3132       _RemoveDisks(iobj, self.cfg)
3133       self.cfg.RemoveInstance(iobj.name)
3134       raise errors.OpExecError("There are some degraded disks for"
3135                                " this instance")
3136
3137     feedback_fn("creating os for instance %s on node %s" %
3138                 (instance, pnode_name))
3139
3140     if iobj.disk_template != constants.DT_DISKLESS:
3141       if self.op.mode == constants.INSTANCE_CREATE:
3142         feedback_fn("* running the instance OS create scripts...")
3143         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3144           raise errors.OpExecError("could not add os for instance %s"
3145                                    " on node %s" %
3146                                    (instance, pnode_name))
3147
3148       elif self.op.mode == constants.INSTANCE_IMPORT:
3149         feedback_fn("* running the instance OS import scripts...")
3150         src_node = self.op.src_node
3151         src_image = self.src_image
3152         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3153                                                 src_node, src_image):
3154           raise errors.OpExecError("Could not import os for instance"
3155                                    " %s on node %s" %
3156                                    (instance, pnode_name))
3157       else:
3158         # also checked in the prereq part
3159         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3160                                      % self.op.mode)
3161
3162     if self.op.start:
3163       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3164       feedback_fn("* starting instance...")
3165       if not rpc.call_instance_start(pnode_name, iobj, None):
3166         raise errors.OpExecError("Could not start instance")
3167
3168
3169 class LUConnectConsole(NoHooksLU):
3170   """Connect to an instance's console.
3171
3172   This is somewhat special in that it returns the command line that
3173   you need to run on the master node in order to connect to the
3174   console.
3175
3176   """
3177   _OP_REQP = ["instance_name"]
3178
3179   def CheckPrereq(self):
3180     """Check prerequisites.
3181
3182     This checks that the instance is in the cluster.
3183
3184     """
3185     instance = self.cfg.GetInstanceInfo(
3186       self.cfg.ExpandInstanceName(self.op.instance_name))
3187     if instance is None:
3188       raise errors.OpPrereqError("Instance '%s' not known" %
3189                                  self.op.instance_name)
3190     self.instance = instance
3191
3192   def Exec(self, feedback_fn):
3193     """Connect to the console of an instance
3194
3195     """
3196     instance = self.instance
3197     node = instance.primary_node
3198
3199     node_insts = rpc.call_instance_list([node])[node]
3200     if node_insts is False:
3201       raise errors.OpExecError("Can't connect to node %s." % node)
3202
3203     if instance.name not in node_insts:
3204       raise errors.OpExecError("Instance %s is not running." % instance.name)
3205
3206     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3207
3208     hyper = hypervisor.GetHypervisor()
3209     console_cmd = hyper.GetShellCommandForConsole(instance)
3210     # build ssh cmdline
3211     argv = ["ssh", "-q", "-t"]
3212     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3213     argv.extend(ssh.BATCH_MODE_OPTS)
3214     argv.append(node)
3215     argv.append(console_cmd)
3216     return "ssh", argv
3217
3218
3219 class LUAddMDDRBDComponent(LogicalUnit):
3220   """Adda new mirror member to an instance's disk.
3221
3222   """
3223   HPATH = "mirror-add"
3224   HTYPE = constants.HTYPE_INSTANCE
3225   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3226
3227   def BuildHooksEnv(self):
3228     """Build hooks env.
3229
3230     This runs on the master, the primary and all the secondaries.
3231
3232     """
3233     env = {
3234       "NEW_SECONDARY": self.op.remote_node,
3235       "DISK_NAME": self.op.disk_name,
3236       }
3237     env.update(_BuildInstanceHookEnvByObject(self.instance))
3238     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3239           self.op.remote_node,] + list(self.instance.secondary_nodes)
3240     return env, nl, nl
3241
3242   def CheckPrereq(self):
3243     """Check prerequisites.
3244
3245     This checks that the instance is in the cluster.
3246
3247     """
3248     instance = self.cfg.GetInstanceInfo(
3249       self.cfg.ExpandInstanceName(self.op.instance_name))
3250     if instance is None:
3251       raise errors.OpPrereqError("Instance '%s' not known" %
3252                                  self.op.instance_name)
3253     self.instance = instance
3254
3255     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3256     if remote_node is None:
3257       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3258     self.remote_node = remote_node
3259
3260     if remote_node == instance.primary_node:
3261       raise errors.OpPrereqError("The specified node is the primary node of"
3262                                  " the instance.")
3263
3264     if instance.disk_template != constants.DT_REMOTE_RAID1:
3265       raise errors.OpPrereqError("Instance's disk layout is not"
3266                                  " remote_raid1.")
3267     for disk in instance.disks:
3268       if disk.iv_name == self.op.disk_name:
3269         break
3270     else:
3271       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3272                                  " instance." % self.op.disk_name)
3273     if len(disk.children) > 1:
3274       raise errors.OpPrereqError("The device already has two slave devices."
3275                                  " This would create a 3-disk raid1 which we"
3276                                  " don't allow.")
3277     self.disk = disk
3278
3279   def Exec(self, feedback_fn):
3280     """Add the mirror component
3281
3282     """
3283     disk = self.disk
3284     instance = self.instance
3285
3286     remote_node = self.remote_node
3287     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3288     names = _GenerateUniqueNames(self.cfg, lv_names)
3289     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3290                                      remote_node, disk.size, names)
3291
3292     logger.Info("adding new mirror component on secondary")
3293     #HARDCODE
3294     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3295                                       new_drbd, False,
3296                                       _GetInstanceInfoText(instance)):
3297       raise errors.OpExecError("Failed to create new component on secondary"
3298                                " node %s" % remote_node)
3299
3300     logger.Info("adding new mirror component on primary")
3301     #HARDCODE
3302     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3303                                     instance, new_drbd,
3304                                     _GetInstanceInfoText(instance)):
3305       # remove secondary dev
3306       self.cfg.SetDiskID(new_drbd, remote_node)
3307       rpc.call_blockdev_remove(remote_node, new_drbd)
3308       raise errors.OpExecError("Failed to create volume on primary")
3309
3310     # the device exists now
3311     # call the primary node to add the mirror to md
3312     logger.Info("adding new mirror component to md")
3313     if not rpc.call_blockdev_addchildren(instance.primary_node,
3314                                          disk, [new_drbd]):
3315       logger.Error("Can't add mirror compoment to md!")
3316       self.cfg.SetDiskID(new_drbd, remote_node)
3317       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3318         logger.Error("Can't rollback on secondary")
3319       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3320       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3321         logger.Error("Can't rollback on primary")
3322       raise errors.OpExecError("Can't add mirror component to md array")
3323
3324     disk.children.append(new_drbd)
3325
3326     self.cfg.AddInstance(instance)
3327
3328     _WaitForSync(self.cfg, instance, self.proc)
3329
3330     return 0
3331
3332
3333 class LURemoveMDDRBDComponent(LogicalUnit):
3334   """Remove a component from a remote_raid1 disk.
3335
3336   """
3337   HPATH = "mirror-remove"
3338   HTYPE = constants.HTYPE_INSTANCE
3339   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3340
3341   def BuildHooksEnv(self):
3342     """Build hooks env.
3343
3344     This runs on the master, the primary and all the secondaries.
3345
3346     """
3347     env = {
3348       "DISK_NAME": self.op.disk_name,
3349       "DISK_ID": self.op.disk_id,
3350       "OLD_SECONDARY": self.old_secondary,
3351       }
3352     env.update(_BuildInstanceHookEnvByObject(self.instance))
3353     nl = [self.sstore.GetMasterNode(),
3354           self.instance.primary_node] + list(self.instance.secondary_nodes)
3355     return env, nl, nl
3356
3357   def CheckPrereq(self):
3358     """Check prerequisites.
3359
3360     This checks that the instance is in the cluster.
3361
3362     """
3363     instance = self.cfg.GetInstanceInfo(
3364       self.cfg.ExpandInstanceName(self.op.instance_name))
3365     if instance is None:
3366       raise errors.OpPrereqError("Instance '%s' not known" %
3367                                  self.op.instance_name)
3368     self.instance = instance
3369
3370     if instance.disk_template != constants.DT_REMOTE_RAID1:
3371       raise errors.OpPrereqError("Instance's disk layout is not"
3372                                  " remote_raid1.")
3373     for disk in instance.disks:
3374       if disk.iv_name == self.op.disk_name:
3375         break
3376     else:
3377       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3378                                  " instance." % self.op.disk_name)
3379     for child in disk.children:
3380       if (child.dev_type == constants.LD_DRBD7 and
3381           child.logical_id[2] == self.op.disk_id):
3382         break
3383     else:
3384       raise errors.OpPrereqError("Can't find the device with this port.")
3385
3386     if len(disk.children) < 2:
3387       raise errors.OpPrereqError("Cannot remove the last component from"
3388                                  " a mirror.")
3389     self.disk = disk
3390     self.child = child
3391     if self.child.logical_id[0] == instance.primary_node:
3392       oid = 1
3393     else:
3394       oid = 0
3395     self.old_secondary = self.child.logical_id[oid]
3396
3397   def Exec(self, feedback_fn):
3398     """Remove the mirror component
3399
3400     """
3401     instance = self.instance
3402     disk = self.disk
3403     child = self.child
3404     logger.Info("remove mirror component")
3405     self.cfg.SetDiskID(disk, instance.primary_node)
3406     if not rpc.call_blockdev_removechildren(instance.primary_node,
3407                                             disk, [child]):
3408       raise errors.OpExecError("Can't remove child from mirror.")
3409
3410     for node in child.logical_id[:2]:
3411       self.cfg.SetDiskID(child, node)
3412       if not rpc.call_blockdev_remove(node, child):
3413         logger.Error("Warning: failed to remove device from node %s,"
3414                      " continuing operation." % node)
3415
3416     disk.children.remove(child)
3417     self.cfg.AddInstance(instance)
3418
3419
3420 class LUReplaceDisks(LogicalUnit):
3421   """Replace the disks of an instance.
3422
3423   """
3424   HPATH = "mirrors-replace"
3425   HTYPE = constants.HTYPE_INSTANCE
3426   _OP_REQP = ["instance_name", "mode", "disks"]
3427
3428   def BuildHooksEnv(self):
3429     """Build hooks env.
3430
3431     This runs on the master, the primary and all the secondaries.
3432
3433     """
3434     env = {
3435       "MODE": self.op.mode,
3436       "NEW_SECONDARY": self.op.remote_node,
3437       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3438       }
3439     env.update(_BuildInstanceHookEnvByObject(self.instance))
3440     nl = [
3441       self.sstore.GetMasterNode(),
3442       self.instance.primary_node,
3443       ]
3444     if self.op.remote_node is not None:
3445       nl.append(self.op.remote_node)
3446     return env, nl, nl
3447
3448   def CheckPrereq(self):
3449     """Check prerequisites.
3450
3451     This checks that the instance is in the cluster.
3452
3453     """
3454     instance = self.cfg.GetInstanceInfo(
3455       self.cfg.ExpandInstanceName(self.op.instance_name))
3456     if instance is None:
3457       raise errors.OpPrereqError("Instance '%s' not known" %
3458                                  self.op.instance_name)
3459     self.instance = instance
3460     self.op.instance_name = instance.name
3461
3462     if instance.disk_template not in constants.DTS_NET_MIRROR:
3463       raise errors.OpPrereqError("Instance's disk layout is not"
3464                                  " network mirrored.")
3465
3466     if len(instance.secondary_nodes) != 1:
3467       raise errors.OpPrereqError("The instance has a strange layout,"
3468                                  " expected one secondary but found %d" %
3469                                  len(instance.secondary_nodes))
3470
3471     self.sec_node = instance.secondary_nodes[0]
3472
3473     remote_node = getattr(self.op, "remote_node", None)
3474     if remote_node is not None:
3475       remote_node = self.cfg.ExpandNodeName(remote_node)
3476       if remote_node is None:
3477         raise errors.OpPrereqError("Node '%s' not known" %
3478                                    self.op.remote_node)
3479       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3480     else:
3481       self.remote_node_info = None
3482     if remote_node == instance.primary_node:
3483       raise errors.OpPrereqError("The specified node is the primary node of"
3484                                  " the instance.")
3485     elif remote_node == self.sec_node:
3486       if self.op.mode == constants.REPLACE_DISK_SEC:
3487         # this is for DRBD8, where we can't execute the same mode of
3488         # replacement as for drbd7 (no different port allocated)
3489         raise errors.OpPrereqError("Same secondary given, cannot execute"
3490                                    " replacement")
3491       # the user gave the current secondary, switch to
3492       # 'no-replace-secondary' mode for drbd7
3493       remote_node = None
3494     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3495         self.op.mode != constants.REPLACE_DISK_ALL):
3496       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3497                                  " disks replacement, not individual ones")
3498     if instance.disk_template == constants.DT_DRBD8:
3499       if (self.op.mode == constants.REPLACE_DISK_ALL and
3500           remote_node is not None):
3501         # switch to replace secondary mode
3502         self.op.mode = constants.REPLACE_DISK_SEC
3503
3504       if self.op.mode == constants.REPLACE_DISK_ALL:
3505         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3506                                    " secondary disk replacement, not"
3507                                    " both at once")
3508       elif self.op.mode == constants.REPLACE_DISK_PRI:
3509         if remote_node is not None:
3510           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3511                                      " the secondary while doing a primary"
3512                                      " node disk replacement")
3513         self.tgt_node = instance.primary_node
3514         self.oth_node = instance.secondary_nodes[0]
3515       elif self.op.mode == constants.REPLACE_DISK_SEC:
3516         self.new_node = remote_node # this can be None, in which case
3517                                     # we don't change the secondary
3518         self.tgt_node = instance.secondary_nodes[0]
3519         self.oth_node = instance.primary_node
3520       else:
3521         raise errors.ProgrammerError("Unhandled disk replace mode")
3522
3523     for name in self.op.disks:
3524       if instance.FindDisk(name) is None:
3525         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3526                                    (name, instance.name))
3527     self.op.remote_node = remote_node
3528
3529   def _ExecRR1(self, feedback_fn):
3530     """Replace the disks of an instance.
3531
3532     """
3533     instance = self.instance
3534     iv_names = {}
3535     # start of work
3536     if self.op.remote_node is None:
3537       remote_node = self.sec_node
3538     else:
3539       remote_node = self.op.remote_node
3540     cfg = self.cfg
3541     for dev in instance.disks:
3542       size = dev.size
3543       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3544       names = _GenerateUniqueNames(cfg, lv_names)
3545       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3546                                        remote_node, size, names)
3547       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3548       logger.Info("adding new mirror component on secondary for %s" %
3549                   dev.iv_name)
3550       #HARDCODE
3551       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3552                                         new_drbd, False,
3553                                         _GetInstanceInfoText(instance)):
3554         raise errors.OpExecError("Failed to create new component on secondary"
3555                                  " node %s. Full abort, cleanup manually!" %
3556                                  remote_node)
3557
3558       logger.Info("adding new mirror component on primary")
3559       #HARDCODE
3560       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3561                                       instance, new_drbd,
3562                                       _GetInstanceInfoText(instance)):
3563         # remove secondary dev
3564         cfg.SetDiskID(new_drbd, remote_node)
3565         rpc.call_blockdev_remove(remote_node, new_drbd)
3566         raise errors.OpExecError("Failed to create volume on primary!"
3567                                  " Full abort, cleanup manually!!")
3568
3569       # the device exists now
3570       # call the primary node to add the mirror to md
3571       logger.Info("adding new mirror component to md")
3572       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3573                                            [new_drbd]):
3574         logger.Error("Can't add mirror compoment to md!")
3575         cfg.SetDiskID(new_drbd, remote_node)
3576         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3577           logger.Error("Can't rollback on secondary")
3578         cfg.SetDiskID(new_drbd, instance.primary_node)
3579         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3580           logger.Error("Can't rollback on primary")
3581         raise errors.OpExecError("Full abort, cleanup manually!!")
3582
3583       dev.children.append(new_drbd)
3584       cfg.AddInstance(instance)
3585
3586     # this can fail as the old devices are degraded and _WaitForSync
3587     # does a combined result over all disks, so we don't check its
3588     # return value
3589     _WaitForSync(cfg, instance, self.proc, unlock=True)
3590
3591     # so check manually all the devices
3592     for name in iv_names:
3593       dev, child, new_drbd = iv_names[name]
3594       cfg.SetDiskID(dev, instance.primary_node)
3595       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3596       if is_degr:
3597         raise errors.OpExecError("MD device %s is degraded!" % name)
3598       cfg.SetDiskID(new_drbd, instance.primary_node)
3599       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3600       if is_degr:
3601         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3602
3603     for name in iv_names:
3604       dev, child, new_drbd = iv_names[name]
3605       logger.Info("remove mirror %s component" % name)
3606       cfg.SetDiskID(dev, instance.primary_node)
3607       if not rpc.call_blockdev_removechildren(instance.primary_node,
3608                                               dev, [child]):
3609         logger.Error("Can't remove child from mirror, aborting"
3610                      " *this device cleanup*.\nYou need to cleanup manually!!")
3611         continue
3612
3613       for node in child.logical_id[:2]:
3614         logger.Info("remove child device on %s" % node)
3615         cfg.SetDiskID(child, node)
3616         if not rpc.call_blockdev_remove(node, child):
3617           logger.Error("Warning: failed to remove device from node %s,"
3618                        " continuing operation." % node)
3619
3620       dev.children.remove(child)
3621
3622       cfg.AddInstance(instance)
3623
3624   def _ExecD8DiskOnly(self, feedback_fn):
3625     """Replace a disk on the primary or secondary for dbrd8.
3626
3627     The algorithm for replace is quite complicated:
3628       - for each disk to be replaced:
3629         - create new LVs on the target node with unique names
3630         - detach old LVs from the drbd device
3631         - rename old LVs to name_replaced.<time_t>
3632         - rename new LVs to old LVs
3633         - attach the new LVs (with the old names now) to the drbd device
3634       - wait for sync across all devices
3635       - for each modified disk:
3636         - remove old LVs (which have the name name_replaces.<time_t>)
3637
3638     Failures are not very well handled.
3639
3640     """
3641     steps_total = 6
3642     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3643     instance = self.instance
3644     iv_names = {}
3645     vgname = self.cfg.GetVGName()
3646     # start of work
3647     cfg = self.cfg
3648     tgt_node = self.tgt_node
3649     oth_node = self.oth_node
3650
3651     # Step: check device activation
3652     self.proc.LogStep(1, steps_total, "check device existence")
3653     info("checking volume groups")
3654     my_vg = cfg.GetVGName()
3655     results = rpc.call_vg_list([oth_node, tgt_node])
3656     if not results:
3657       raise errors.OpExecError("Can't list volume groups on the nodes")
3658     for node in oth_node, tgt_node:
3659       res = results.get(node, False)
3660       if not res or my_vg not in res:
3661         raise errors.OpExecError("Volume group '%s' not found on %s" %
3662                                  (my_vg, node))
3663     for dev in instance.disks:
3664       if not dev.iv_name in self.op.disks:
3665         continue
3666       for node in tgt_node, oth_node:
3667         info("checking %s on %s" % (dev.iv_name, node))
3668         cfg.SetDiskID(dev, node)
3669         if not rpc.call_blockdev_find(node, dev):
3670           raise errors.OpExecError("Can't find device %s on node %s" %
3671                                    (dev.iv_name, node))
3672
3673     # Step: check other node consistency
3674     self.proc.LogStep(2, steps_total, "check peer consistency")
3675     for dev in instance.disks:
3676       if not dev.iv_name in self.op.disks:
3677         continue
3678       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3679       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3680                                    oth_node==instance.primary_node):
3681         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3682                                  " to replace disks on this node (%s)" %
3683                                  (oth_node, tgt_node))
3684
3685     # Step: create new storage
3686     self.proc.LogStep(3, steps_total, "allocate new storage")
3687     for dev in instance.disks:
3688       if not dev.iv_name in self.op.disks:
3689         continue
3690       size = dev.size
3691       cfg.SetDiskID(dev, tgt_node)
3692       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3693       names = _GenerateUniqueNames(cfg, lv_names)
3694       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3695                              logical_id=(vgname, names[0]))
3696       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3697                              logical_id=(vgname, names[1]))
3698       new_lvs = [lv_data, lv_meta]
3699       old_lvs = dev.children
3700       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3701       info("creating new local storage on %s for %s" %
3702            (tgt_node, dev.iv_name))
3703       # since we *always* want to create this LV, we use the
3704       # _Create...OnPrimary (which forces the creation), even if we
3705       # are talking about the secondary node
3706       for new_lv in new_lvs:
3707         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3708                                         _GetInstanceInfoText(instance)):
3709           raise errors.OpExecError("Failed to create new LV named '%s' on"
3710                                    " node '%s'" %
3711                                    (new_lv.logical_id[1], tgt_node))
3712
3713     # Step: for each lv, detach+rename*2+attach
3714     self.proc.LogStep(4, steps_total, "change drbd configuration")
3715     for dev, old_lvs, new_lvs in iv_names.itervalues():
3716       info("detaching %s drbd from local storage" % dev.iv_name)
3717       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3718         raise errors.OpExecError("Can't detach drbd from local storage on node"
3719                                  " %s for device %s" % (tgt_node, dev.iv_name))
3720       #dev.children = []
3721       #cfg.Update(instance)
3722
3723       # ok, we created the new LVs, so now we know we have the needed
3724       # storage; as such, we proceed on the target node to rename
3725       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3726       # using the assumption than logical_id == physical_id (which in
3727       # turn is the unique_id on that node)
3728
3729       # FIXME(iustin): use a better name for the replaced LVs
3730       temp_suffix = int(time.time())
3731       ren_fn = lambda d, suff: (d.physical_id[0],
3732                                 d.physical_id[1] + "_replaced-%s" % suff)
3733       # build the rename list based on what LVs exist on the node
3734       rlist = []
3735       for to_ren in old_lvs:
3736         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3737         if find_res is not None: # device exists
3738           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3739
3740       info("renaming the old LVs on the target node")
3741       if not rpc.call_blockdev_rename(tgt_node, rlist):
3742         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3743       # now we rename the new LVs to the old LVs
3744       info("renaming the new LVs on the target node")
3745       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3746       if not rpc.call_blockdev_rename(tgt_node, rlist):
3747         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3748
3749       for old, new in zip(old_lvs, new_lvs):
3750         new.logical_id = old.logical_id
3751         cfg.SetDiskID(new, tgt_node)
3752
3753       for disk in old_lvs:
3754         disk.logical_id = ren_fn(disk, temp_suffix)
3755         cfg.SetDiskID(disk, tgt_node)
3756
3757       # now that the new lvs have the old name, we can add them to the device
3758       info("adding new mirror component on %s" % tgt_node)
3759       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3760         for new_lv in new_lvs:
3761           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3762             warning("Can't rollback device %s", hint="manually cleanup unused"
3763                     " logical volumes")
3764         raise errors.OpExecError("Can't add local storage to drbd")
3765
3766       dev.children = new_lvs
3767       cfg.Update(instance)
3768
3769     # Step: wait for sync
3770
3771     # this can fail as the old devices are degraded and _WaitForSync
3772     # does a combined result over all disks, so we don't check its
3773     # return value
3774     self.proc.LogStep(5, steps_total, "sync devices")
3775     _WaitForSync(cfg, instance, self.proc, unlock=True)
3776
3777     # so check manually all the devices
3778     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3779       cfg.SetDiskID(dev, instance.primary_node)
3780       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3781       if is_degr:
3782         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3783
3784     # Step: remove old storage
3785     self.proc.LogStep(6, steps_total, "removing old storage")
3786     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3787       info("remove logical volumes for %s" % name)
3788       for lv in old_lvs:
3789         cfg.SetDiskID(lv, tgt_node)
3790         if not rpc.call_blockdev_remove(tgt_node, lv):
3791           warning("Can't remove old LV", hint="manually remove unused LVs")
3792           continue
3793
3794   def _ExecD8Secondary(self, feedback_fn):
3795     """Replace the secondary node for drbd8.
3796
3797     The algorithm for replace is quite complicated:
3798       - for all disks of the instance:
3799         - create new LVs on the new node with same names
3800         - shutdown the drbd device on the old secondary
3801         - disconnect the drbd network on the primary
3802         - create the drbd device on the new secondary
3803         - network attach the drbd on the primary, using an artifice:
3804           the drbd code for Attach() will connect to the network if it
3805           finds a device which is connected to the good local disks but
3806           not network enabled
3807       - wait for sync across all devices
3808       - remove all disks from the old secondary
3809
3810     Failures are not very well handled.
3811
3812     """
3813     steps_total = 6
3814     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3815     instance = self.instance
3816     iv_names = {}
3817     vgname = self.cfg.GetVGName()
3818     # start of work
3819     cfg = self.cfg
3820     old_node = self.tgt_node
3821     new_node = self.new_node
3822     pri_node = instance.primary_node
3823
3824     # Step: check device activation
3825     self.proc.LogStep(1, steps_total, "check device existence")
3826     info("checking volume groups")
3827     my_vg = cfg.GetVGName()
3828     results = rpc.call_vg_list([pri_node, new_node])
3829     if not results:
3830       raise errors.OpExecError("Can't list volume groups on the nodes")
3831     for node in pri_node, new_node:
3832       res = results.get(node, False)
3833       if not res or my_vg not in res:
3834         raise errors.OpExecError("Volume group '%s' not found on %s" %
3835                                  (my_vg, node))
3836     for dev in instance.disks:
3837       if not dev.iv_name in self.op.disks:
3838         continue
3839       info("checking %s on %s" % (dev.iv_name, pri_node))
3840       cfg.SetDiskID(dev, pri_node)
3841       if not rpc.call_blockdev_find(pri_node, dev):
3842         raise errors.OpExecError("Can't find device %s on node %s" %
3843                                  (dev.iv_name, pri_node))
3844
3845     # Step: check other node consistency
3846     self.proc.LogStep(2, steps_total, "check peer consistency")
3847     for dev in instance.disks:
3848       if not dev.iv_name in self.op.disks:
3849         continue
3850       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3851       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3852         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3853                                  " unsafe to replace the secondary" %
3854                                  pri_node)
3855
3856     # Step: create new storage
3857     self.proc.LogStep(3, steps_total, "allocate new storage")
3858     for dev in instance.disks:
3859       size = dev.size
3860       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3861       # since we *always* want to create this LV, we use the
3862       # _Create...OnPrimary (which forces the creation), even if we
3863       # are talking about the secondary node
3864       for new_lv in dev.children:
3865         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3866                                         _GetInstanceInfoText(instance)):
3867           raise errors.OpExecError("Failed to create new LV named '%s' on"
3868                                    " node '%s'" %
3869                                    (new_lv.logical_id[1], new_node))
3870
3871       iv_names[dev.iv_name] = (dev, dev.children)
3872
3873     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3874     for dev in instance.disks:
3875       size = dev.size
3876       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3877       # create new devices on new_node
3878       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3879                               logical_id=(pri_node, new_node,
3880                                           dev.logical_id[2]),
3881                               children=dev.children)
3882       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3883                                         new_drbd, False,
3884                                       _GetInstanceInfoText(instance)):
3885         raise errors.OpExecError("Failed to create new DRBD on"
3886                                  " node '%s'" % new_node)
3887
3888     for dev in instance.disks:
3889       # we have new devices, shutdown the drbd on the old secondary
3890       info("shutting down drbd for %s on old node" % dev.iv_name)
3891       cfg.SetDiskID(dev, old_node)
3892       if not rpc.call_blockdev_shutdown(old_node, dev):
3893         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3894                 hint="Please cleanup this device manually as soon as possible")
3895
3896     info("detaching primary drbds from the network (=> standalone)")
3897     done = 0
3898     for dev in instance.disks:
3899       cfg.SetDiskID(dev, pri_node)
3900       # set the physical (unique in bdev terms) id to None, meaning
3901       # detach from network
3902       dev.physical_id = (None,) * len(dev.physical_id)
3903       # and 'find' the device, which will 'fix' it to match the
3904       # standalone state
3905       if rpc.call_blockdev_find(pri_node, dev):
3906         done += 1
3907       else:
3908         warning("Failed to detach drbd %s from network, unusual case" %
3909                 dev.iv_name)
3910
3911     if not done:
3912       # no detaches succeeded (very unlikely)
3913       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3914
3915     # if we managed to detach at least one, we update all the disks of
3916     # the instance to point to the new secondary
3917     info("updating instance configuration")
3918     for dev in instance.disks:
3919       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3920       cfg.SetDiskID(dev, pri_node)
3921     cfg.Update(instance)
3922
3923     # and now perform the drbd attach
3924     info("attaching primary drbds to new secondary (standalone => connected)")
3925     failures = []
3926     for dev in instance.disks:
3927       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3928       # since the attach is smart, it's enough to 'find' the device,
3929       # it will automatically activate the network, if the physical_id
3930       # is correct
3931       cfg.SetDiskID(dev, pri_node)
3932       if not rpc.call_blockdev_find(pri_node, dev):
3933         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3934                 "please do a gnt-instance info to see the status of disks")
3935
3936     # this can fail as the old devices are degraded and _WaitForSync
3937     # does a combined result over all disks, so we don't check its
3938     # return value
3939     self.proc.LogStep(5, steps_total, "sync devices")
3940     _WaitForSync(cfg, instance, self.proc, unlock=True)
3941
3942     # so check manually all the devices
3943     for name, (dev, old_lvs) in iv_names.iteritems():
3944       cfg.SetDiskID(dev, pri_node)
3945       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3946       if is_degr:
3947         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3948
3949     self.proc.LogStep(6, steps_total, "removing old storage")
3950     for name, (dev, old_lvs) in iv_names.iteritems():
3951       info("remove logical volumes for %s" % name)
3952       for lv in old_lvs:
3953         cfg.SetDiskID(lv, old_node)
3954         if not rpc.call_blockdev_remove(old_node, lv):
3955           warning("Can't remove LV on old secondary",
3956                   hint="Cleanup stale volumes by hand")
3957
3958   def Exec(self, feedback_fn):
3959     """Execute disk replacement.
3960
3961     This dispatches the disk replacement to the appropriate handler.
3962
3963     """
3964     instance = self.instance
3965     if instance.disk_template == constants.DT_REMOTE_RAID1:
3966       fn = self._ExecRR1
3967     elif instance.disk_template == constants.DT_DRBD8:
3968       if self.op.remote_node is None:
3969         fn = self._ExecD8DiskOnly
3970       else:
3971         fn = self._ExecD8Secondary
3972     else:
3973       raise errors.ProgrammerError("Unhandled disk replacement case")
3974     return fn(feedback_fn)
3975
3976
3977 class LUQueryInstanceData(NoHooksLU):
3978   """Query runtime instance data.
3979
3980   """
3981   _OP_REQP = ["instances"]
3982
3983   def CheckPrereq(self):
3984     """Check prerequisites.
3985
3986     This only checks the optional instance list against the existing names.
3987
3988     """
3989     if not isinstance(self.op.instances, list):
3990       raise errors.OpPrereqError("Invalid argument type 'instances'")
3991     if self.op.instances:
3992       self.wanted_instances = []
3993       names = self.op.instances
3994       for name in names:
3995         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3996         if instance is None:
3997           raise errors.OpPrereqError("No such instance name '%s'" % name)
3998       self.wanted_instances.append(instance)
3999     else:
4000       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4001                                in self.cfg.GetInstanceList()]
4002     return
4003
4004
4005   def _ComputeDiskStatus(self, instance, snode, dev):
4006     """Compute block device status.
4007
4008     """
4009     self.cfg.SetDiskID(dev, instance.primary_node)
4010     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4011     if dev.dev_type in constants.LDS_DRBD:
4012       # we change the snode then (otherwise we use the one passed in)
4013       if dev.logical_id[0] == instance.primary_node:
4014         snode = dev.logical_id[1]
4015       else:
4016         snode = dev.logical_id[0]
4017
4018     if snode:
4019       self.cfg.SetDiskID(dev, snode)
4020       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4021     else:
4022       dev_sstatus = None
4023
4024     if dev.children:
4025       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4026                       for child in dev.children]
4027     else:
4028       dev_children = []
4029
4030     data = {
4031       "iv_name": dev.iv_name,
4032       "dev_type": dev.dev_type,
4033       "logical_id": dev.logical_id,
4034       "physical_id": dev.physical_id,
4035       "pstatus": dev_pstatus,
4036       "sstatus": dev_sstatus,
4037       "children": dev_children,
4038       }
4039
4040     return data
4041
4042   def Exec(self, feedback_fn):
4043     """Gather and return data"""
4044     result = {}
4045     for instance in self.wanted_instances:
4046       remote_info = rpc.call_instance_info(instance.primary_node,
4047                                                 instance.name)
4048       if remote_info and "state" in remote_info:
4049         remote_state = "up"
4050       else:
4051         remote_state = "down"
4052       if instance.status == "down":
4053         config_state = "down"
4054       else:
4055         config_state = "up"
4056
4057       disks = [self._ComputeDiskStatus(instance, None, device)
4058                for device in instance.disks]
4059
4060       idict = {
4061         "name": instance.name,
4062         "config_state": config_state,
4063         "run_state": remote_state,
4064         "pnode": instance.primary_node,
4065         "snodes": instance.secondary_nodes,
4066         "os": instance.os,
4067         "memory": instance.memory,
4068         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4069         "disks": disks,
4070         "network_port": instance.network_port,
4071         "vcpus": instance.vcpus,
4072         "kernel_path": instance.kernel_path,
4073         "initrd_path": instance.initrd_path,
4074         "hvm_boot_order": instance.hvm_boot_order,
4075         }
4076
4077       result[instance.name] = idict
4078
4079     return result
4080
4081
4082 class LUSetInstanceParms(LogicalUnit):
4083   """Modifies an instances's parameters.
4084
4085   """
4086   HPATH = "instance-modify"
4087   HTYPE = constants.HTYPE_INSTANCE
4088   _OP_REQP = ["instance_name"]
4089
4090   def BuildHooksEnv(self):
4091     """Build hooks env.
4092
4093     This runs on the master, primary and secondaries.
4094
4095     """
4096     args = dict()
4097     if self.mem:
4098       args['memory'] = self.mem
4099     if self.vcpus:
4100       args['vcpus'] = self.vcpus
4101     if self.do_ip or self.do_bridge:
4102       if self.do_ip:
4103         ip = self.ip
4104       else:
4105         ip = self.instance.nics[0].ip
4106       if self.bridge:
4107         bridge = self.bridge
4108       else:
4109         bridge = self.instance.nics[0].bridge
4110       args['nics'] = [(ip, bridge)]
4111     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4112     nl = [self.sstore.GetMasterNode(),
4113           self.instance.primary_node] + list(self.instance.secondary_nodes)
4114     return env, nl, nl
4115
4116   def CheckPrereq(self):
4117     """Check prerequisites.
4118
4119     This only checks the instance list against the existing names.
4120
4121     """
4122     self.mem = getattr(self.op, "mem", None)
4123     self.vcpus = getattr(self.op, "vcpus", None)
4124     self.ip = getattr(self.op, "ip", None)
4125     self.mac = getattr(self.op, "mac", None)
4126     self.bridge = getattr(self.op, "bridge", None)
4127     self.kernel_path = getattr(self.op, "kernel_path", None)
4128     self.initrd_path = getattr(self.op, "initrd_path", None)
4129     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4130     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4131                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4132     if all_parms.count(None) == len(all_parms):
4133       raise errors.OpPrereqError("No changes submitted")
4134     if self.mem is not None:
4135       try:
4136         self.mem = int(self.mem)
4137       except ValueError, err:
4138         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4139     if self.vcpus is not None:
4140       try:
4141         self.vcpus = int(self.vcpus)
4142       except ValueError, err:
4143         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4144     if self.ip is not None:
4145       self.do_ip = True
4146       if self.ip.lower() == "none":
4147         self.ip = None
4148       else:
4149         if not utils.IsValidIP(self.ip):
4150           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4151     else:
4152       self.do_ip = False
4153     self.do_bridge = (self.bridge is not None)
4154     if self.mac is not None:
4155       if self.cfg.IsMacInUse(self.mac):
4156         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4157                                    self.mac)
4158       if not utils.IsValidMac(self.mac):
4159         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4160
4161     if self.kernel_path is not None:
4162       self.do_kernel_path = True
4163       if self.kernel_path == constants.VALUE_NONE:
4164         raise errors.OpPrereqError("Can't set instance to no kernel")
4165
4166       if self.kernel_path != constants.VALUE_DEFAULT:
4167         if not os.path.isabs(self.kernel_path):
4168           raise errors.OpPrereqError("The kernel path must be an absolute"
4169                                     " filename")
4170     else:
4171       self.do_kernel_path = False
4172
4173     if self.initrd_path is not None:
4174       self.do_initrd_path = True
4175       if self.initrd_path not in (constants.VALUE_NONE,
4176                                   constants.VALUE_DEFAULT):
4177         if not os.path.isabs(self.initrd_path):
4178           raise errors.OpPrereqError("The initrd path must be an absolute"
4179                                     " filename")
4180     else:
4181       self.do_initrd_path = False
4182
4183     # boot order verification
4184     if self.hvm_boot_order is not None:
4185       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4186         if len(self.hvm_boot_order.strip("acdn")) != 0:
4187           raise errors.OpPrereqError("invalid boot order specified,"
4188                                      " must be one or more of [acdn]"
4189                                      " or 'default'")
4190
4191     instance = self.cfg.GetInstanceInfo(
4192       self.cfg.ExpandInstanceName(self.op.instance_name))
4193     if instance is None:
4194       raise errors.OpPrereqError("No such instance name '%s'" %
4195                                  self.op.instance_name)
4196     self.op.instance_name = instance.name
4197     self.instance = instance
4198     return
4199
4200   def Exec(self, feedback_fn):
4201     """Modifies an instance.
4202
4203     All parameters take effect only at the next restart of the instance.
4204     """
4205     result = []
4206     instance = self.instance
4207     if self.mem:
4208       instance.memory = self.mem
4209       result.append(("mem", self.mem))
4210     if self.vcpus:
4211       instance.vcpus = self.vcpus
4212       result.append(("vcpus",  self.vcpus))
4213     if self.do_ip:
4214       instance.nics[0].ip = self.ip
4215       result.append(("ip", self.ip))
4216     if self.bridge:
4217       instance.nics[0].bridge = self.bridge
4218       result.append(("bridge", self.bridge))
4219     if self.mac:
4220       instance.nics[0].mac = self.mac
4221       result.append(("mac", self.mac))
4222     if self.do_kernel_path:
4223       instance.kernel_path = self.kernel_path
4224       result.append(("kernel_path", self.kernel_path))
4225     if self.do_initrd_path:
4226       instance.initrd_path = self.initrd_path
4227       result.append(("initrd_path", self.initrd_path))
4228     if self.hvm_boot_order:
4229       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4230         instance.hvm_boot_order = None
4231       else:
4232         instance.hvm_boot_order = self.hvm_boot_order
4233       result.append(("hvm_boot_order", self.hvm_boot_order))
4234
4235     self.cfg.AddInstance(instance)
4236
4237     return result
4238
4239
4240 class LUQueryExports(NoHooksLU):
4241   """Query the exports list
4242
4243   """
4244   _OP_REQP = []
4245
4246   def CheckPrereq(self):
4247     """Check that the nodelist contains only existing nodes.
4248
4249     """
4250     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4251
4252   def Exec(self, feedback_fn):
4253     """Compute the list of all the exported system images.
4254
4255     Returns:
4256       a dictionary with the structure node->(export-list)
4257       where export-list is a list of the instances exported on
4258       that node.
4259
4260     """
4261     return rpc.call_export_list(self.nodes)
4262
4263
4264 class LUExportInstance(LogicalUnit):
4265   """Export an instance to an image in the cluster.
4266
4267   """
4268   HPATH = "instance-export"
4269   HTYPE = constants.HTYPE_INSTANCE
4270   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4271
4272   def BuildHooksEnv(self):
4273     """Build hooks env.
4274
4275     This will run on the master, primary node and target node.
4276
4277     """
4278     env = {
4279       "EXPORT_NODE": self.op.target_node,
4280       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4281       }
4282     env.update(_BuildInstanceHookEnvByObject(self.instance))
4283     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4284           self.op.target_node]
4285     return env, nl, nl
4286
4287   def CheckPrereq(self):
4288     """Check prerequisites.
4289
4290     This checks that the instance name is a valid one.
4291
4292     """
4293     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4294     self.instance = self.cfg.GetInstanceInfo(instance_name)
4295     if self.instance is None:
4296       raise errors.OpPrereqError("Instance '%s' not found" %
4297                                  self.op.instance_name)
4298
4299     # node verification
4300     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4301     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4302
4303     if self.dst_node is None:
4304       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4305                                  self.op.target_node)
4306     self.op.target_node = self.dst_node.name
4307
4308   def Exec(self, feedback_fn):
4309     """Export an instance to an image in the cluster.
4310
4311     """
4312     instance = self.instance
4313     dst_node = self.dst_node
4314     src_node = instance.primary_node
4315     # shutdown the instance, unless requested not to do so
4316     if self.op.shutdown:
4317       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4318       self.proc.ChainOpCode(op)
4319
4320     vgname = self.cfg.GetVGName()
4321
4322     snap_disks = []
4323
4324     try:
4325       for disk in instance.disks:
4326         if disk.iv_name == "sda":
4327           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4328           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4329
4330           if not new_dev_name:
4331             logger.Error("could not snapshot block device %s on node %s" %
4332                          (disk.logical_id[1], src_node))
4333           else:
4334             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4335                                       logical_id=(vgname, new_dev_name),
4336                                       physical_id=(vgname, new_dev_name),
4337                                       iv_name=disk.iv_name)
4338             snap_disks.append(new_dev)
4339
4340     finally:
4341       if self.op.shutdown:
4342         op = opcodes.OpStartupInstance(instance_name=instance.name,
4343                                        force=False)
4344         self.proc.ChainOpCode(op)
4345
4346     # TODO: check for size
4347
4348     for dev in snap_disks:
4349       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4350                                            instance):
4351         logger.Error("could not export block device %s from node"
4352                      " %s to node %s" %
4353                      (dev.logical_id[1], src_node, dst_node.name))
4354       if not rpc.call_blockdev_remove(src_node, dev):
4355         logger.Error("could not remove snapshot block device %s from"
4356                      " node %s" % (dev.logical_id[1], src_node))
4357
4358     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4359       logger.Error("could not finalize export for instance %s on node %s" %
4360                    (instance.name, dst_node.name))
4361
4362     nodelist = self.cfg.GetNodeList()
4363     nodelist.remove(dst_node.name)
4364
4365     # on one-node clusters nodelist will be empty after the removal
4366     # if we proceed the backup would be removed because OpQueryExports
4367     # substitutes an empty list with the full cluster node list.
4368     if nodelist:
4369       op = opcodes.OpQueryExports(nodes=nodelist)
4370       exportlist = self.proc.ChainOpCode(op)
4371       for node in exportlist:
4372         if instance.name in exportlist[node]:
4373           if not rpc.call_export_remove(node, instance.name):
4374             logger.Error("could not remove older export for instance %s"
4375                          " on node %s" % (instance.name, node))
4376
4377
4378 class TagsLU(NoHooksLU):
4379   """Generic tags LU.
4380
4381   This is an abstract class which is the parent of all the other tags LUs.
4382
4383   """
4384   def CheckPrereq(self):
4385     """Check prerequisites.
4386
4387     """
4388     if self.op.kind == constants.TAG_CLUSTER:
4389       self.target = self.cfg.GetClusterInfo()
4390     elif self.op.kind == constants.TAG_NODE:
4391       name = self.cfg.ExpandNodeName(self.op.name)
4392       if name is None:
4393         raise errors.OpPrereqError("Invalid node name (%s)" %
4394                                    (self.op.name,))
4395       self.op.name = name
4396       self.target = self.cfg.GetNodeInfo(name)
4397     elif self.op.kind == constants.TAG_INSTANCE:
4398       name = self.cfg.ExpandInstanceName(self.op.name)
4399       if name is None:
4400         raise errors.OpPrereqError("Invalid instance name (%s)" %
4401                                    (self.op.name,))
4402       self.op.name = name
4403       self.target = self.cfg.GetInstanceInfo(name)
4404     else:
4405       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4406                                  str(self.op.kind))
4407
4408
4409 class LUGetTags(TagsLU):
4410   """Returns the tags of a given object.
4411
4412   """
4413   _OP_REQP = ["kind", "name"]
4414
4415   def Exec(self, feedback_fn):
4416     """Returns the tag list.
4417
4418     """
4419     return self.target.GetTags()
4420
4421
4422 class LUSearchTags(NoHooksLU):
4423   """Searches the tags for a given pattern.
4424
4425   """
4426   _OP_REQP = ["pattern"]
4427
4428   def CheckPrereq(self):
4429     """Check prerequisites.
4430
4431     This checks the pattern passed for validity by compiling it.
4432
4433     """
4434     try:
4435       self.re = re.compile(self.op.pattern)
4436     except re.error, err:
4437       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4438                                  (self.op.pattern, err))
4439
4440   def Exec(self, feedback_fn):
4441     """Returns the tag list.
4442
4443     """
4444     cfg = self.cfg
4445     tgts = [("/cluster", cfg.GetClusterInfo())]
4446     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4447     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4448     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4449     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4450     results = []
4451     for path, target in tgts:
4452       for tag in target.GetTags():
4453         if self.re.search(tag):
4454           results.append((path, tag))
4455     return results
4456
4457
4458 class LUAddTags(TagsLU):
4459   """Sets a tag on a given object.
4460
4461   """
4462   _OP_REQP = ["kind", "name", "tags"]
4463
4464   def CheckPrereq(self):
4465     """Check prerequisites.
4466
4467     This checks the type and length of the tag name and value.
4468
4469     """
4470     TagsLU.CheckPrereq(self)
4471     for tag in self.op.tags:
4472       objects.TaggableObject.ValidateTag(tag)
4473
4474   def Exec(self, feedback_fn):
4475     """Sets the tag.
4476
4477     """
4478     try:
4479       for tag in self.op.tags:
4480         self.target.AddTag(tag)
4481     except errors.TagError, err:
4482       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4483     try:
4484       self.cfg.Update(self.target)
4485     except errors.ConfigurationError:
4486       raise errors.OpRetryError("There has been a modification to the"
4487                                 " config file and the operation has been"
4488                                 " aborted. Please retry.")
4489
4490
4491 class LUDelTags(TagsLU):
4492   """Delete a list of tags from a given object.
4493
4494   """
4495   _OP_REQP = ["kind", "name", "tags"]
4496
4497   def CheckPrereq(self):
4498     """Check prerequisites.
4499
4500     This checks that we have the given tag.
4501
4502     """
4503     TagsLU.CheckPrereq(self)
4504     for tag in self.op.tags:
4505       objects.TaggableObject.ValidateTag(tag)
4506     del_tags = frozenset(self.op.tags)
4507     cur_tags = self.target.GetTags()
4508     if not del_tags <= cur_tags:
4509       diff_tags = del_tags - cur_tags
4510       diff_names = ["'%s'" % tag for tag in diff_tags]
4511       diff_names.sort()
4512       raise errors.OpPrereqError("Tag(s) %s not found" %
4513                                  (",".join(diff_names)))
4514
4515   def Exec(self, feedback_fn):
4516     """Remove the tag from the object.
4517
4518     """
4519     for tag in self.op.tags:
4520       self.target.RemoveTag(tag)
4521     try:
4522       self.cfg.Update(self.target)
4523     except errors.ConfigurationError:
4524       raise errors.OpRetryError("There has been a modification to the"
4525                                 " config file and the operation has been"
4526                                 " aborted. Please retry.")