Fix misleading error message when checking disks
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge, mac) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
276   else:
277     nic_count = 0
278
279   env["INSTANCE_NIC_COUNT"] = nic_count
280
281   return env
282
283
284 def _BuildInstanceHookEnvByObject(instance, override=None):
285   """Builds instance related env variables for hooks from an object.
286
287   Args:
288     instance: objects.Instance object of instance
289     override: dict of values to override
290   """
291   args = {
292     'name': instance.name,
293     'primary_node': instance.primary_node,
294     'secondary_nodes': instance.secondary_nodes,
295     'os_type': instance.os,
296     'status': instance.os,
297     'memory': instance.memory,
298     'vcpus': instance.vcpus,
299     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
300   }
301   if override:
302     args.update(override)
303   return _BuildInstanceHookEnv(**args)
304
305
306 def _UpdateKnownHosts(fullnode, ip, pubkey):
307   """Ensure a node has a correct known_hosts entry.
308
309   Args:
310     fullnode - Fully qualified domain name of host. (str)
311     ip       - IPv4 address of host (str)
312     pubkey   - the public key of the cluster
313
314   """
315   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
316     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317   else:
318     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
319
320   inthere = False
321
322   save_lines = []
323   add_lines = []
324   removed = False
325
326   for rawline in f:
327     logger.Debug('read %s' % (repr(rawline),))
328
329     parts = rawline.rstrip('\r\n').split()
330
331     # Ignore unwanted lines
332     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
333       fields = parts[0].split(',')
334       key = parts[2]
335
336       haveall = True
337       havesome = False
338       for spec in [ ip, fullnode ]:
339         if spec not in fields:
340           haveall = False
341         if spec in fields:
342           havesome = True
343
344       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
345       if haveall and key == pubkey:
346         inthere = True
347         save_lines.append(rawline)
348         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
349         continue
350
351       if havesome and (not haveall or key != pubkey):
352         removed = True
353         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
354         continue
355
356     save_lines.append(rawline)
357
358   if not inthere:
359     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
360     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
361
362   if removed:
363     save_lines = save_lines + add_lines
364
365     # Write a new file and replace old.
366     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367                                    constants.DATA_DIR)
368     newfile = os.fdopen(fd, 'w')
369     try:
370       newfile.write(''.join(save_lines))
371     finally:
372       newfile.close()
373     logger.Debug("Wrote new known_hosts.")
374     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
375
376   elif add_lines:
377     # Simply appending a new line will do the trick.
378     f.seek(0, 2)
379     for add in add_lines:
380       f.write(add)
381
382   f.close()
383
384
385 def _HasValidVG(vglist, vgname):
386   """Checks if the volume group list is valid.
387
388   A non-None return value means there's an error, and the return value
389   is the error message.
390
391   """
392   vgsize = vglist.get(vgname, None)
393   if vgsize is None:
394     return "volume group '%s' missing" % vgname
395   elif vgsize < 20480:
396     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
397             (vgname, vgsize))
398   return None
399
400
401 def _InitSSHSetup(node):
402   """Setup the SSH configuration for the cluster.
403
404
405   This generates a dsa keypair for root, adds the pub key to the
406   permitted hosts and adds the hostkey to its own known hosts.
407
408   Args:
409     node: the name of this host as a fqdn
410
411   """
412   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413
414   for name in priv_key, pub_key:
415     if os.path.exists(name):
416       utils.CreateBackup(name)
417     utils.RemoveFile(name)
418
419   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
420                          "-f", priv_key,
421                          "-q", "-N", ""])
422   if result.failed:
423     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
424                              result.output)
425
426   f = open(pub_key, 'r')
427   try:
428     utils.AddAuthorizedKey(auth_keys, f.read(8192))
429   finally:
430     f.close()
431
432
433 def _InitGanetiServerSetup(ss):
434   """Setup the necessary configuration for the initial node daemon.
435
436   This creates the nodepass file containing the shared password for
437   the cluster and also generates the SSL certificate.
438
439   """
440   # Create pseudo random password
441   randpass = sha.new(os.urandom(64)).hexdigest()
442   # and write it into sstore
443   ss.SetKey(ss.SS_NODED_PASS, randpass)
444
445   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
446                          "-days", str(365*5), "-nodes", "-x509",
447                          "-keyout", constants.SSL_CERT_FILE,
448                          "-out", constants.SSL_CERT_FILE, "-batch"])
449   if result.failed:
450     raise errors.OpExecError("could not generate server ssl cert, command"
451                              " %s had exitcode %s and error message %s" %
452                              (result.cmd, result.exit_code, result.output))
453
454   os.chmod(constants.SSL_CERT_FILE, 0400)
455
456   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
457
458   if result.failed:
459     raise errors.OpExecError("Could not start the node daemon, command %s"
460                              " had exitcode %s and error %s" %
461                              (result.cmd, result.exit_code, result.output))
462
463
464 def _CheckInstanceBridgesExist(instance):
465   """Check that the brigdes needed by an instance exist.
466
467   """
468   # check bridges existance
469   brlist = [nic.bridge for nic in instance.nics]
470   if not rpc.call_bridges_exist(instance.primary_node, brlist):
471     raise errors.OpPrereqError("one or more target bridges %s does not"
472                                " exist on destination node '%s'" %
473                                (brlist, instance.primary_node))
474
475
476 class LUInitCluster(LogicalUnit):
477   """Initialise the cluster.
478
479   """
480   HPATH = "cluster-init"
481   HTYPE = constants.HTYPE_CLUSTER
482   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
483               "def_bridge", "master_netdev"]
484   REQ_CLUSTER = False
485
486   def BuildHooksEnv(self):
487     """Build hooks env.
488
489     Notes: Since we don't require a cluster, we must manually add
490     ourselves in the post-run node list.
491
492     """
493     env = {"OP_TARGET": self.op.cluster_name}
494     return env, [], [self.hostname.name]
495
496   def CheckPrereq(self):
497     """Verify that the passed name is a valid one.
498
499     """
500     if config.ConfigWriter.IsCluster():
501       raise errors.OpPrereqError("Cluster is already initialised")
502
503     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
504       if not os.path.exists(constants.VNC_PASSWORD_FILE):
505         raise errors.OpPrereqError("Please prepare the cluster VNC"
506                                    "password file %s" %
507                                    constants.VNC_PASSWORD_FILE)
508
509     self.hostname = hostname = utils.HostInfo()
510
511     if hostname.ip.startswith("127."):
512       raise errors.OpPrereqError("This host's IP resolves to the private"
513                                  " range (%s). Please fix DNS or %s." %
514                                  (hostname.ip, constants.ETC_HOSTS))
515
516     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
517                          source=constants.LOCALHOST_IP_ADDRESS):
518       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
519                                  " to %s,\nbut this ip address does not"
520                                  " belong to this host."
521                                  " Aborting." % hostname.ip)
522
523     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
524
525     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
526                      timeout=5):
527       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
528
529     secondary_ip = getattr(self.op, "secondary_ip", None)
530     if secondary_ip and not utils.IsValidIP(secondary_ip):
531       raise errors.OpPrereqError("Invalid secondary ip given")
532     if (secondary_ip and
533         secondary_ip != hostname.ip and
534         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
535                            source=constants.LOCALHOST_IP_ADDRESS))):
536       raise errors.OpPrereqError("You gave %s as secondary IP,"
537                                  " but it does not belong to this host." %
538                                  secondary_ip)
539     self.secondary_ip = secondary_ip
540
541     # checks presence of the volume group given
542     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
543
544     if vgstatus:
545       raise errors.OpPrereqError("Error: %s" % vgstatus)
546
547     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
548                     self.op.mac_prefix):
549       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
550                                  self.op.mac_prefix)
551
552     if self.op.hypervisor_type not in constants.HYPER_TYPES:
553       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
554                                  self.op.hypervisor_type)
555
556     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
557     if result.failed:
558       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
559                                  (self.op.master_netdev,
560                                   result.output.strip()))
561
562     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
563             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
564       raise errors.OpPrereqError("Init.d script '%s' missing or not"
565                                  " executable." % constants.NODE_INITD_SCRIPT)
566
567   def Exec(self, feedback_fn):
568     """Initialize the cluster.
569
570     """
571     clustername = self.clustername
572     hostname = self.hostname
573
574     # set up the simple store
575     self.sstore = ss = ssconf.SimpleStore()
576     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
577     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
578     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
579     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
580     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
581
582     # set up the inter-node password and certificate
583     _InitGanetiServerSetup(ss)
584
585     # start the master ip
586     rpc.call_node_start_master(hostname.name)
587
588     # set up ssh config and /etc/hosts
589     f = open(constants.SSH_HOST_RSA_PUB, 'r')
590     try:
591       sshline = f.read()
592     finally:
593       f.close()
594     sshkey = sshline.split(" ")[1]
595
596     _AddHostToEtcHosts(hostname.name)
597
598     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
599
600     _InitSSHSetup(hostname.name)
601
602     # init of cluster config file
603     self.cfg = cfgw = config.ConfigWriter()
604     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
605                     sshkey, self.op.mac_prefix,
606                     self.op.vg_name, self.op.def_bridge)
607
608
609 class LUDestroyCluster(NoHooksLU):
610   """Logical unit for destroying the cluster.
611
612   """
613   _OP_REQP = []
614
615   def CheckPrereq(self):
616     """Check prerequisites.
617
618     This checks whether the cluster is empty.
619
620     Any errors are signalled by raising errors.OpPrereqError.
621
622     """
623     master = self.sstore.GetMasterNode()
624
625     nodelist = self.cfg.GetNodeList()
626     if len(nodelist) != 1 or nodelist[0] != master:
627       raise errors.OpPrereqError("There are still %d node(s) in"
628                                  " this cluster." % (len(nodelist) - 1))
629     instancelist = self.cfg.GetInstanceList()
630     if instancelist:
631       raise errors.OpPrereqError("There are still %d instance(s) in"
632                                  " this cluster." % len(instancelist))
633
634   def Exec(self, feedback_fn):
635     """Destroys the cluster.
636
637     """
638     master = self.sstore.GetMasterNode()
639     if not rpc.call_node_stop_master(master):
640       raise errors.OpExecError("Could not disable the master role")
641     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
642     utils.CreateBackup(priv_key)
643     utils.CreateBackup(pub_key)
644     rpc.call_node_leave_cluster(master)
645
646
647 class LUVerifyCluster(NoHooksLU):
648   """Verifies the cluster status.
649
650   """
651   _OP_REQP = []
652
653   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
654                   remote_version, feedback_fn):
655     """Run multiple tests against a node.
656
657     Test list:
658       - compares ganeti version
659       - checks vg existance and size > 20G
660       - checks config file checksum
661       - checks ssh to other nodes
662
663     Args:
664       node: name of the node to check
665       file_list: required list of files
666       local_cksum: dictionary of local files and their checksums
667
668     """
669     # compares ganeti version
670     local_version = constants.PROTOCOL_VERSION
671     if not remote_version:
672       feedback_fn(" - ERROR: connection to %s failed" % (node))
673       return True
674
675     if local_version != remote_version:
676       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
677                       (local_version, node, remote_version))
678       return True
679
680     # checks vg existance and size > 20G
681
682     bad = False
683     if not vglist:
684       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
685                       (node,))
686       bad = True
687     else:
688       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
689       if vgstatus:
690         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
691         bad = True
692
693     # checks config file checksum
694     # checks ssh to any
695
696     if 'filelist' not in node_result:
697       bad = True
698       feedback_fn("  - ERROR: node hasn't returned file checksum data")
699     else:
700       remote_cksum = node_result['filelist']
701       for file_name in file_list:
702         if file_name not in remote_cksum:
703           bad = True
704           feedback_fn("  - ERROR: file '%s' missing" % file_name)
705         elif remote_cksum[file_name] != local_cksum[file_name]:
706           bad = True
707           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
708
709     if 'nodelist' not in node_result:
710       bad = True
711       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
712     else:
713       if node_result['nodelist']:
714         bad = True
715         for node in node_result['nodelist']:
716           feedback_fn("  - ERROR: communication with node '%s': %s" %
717                           (node, node_result['nodelist'][node]))
718     hyp_result = node_result.get('hypervisor', None)
719     if hyp_result is not None:
720       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
721     return bad
722
723   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
724     """Verify an instance.
725
726     This function checks to see if the required block devices are
727     available on the instance's node.
728
729     """
730     bad = False
731
732     instancelist = self.cfg.GetInstanceList()
733     if not instance in instancelist:
734       feedback_fn("  - ERROR: instance %s not in instance list %s" %
735                       (instance, instancelist))
736       bad = True
737
738     instanceconfig = self.cfg.GetInstanceInfo(instance)
739     node_current = instanceconfig.primary_node
740
741     node_vol_should = {}
742     instanceconfig.MapLVsByNode(node_vol_should)
743
744     for node in node_vol_should:
745       for volume in node_vol_should[node]:
746         if node not in node_vol_is or volume not in node_vol_is[node]:
747           feedback_fn("  - ERROR: volume %s missing on node %s" %
748                           (volume, node))
749           bad = True
750
751     if not instanceconfig.status == 'down':
752       if not instance in node_instance[node_current]:
753         feedback_fn("  - ERROR: instance %s not running on node %s" %
754                         (instance, node_current))
755         bad = True
756
757     for node in node_instance:
758       if (not node == node_current):
759         if instance in node_instance[node]:
760           feedback_fn("  - ERROR: instance %s should not run on node %s" %
761                           (instance, node))
762           bad = True
763
764     return bad
765
766   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
767     """Verify if there are any unknown volumes in the cluster.
768
769     The .os, .swap and backup volumes are ignored. All other volumes are
770     reported as unknown.
771
772     """
773     bad = False
774
775     for node in node_vol_is:
776       for volume in node_vol_is[node]:
777         if node not in node_vol_should or volume not in node_vol_should[node]:
778           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
779                       (volume, node))
780           bad = True
781     return bad
782
783   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
784     """Verify the list of running instances.
785
786     This checks what instances are running but unknown to the cluster.
787
788     """
789     bad = False
790     for node in node_instance:
791       for runninginstance in node_instance[node]:
792         if runninginstance not in instancelist:
793           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
794                           (runninginstance, node))
795           bad = True
796     return bad
797
798   def CheckPrereq(self):
799     """Check prerequisites.
800
801     This has no prerequisites.
802
803     """
804     pass
805
806   def Exec(self, feedback_fn):
807     """Verify integrity of cluster, performing various test on nodes.
808
809     """
810     bad = False
811     feedback_fn("* Verifying global settings")
812     for msg in self.cfg.VerifyConfig():
813       feedback_fn("  - ERROR: %s" % msg)
814
815     vg_name = self.cfg.GetVGName()
816     nodelist = utils.NiceSort(self.cfg.GetNodeList())
817     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
818     node_volume = {}
819     node_instance = {}
820
821     # FIXME: verify OS list
822     # do local checksums
823     file_names = list(self.sstore.GetFileList())
824     file_names.append(constants.SSL_CERT_FILE)
825     file_names.append(constants.CLUSTER_CONF_FILE)
826     local_checksums = utils.FingerprintFiles(file_names)
827
828     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
829     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
830     all_instanceinfo = rpc.call_instance_list(nodelist)
831     all_vglist = rpc.call_vg_list(nodelist)
832     node_verify_param = {
833       'filelist': file_names,
834       'nodelist': nodelist,
835       'hypervisor': None,
836       }
837     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
838     all_rversion = rpc.call_version(nodelist)
839
840     for node in nodelist:
841       feedback_fn("* Verifying node %s" % node)
842       result = self._VerifyNode(node, file_names, local_checksums,
843                                 all_vglist[node], all_nvinfo[node],
844                                 all_rversion[node], feedback_fn)
845       bad = bad or result
846
847       # node_volume
848       volumeinfo = all_volumeinfo[node]
849
850       if isinstance(volumeinfo, basestring):
851         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
852                     (node, volumeinfo[-400:].encode('string_escape')))
853         bad = True
854         node_volume[node] = {}
855       elif not isinstance(volumeinfo, dict):
856         feedback_fn("  - ERROR: connection to %s failed" % (node,))
857         bad = True
858         continue
859       else:
860         node_volume[node] = volumeinfo
861
862       # node_instance
863       nodeinstance = all_instanceinfo[node]
864       if type(nodeinstance) != list:
865         feedback_fn("  - ERROR: connection to %s failed" % (node,))
866         bad = True
867         continue
868
869       node_instance[node] = nodeinstance
870
871     node_vol_should = {}
872
873     for instance in instancelist:
874       feedback_fn("* Verifying instance %s" % instance)
875       result =  self._VerifyInstance(instance, node_volume, node_instance,
876                                      feedback_fn)
877       bad = bad or result
878
879       inst_config = self.cfg.GetInstanceInfo(instance)
880
881       inst_config.MapLVsByNode(node_vol_should)
882
883     feedback_fn("* Verifying orphan volumes")
884     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
885                                        feedback_fn)
886     bad = bad or result
887
888     feedback_fn("* Verifying remaining instances")
889     result = self._VerifyOrphanInstances(instancelist, node_instance,
890                                          feedback_fn)
891     bad = bad or result
892
893     return int(bad)
894
895
896 class LUVerifyDisks(NoHooksLU):
897   """Verifies the cluster disks status.
898
899   """
900   _OP_REQP = []
901
902   def CheckPrereq(self):
903     """Check prerequisites.
904
905     This has no prerequisites.
906
907     """
908     pass
909
910   def Exec(self, feedback_fn):
911     """Verify integrity of cluster disks.
912
913     """
914     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
915
916     vg_name = self.cfg.GetVGName()
917     nodes = utils.NiceSort(self.cfg.GetNodeList())
918     instances = [self.cfg.GetInstanceInfo(name)
919                  for name in self.cfg.GetInstanceList()]
920
921     nv_dict = {}
922     for inst in instances:
923       inst_lvs = {}
924       if (inst.status != "up" or
925           inst.disk_template not in constants.DTS_NET_MIRROR):
926         continue
927       inst.MapLVsByNode(inst_lvs)
928       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
929       for node, vol_list in inst_lvs.iteritems():
930         for vol in vol_list:
931           nv_dict[(node, vol)] = inst
932
933     if not nv_dict:
934       return result
935
936     node_lvs = rpc.call_volume_list(nodes, vg_name)
937
938     to_act = set()
939     for node in nodes:
940       # node_volume
941       lvs = node_lvs[node]
942
943       if isinstance(lvs, basestring):
944         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
945         res_nlvm[node] = lvs
946       elif not isinstance(lvs, dict):
947         logger.Info("connection to node %s failed or invalid data returned" %
948                     (node,))
949         res_nodes.append(node)
950         continue
951
952       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
953         inst = nv_dict.pop((node, lv_name), None)
954         if (not lv_online and inst is not None
955             and inst.name not in res_instances):
956           res_instances.append(inst.name)
957
958     # any leftover items in nv_dict are missing LVs, let's arrange the
959     # data better
960     for key, inst in nv_dict.iteritems():
961       if inst.name not in res_missing:
962         res_missing[inst.name] = []
963       res_missing[inst.name].append(key)
964
965     return result
966
967
968 class LURenameCluster(LogicalUnit):
969   """Rename the cluster.
970
971   """
972   HPATH = "cluster-rename"
973   HTYPE = constants.HTYPE_CLUSTER
974   _OP_REQP = ["name"]
975
976   def BuildHooksEnv(self):
977     """Build hooks env.
978
979     """
980     env = {
981       "OP_TARGET": self.sstore.GetClusterName(),
982       "NEW_NAME": self.op.name,
983       }
984     mn = self.sstore.GetMasterNode()
985     return env, [mn], [mn]
986
987   def CheckPrereq(self):
988     """Verify that the passed name is a valid one.
989
990     """
991     hostname = utils.HostInfo(self.op.name)
992
993     new_name = hostname.name
994     self.ip = new_ip = hostname.ip
995     old_name = self.sstore.GetClusterName()
996     old_ip = self.sstore.GetMasterIP()
997     if new_name == old_name and new_ip == old_ip:
998       raise errors.OpPrereqError("Neither the name nor the IP address of the"
999                                  " cluster has changed")
1000     if new_ip != old_ip:
1001       result = utils.RunCmd(["fping", "-q", new_ip])
1002       if not result.failed:
1003         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1004                                    " reachable on the network. Aborting." %
1005                                    new_ip)
1006
1007     self.op.name = new_name
1008
1009   def Exec(self, feedback_fn):
1010     """Rename the cluster.
1011
1012     """
1013     clustername = self.op.name
1014     ip = self.ip
1015     ss = self.sstore
1016
1017     # shutdown the master IP
1018     master = ss.GetMasterNode()
1019     if not rpc.call_node_stop_master(master):
1020       raise errors.OpExecError("Could not disable the master role")
1021
1022     try:
1023       # modify the sstore
1024       ss.SetKey(ss.SS_MASTER_IP, ip)
1025       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1026
1027       # Distribute updated ss config to all nodes
1028       myself = self.cfg.GetNodeInfo(master)
1029       dist_nodes = self.cfg.GetNodeList()
1030       if myself.name in dist_nodes:
1031         dist_nodes.remove(myself.name)
1032
1033       logger.Debug("Copying updated ssconf data to all nodes")
1034       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1035         fname = ss.KeyToFilename(keyname)
1036         result = rpc.call_upload_file(dist_nodes, fname)
1037         for to_node in dist_nodes:
1038           if not result[to_node]:
1039             logger.Error("copy of file %s to node %s failed" %
1040                          (fname, to_node))
1041     finally:
1042       if not rpc.call_node_start_master(master):
1043         logger.Error("Could not re-enable the master role on the master,"
1044                      " please restart manually.")
1045
1046
1047 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1048   """Sleep and poll for an instance's disk to sync.
1049
1050   """
1051   if not instance.disks:
1052     return True
1053
1054   if not oneshot:
1055     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1056
1057   node = instance.primary_node
1058
1059   for dev in instance.disks:
1060     cfgw.SetDiskID(dev, node)
1061
1062   retries = 0
1063   while True:
1064     max_time = 0
1065     done = True
1066     cumul_degraded = False
1067     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1068     if not rstats:
1069       proc.LogWarning("Can't get any data from node %s" % node)
1070       retries += 1
1071       if retries >= 10:
1072         raise errors.RemoteError("Can't contact node %s for mirror data,"
1073                                  " aborting." % node)
1074       time.sleep(6)
1075       continue
1076     retries = 0
1077     for i in range(len(rstats)):
1078       mstat = rstats[i]
1079       if mstat is None:
1080         proc.LogWarning("Can't compute data for node %s/%s" %
1081                         (node, instance.disks[i].iv_name))
1082         continue
1083       # we ignore the ldisk parameter
1084       perc_done, est_time, is_degraded, _ = mstat
1085       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1086       if perc_done is not None:
1087         done = False
1088         if est_time is not None:
1089           rem_time = "%d estimated seconds remaining" % est_time
1090           max_time = est_time
1091         else:
1092           rem_time = "no time estimate"
1093         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1094                      (instance.disks[i].iv_name, perc_done, rem_time))
1095     if done or oneshot:
1096       break
1097
1098     if unlock:
1099       utils.Unlock('cmd')
1100     try:
1101       time.sleep(min(60, max_time))
1102     finally:
1103       if unlock:
1104         utils.Lock('cmd')
1105
1106   if done:
1107     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1108   return not cumul_degraded
1109
1110
1111 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1112   """Check that mirrors are not degraded.
1113
1114   The ldisk parameter, if True, will change the test from the
1115   is_degraded attribute (which represents overall non-ok status for
1116   the device(s)) to the ldisk (representing the local storage status).
1117
1118   """
1119   cfgw.SetDiskID(dev, node)
1120   if ldisk:
1121     idx = 6
1122   else:
1123     idx = 5
1124
1125   result = True
1126   if on_primary or dev.AssembleOnSecondary():
1127     rstats = rpc.call_blockdev_find(node, dev)
1128     if not rstats:
1129       logger.ToStderr("Disk degraded or not found on node %s" % node)
1130       result = False
1131     else:
1132       result = result and (not rstats[idx])
1133   if dev.children:
1134     for child in dev.children:
1135       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1136
1137   return result
1138
1139
1140 class LUDiagnoseOS(NoHooksLU):
1141   """Logical unit for OS diagnose/query.
1142
1143   """
1144   _OP_REQP = []
1145
1146   def CheckPrereq(self):
1147     """Check prerequisites.
1148
1149     This always succeeds, since this is a pure query LU.
1150
1151     """
1152     return
1153
1154   def Exec(self, feedback_fn):
1155     """Compute the list of OSes.
1156
1157     """
1158     node_list = self.cfg.GetNodeList()
1159     node_data = rpc.call_os_diagnose(node_list)
1160     if node_data == False:
1161       raise errors.OpExecError("Can't gather the list of OSes")
1162     return node_data
1163
1164
1165 class LURemoveNode(LogicalUnit):
1166   """Logical unit for removing a node.
1167
1168   """
1169   HPATH = "node-remove"
1170   HTYPE = constants.HTYPE_NODE
1171   _OP_REQP = ["node_name"]
1172
1173   def BuildHooksEnv(self):
1174     """Build hooks env.
1175
1176     This doesn't run on the target node in the pre phase as a failed
1177     node would not allows itself to run.
1178
1179     """
1180     env = {
1181       "OP_TARGET": self.op.node_name,
1182       "NODE_NAME": self.op.node_name,
1183       }
1184     all_nodes = self.cfg.GetNodeList()
1185     all_nodes.remove(self.op.node_name)
1186     return env, all_nodes, all_nodes
1187
1188   def CheckPrereq(self):
1189     """Check prerequisites.
1190
1191     This checks:
1192      - the node exists in the configuration
1193      - it does not have primary or secondary instances
1194      - it's not the master
1195
1196     Any errors are signalled by raising errors.OpPrereqError.
1197
1198     """
1199     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1200     if node is None:
1201       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1202
1203     instance_list = self.cfg.GetInstanceList()
1204
1205     masternode = self.sstore.GetMasterNode()
1206     if node.name == masternode:
1207       raise errors.OpPrereqError("Node is the master node,"
1208                                  " you need to failover first.")
1209
1210     for instance_name in instance_list:
1211       instance = self.cfg.GetInstanceInfo(instance_name)
1212       if node.name == instance.primary_node:
1213         raise errors.OpPrereqError("Instance %s still running on the node,"
1214                                    " please remove first." % instance_name)
1215       if node.name in instance.secondary_nodes:
1216         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1217                                    " please remove first." % instance_name)
1218     self.op.node_name = node.name
1219     self.node = node
1220
1221   def Exec(self, feedback_fn):
1222     """Removes the node from the cluster.
1223
1224     """
1225     node = self.node
1226     logger.Info("stopping the node daemon and removing configs from node %s" %
1227                 node.name)
1228
1229     rpc.call_node_leave_cluster(node.name)
1230
1231     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1232
1233     logger.Info("Removing node %s from config" % node.name)
1234
1235     self.cfg.RemoveNode(node.name)
1236
1237     _RemoveHostFromEtcHosts(node.name)
1238
1239
1240 class LUQueryNodes(NoHooksLU):
1241   """Logical unit for querying nodes.
1242
1243   """
1244   _OP_REQP = ["output_fields", "names"]
1245
1246   def CheckPrereq(self):
1247     """Check prerequisites.
1248
1249     This checks that the fields required are valid output fields.
1250
1251     """
1252     self.dynamic_fields = frozenset(["dtotal", "dfree",
1253                                      "mtotal", "mnode", "mfree",
1254                                      "bootid"])
1255
1256     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1257                                "pinst_list", "sinst_list",
1258                                "pip", "sip"],
1259                        dynamic=self.dynamic_fields,
1260                        selected=self.op.output_fields)
1261
1262     self.wanted = _GetWantedNodes(self, self.op.names)
1263
1264   def Exec(self, feedback_fn):
1265     """Computes the list of nodes and their attributes.
1266
1267     """
1268     nodenames = self.wanted
1269     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1270
1271     # begin data gathering
1272
1273     if self.dynamic_fields.intersection(self.op.output_fields):
1274       live_data = {}
1275       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1276       for name in nodenames:
1277         nodeinfo = node_data.get(name, None)
1278         if nodeinfo:
1279           live_data[name] = {
1280             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1281             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1282             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1283             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1284             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1285             "bootid": nodeinfo['bootid'],
1286             }
1287         else:
1288           live_data[name] = {}
1289     else:
1290       live_data = dict.fromkeys(nodenames, {})
1291
1292     node_to_primary = dict([(name, set()) for name in nodenames])
1293     node_to_secondary = dict([(name, set()) for name in nodenames])
1294
1295     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1296                              "sinst_cnt", "sinst_list"))
1297     if inst_fields & frozenset(self.op.output_fields):
1298       instancelist = self.cfg.GetInstanceList()
1299
1300       for instance_name in instancelist:
1301         inst = self.cfg.GetInstanceInfo(instance_name)
1302         if inst.primary_node in node_to_primary:
1303           node_to_primary[inst.primary_node].add(inst.name)
1304         for secnode in inst.secondary_nodes:
1305           if secnode in node_to_secondary:
1306             node_to_secondary[secnode].add(inst.name)
1307
1308     # end data gathering
1309
1310     output = []
1311     for node in nodelist:
1312       node_output = []
1313       for field in self.op.output_fields:
1314         if field == "name":
1315           val = node.name
1316         elif field == "pinst_list":
1317           val = list(node_to_primary[node.name])
1318         elif field == "sinst_list":
1319           val = list(node_to_secondary[node.name])
1320         elif field == "pinst_cnt":
1321           val = len(node_to_primary[node.name])
1322         elif field == "sinst_cnt":
1323           val = len(node_to_secondary[node.name])
1324         elif field == "pip":
1325           val = node.primary_ip
1326         elif field == "sip":
1327           val = node.secondary_ip
1328         elif field in self.dynamic_fields:
1329           val = live_data[node.name].get(field, None)
1330         else:
1331           raise errors.ParameterError(field)
1332         node_output.append(val)
1333       output.append(node_output)
1334
1335     return output
1336
1337
1338 class LUQueryNodeVolumes(NoHooksLU):
1339   """Logical unit for getting volumes on node(s).
1340
1341   """
1342   _OP_REQP = ["nodes", "output_fields"]
1343
1344   def CheckPrereq(self):
1345     """Check prerequisites.
1346
1347     This checks that the fields required are valid output fields.
1348
1349     """
1350     self.nodes = _GetWantedNodes(self, self.op.nodes)
1351
1352     _CheckOutputFields(static=["node"],
1353                        dynamic=["phys", "vg", "name", "size", "instance"],
1354                        selected=self.op.output_fields)
1355
1356
1357   def Exec(self, feedback_fn):
1358     """Computes the list of nodes and their attributes.
1359
1360     """
1361     nodenames = self.nodes
1362     volumes = rpc.call_node_volumes(nodenames)
1363
1364     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1365              in self.cfg.GetInstanceList()]
1366
1367     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1368
1369     output = []
1370     for node in nodenames:
1371       if node not in volumes or not volumes[node]:
1372         continue
1373
1374       node_vols = volumes[node][:]
1375       node_vols.sort(key=lambda vol: vol['dev'])
1376
1377       for vol in node_vols:
1378         node_output = []
1379         for field in self.op.output_fields:
1380           if field == "node":
1381             val = node
1382           elif field == "phys":
1383             val = vol['dev']
1384           elif field == "vg":
1385             val = vol['vg']
1386           elif field == "name":
1387             val = vol['name']
1388           elif field == "size":
1389             val = int(float(vol['size']))
1390           elif field == "instance":
1391             for inst in ilist:
1392               if node not in lv_by_node[inst]:
1393                 continue
1394               if vol['name'] in lv_by_node[inst][node]:
1395                 val = inst.name
1396                 break
1397             else:
1398               val = '-'
1399           else:
1400             raise errors.ParameterError(field)
1401           node_output.append(str(val))
1402
1403         output.append(node_output)
1404
1405     return output
1406
1407
1408 class LUAddNode(LogicalUnit):
1409   """Logical unit for adding node to the cluster.
1410
1411   """
1412   HPATH = "node-add"
1413   HTYPE = constants.HTYPE_NODE
1414   _OP_REQP = ["node_name"]
1415
1416   def BuildHooksEnv(self):
1417     """Build hooks env.
1418
1419     This will run on all nodes before, and on all nodes + the new node after.
1420
1421     """
1422     env = {
1423       "OP_TARGET": self.op.node_name,
1424       "NODE_NAME": self.op.node_name,
1425       "NODE_PIP": self.op.primary_ip,
1426       "NODE_SIP": self.op.secondary_ip,
1427       }
1428     nodes_0 = self.cfg.GetNodeList()
1429     nodes_1 = nodes_0 + [self.op.node_name, ]
1430     return env, nodes_0, nodes_1
1431
1432   def CheckPrereq(self):
1433     """Check prerequisites.
1434
1435     This checks:
1436      - the new node is not already in the config
1437      - it is resolvable
1438      - its parameters (single/dual homed) matches the cluster
1439
1440     Any errors are signalled by raising errors.OpPrereqError.
1441
1442     """
1443     node_name = self.op.node_name
1444     cfg = self.cfg
1445
1446     dns_data = utils.HostInfo(node_name)
1447
1448     node = dns_data.name
1449     primary_ip = self.op.primary_ip = dns_data.ip
1450     secondary_ip = getattr(self.op, "secondary_ip", None)
1451     if secondary_ip is None:
1452       secondary_ip = primary_ip
1453     if not utils.IsValidIP(secondary_ip):
1454       raise errors.OpPrereqError("Invalid secondary IP given")
1455     self.op.secondary_ip = secondary_ip
1456     node_list = cfg.GetNodeList()
1457     if node in node_list:
1458       raise errors.OpPrereqError("Node %s is already in the configuration"
1459                                  % node)
1460
1461     for existing_node_name in node_list:
1462       existing_node = cfg.GetNodeInfo(existing_node_name)
1463       if (existing_node.primary_ip == primary_ip or
1464           existing_node.secondary_ip == primary_ip or
1465           existing_node.primary_ip == secondary_ip or
1466           existing_node.secondary_ip == secondary_ip):
1467         raise errors.OpPrereqError("New node ip address(es) conflict with"
1468                                    " existing node %s" % existing_node.name)
1469
1470     # check that the type of the node (single versus dual homed) is the
1471     # same as for the master
1472     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1473     master_singlehomed = myself.secondary_ip == myself.primary_ip
1474     newbie_singlehomed = secondary_ip == primary_ip
1475     if master_singlehomed != newbie_singlehomed:
1476       if master_singlehomed:
1477         raise errors.OpPrereqError("The master has no private ip but the"
1478                                    " new node has one")
1479       else:
1480         raise errors.OpPrereqError("The master has a private ip but the"
1481                                    " new node doesn't have one")
1482
1483     # checks reachablity
1484     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1485       raise errors.OpPrereqError("Node not reachable by ping")
1486
1487     if not newbie_singlehomed:
1488       # check reachability from my secondary ip to newbie's secondary ip
1489       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1490                            source=myself.secondary_ip):
1491         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1492                                    " based ping to noded port")
1493
1494     self.new_node = objects.Node(name=node,
1495                                  primary_ip=primary_ip,
1496                                  secondary_ip=secondary_ip)
1497
1498     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1499       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1500         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1501                                    constants.VNC_PASSWORD_FILE)
1502
1503   def Exec(self, feedback_fn):
1504     """Adds the new node to the cluster.
1505
1506     """
1507     new_node = self.new_node
1508     node = new_node.name
1509
1510     # set up inter-node password and certificate and restarts the node daemon
1511     gntpass = self.sstore.GetNodeDaemonPassword()
1512     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1513       raise errors.OpExecError("ganeti password corruption detected")
1514     f = open(constants.SSL_CERT_FILE)
1515     try:
1516       gntpem = f.read(8192)
1517     finally:
1518       f.close()
1519     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1520     # so we use this to detect an invalid certificate; as long as the
1521     # cert doesn't contain this, the here-document will be correctly
1522     # parsed by the shell sequence below
1523     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1524       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1525     if not gntpem.endswith("\n"):
1526       raise errors.OpExecError("PEM must end with newline")
1527     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1528
1529     # and then connect with ssh to set password and start ganeti-noded
1530     # note that all the below variables are sanitized at this point,
1531     # either by being constants or by the checks above
1532     ss = self.sstore
1533     mycommand = ("umask 077 && "
1534                  "echo '%s' > '%s' && "
1535                  "cat > '%s' << '!EOF.' && \n"
1536                  "%s!EOF.\n%s restart" %
1537                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1538                   constants.SSL_CERT_FILE, gntpem,
1539                   constants.NODE_INITD_SCRIPT))
1540
1541     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1542     if result.failed:
1543       raise errors.OpExecError("Remote command on node %s, error: %s,"
1544                                " output: %s" %
1545                                (node, result.fail_reason, result.output))
1546
1547     # check connectivity
1548     time.sleep(4)
1549
1550     result = rpc.call_version([node])[node]
1551     if result:
1552       if constants.PROTOCOL_VERSION == result:
1553         logger.Info("communication to node %s fine, sw version %s match" %
1554                     (node, result))
1555       else:
1556         raise errors.OpExecError("Version mismatch master version %s,"
1557                                  " node version %s" %
1558                                  (constants.PROTOCOL_VERSION, result))
1559     else:
1560       raise errors.OpExecError("Cannot get version from the new node")
1561
1562     # setup ssh on node
1563     logger.Info("copy ssh key to node %s" % node)
1564     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1565     keyarray = []
1566     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1567                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1568                 priv_key, pub_key]
1569
1570     for i in keyfiles:
1571       f = open(i, 'r')
1572       try:
1573         keyarray.append(f.read())
1574       finally:
1575         f.close()
1576
1577     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1578                                keyarray[3], keyarray[4], keyarray[5])
1579
1580     if not result:
1581       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1582
1583     # Add node to our /etc/hosts, and add key to known_hosts
1584     _AddHostToEtcHosts(new_node.name)
1585
1586     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1587                       self.cfg.GetHostKey())
1588
1589     if new_node.secondary_ip != new_node.primary_ip:
1590       if not rpc.call_node_tcp_ping(new_node.name,
1591                                     constants.LOCALHOST_IP_ADDRESS,
1592                                     new_node.secondary_ip,
1593                                     constants.DEFAULT_NODED_PORT,
1594                                     10, False):
1595         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1596                                  " you gave (%s). Please fix and re-run this"
1597                                  " command." % new_node.secondary_ip)
1598
1599     success, msg = ssh.VerifyNodeHostname(node)
1600     if not success:
1601       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1602                                " than the one the resolver gives: %s."
1603                                " Please fix and re-run this command." %
1604                                (node, msg))
1605
1606     # Distribute updated /etc/hosts and known_hosts to all nodes,
1607     # including the node just added
1608     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1609     dist_nodes = self.cfg.GetNodeList() + [node]
1610     if myself.name in dist_nodes:
1611       dist_nodes.remove(myself.name)
1612
1613     logger.Debug("Copying hosts and known_hosts to all nodes")
1614     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1615       result = rpc.call_upload_file(dist_nodes, fname)
1616       for to_node in dist_nodes:
1617         if not result[to_node]:
1618           logger.Error("copy of file %s to node %s failed" %
1619                        (fname, to_node))
1620
1621     to_copy = ss.GetFileList()
1622     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1623       to_copy.append(constants.VNC_PASSWORD_FILE)
1624     for fname in to_copy:
1625       if not ssh.CopyFileToNode(node, fname):
1626         logger.Error("could not copy file %s to node %s" % (fname, node))
1627
1628     logger.Info("adding node %s to cluster.conf" % node)
1629     self.cfg.AddNode(new_node)
1630
1631
1632 class LUMasterFailover(LogicalUnit):
1633   """Failover the master node to the current node.
1634
1635   This is a special LU in that it must run on a non-master node.
1636
1637   """
1638   HPATH = "master-failover"
1639   HTYPE = constants.HTYPE_CLUSTER
1640   REQ_MASTER = False
1641   _OP_REQP = []
1642
1643   def BuildHooksEnv(self):
1644     """Build hooks env.
1645
1646     This will run on the new master only in the pre phase, and on all
1647     the nodes in the post phase.
1648
1649     """
1650     env = {
1651       "OP_TARGET": self.new_master,
1652       "NEW_MASTER": self.new_master,
1653       "OLD_MASTER": self.old_master,
1654       }
1655     return env, [self.new_master], self.cfg.GetNodeList()
1656
1657   def CheckPrereq(self):
1658     """Check prerequisites.
1659
1660     This checks that we are not already the master.
1661
1662     """
1663     self.new_master = utils.HostInfo().name
1664     self.old_master = self.sstore.GetMasterNode()
1665
1666     if self.old_master == self.new_master:
1667       raise errors.OpPrereqError("This commands must be run on the node"
1668                                  " where you want the new master to be."
1669                                  " %s is already the master" %
1670                                  self.old_master)
1671
1672   def Exec(self, feedback_fn):
1673     """Failover the master node.
1674
1675     This command, when run on a non-master node, will cause the current
1676     master to cease being master, and the non-master to become new
1677     master.
1678
1679     """
1680     #TODO: do not rely on gethostname returning the FQDN
1681     logger.Info("setting master to %s, old master: %s" %
1682                 (self.new_master, self.old_master))
1683
1684     if not rpc.call_node_stop_master(self.old_master):
1685       logger.Error("could disable the master role on the old master"
1686                    " %s, please disable manually" % self.old_master)
1687
1688     ss = self.sstore
1689     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1690     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1691                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1692       logger.Error("could not distribute the new simple store master file"
1693                    " to the other nodes, please check.")
1694
1695     if not rpc.call_node_start_master(self.new_master):
1696       logger.Error("could not start the master role on the new master"
1697                    " %s, please check" % self.new_master)
1698       feedback_fn("Error in activating the master IP on the new master,"
1699                   " please fix manually.")
1700
1701
1702
1703 class LUQueryClusterInfo(NoHooksLU):
1704   """Query cluster configuration.
1705
1706   """
1707   _OP_REQP = []
1708   REQ_MASTER = False
1709
1710   def CheckPrereq(self):
1711     """No prerequsites needed for this LU.
1712
1713     """
1714     pass
1715
1716   def Exec(self, feedback_fn):
1717     """Return cluster config.
1718
1719     """
1720     result = {
1721       "name": self.sstore.GetClusterName(),
1722       "software_version": constants.RELEASE_VERSION,
1723       "protocol_version": constants.PROTOCOL_VERSION,
1724       "config_version": constants.CONFIG_VERSION,
1725       "os_api_version": constants.OS_API_VERSION,
1726       "export_version": constants.EXPORT_VERSION,
1727       "master": self.sstore.GetMasterNode(),
1728       "architecture": (platform.architecture()[0], platform.machine()),
1729       }
1730
1731     return result
1732
1733
1734 class LUClusterCopyFile(NoHooksLU):
1735   """Copy file to cluster.
1736
1737   """
1738   _OP_REQP = ["nodes", "filename"]
1739
1740   def CheckPrereq(self):
1741     """Check prerequisites.
1742
1743     It should check that the named file exists and that the given list
1744     of nodes is valid.
1745
1746     """
1747     if not os.path.exists(self.op.filename):
1748       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1749
1750     self.nodes = _GetWantedNodes(self, self.op.nodes)
1751
1752   def Exec(self, feedback_fn):
1753     """Copy a file from master to some nodes.
1754
1755     Args:
1756       opts - class with options as members
1757       args - list containing a single element, the file name
1758     Opts used:
1759       nodes - list containing the name of target nodes; if empty, all nodes
1760
1761     """
1762     filename = self.op.filename
1763
1764     myname = utils.HostInfo().name
1765
1766     for node in self.nodes:
1767       if node == myname:
1768         continue
1769       if not ssh.CopyFileToNode(node, filename):
1770         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1771
1772
1773 class LUDumpClusterConfig(NoHooksLU):
1774   """Return a text-representation of the cluster-config.
1775
1776   """
1777   _OP_REQP = []
1778
1779   def CheckPrereq(self):
1780     """No prerequisites.
1781
1782     """
1783     pass
1784
1785   def Exec(self, feedback_fn):
1786     """Dump a representation of the cluster config to the standard output.
1787
1788     """
1789     return self.cfg.DumpConfig()
1790
1791
1792 class LURunClusterCommand(NoHooksLU):
1793   """Run a command on some nodes.
1794
1795   """
1796   _OP_REQP = ["command", "nodes"]
1797
1798   def CheckPrereq(self):
1799     """Check prerequisites.
1800
1801     It checks that the given list of nodes is valid.
1802
1803     """
1804     self.nodes = _GetWantedNodes(self, self.op.nodes)
1805
1806   def Exec(self, feedback_fn):
1807     """Run a command on some nodes.
1808
1809     """
1810     # put the master at the end of the nodes list
1811     master_node = self.sstore.GetMasterNode()
1812     if master_node in self.nodes:
1813       self.nodes.remove(master_node)
1814       self.nodes.append(master_node)
1815
1816     data = []
1817     for node in self.nodes:
1818       result = ssh.SSHCall(node, "root", self.op.command)
1819       data.append((node, result.output, result.exit_code))
1820
1821     return data
1822
1823
1824 class LUActivateInstanceDisks(NoHooksLU):
1825   """Bring up an instance's disks.
1826
1827   """
1828   _OP_REQP = ["instance_name"]
1829
1830   def CheckPrereq(self):
1831     """Check prerequisites.
1832
1833     This checks that the instance is in the cluster.
1834
1835     """
1836     instance = self.cfg.GetInstanceInfo(
1837       self.cfg.ExpandInstanceName(self.op.instance_name))
1838     if instance is None:
1839       raise errors.OpPrereqError("Instance '%s' not known" %
1840                                  self.op.instance_name)
1841     self.instance = instance
1842
1843
1844   def Exec(self, feedback_fn):
1845     """Activate the disks.
1846
1847     """
1848     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1849     if not disks_ok:
1850       raise errors.OpExecError("Cannot activate block devices")
1851
1852     return disks_info
1853
1854
1855 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1856   """Prepare the block devices for an instance.
1857
1858   This sets up the block devices on all nodes.
1859
1860   Args:
1861     instance: a ganeti.objects.Instance object
1862     ignore_secondaries: if true, errors on secondary nodes won't result
1863                         in an error return from the function
1864
1865   Returns:
1866     false if the operation failed
1867     list of (host, instance_visible_name, node_visible_name) if the operation
1868          suceeded with the mapping from node devices to instance devices
1869   """
1870   device_info = []
1871   disks_ok = True
1872   iname = instance.name
1873   # With the two passes mechanism we try to reduce the window of
1874   # opportunity for the race condition of switching DRBD to primary
1875   # before handshaking occured, but we do not eliminate it
1876
1877   # The proper fix would be to wait (with some limits) until the
1878   # connection has been made and drbd transitions from WFConnection
1879   # into any other network-connected state (Connected, SyncTarget,
1880   # SyncSource, etc.)
1881
1882   # 1st pass, assemble on all nodes in secondary mode
1883   for inst_disk in instance.disks:
1884     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1885       cfg.SetDiskID(node_disk, node)
1886       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1887       if not result:
1888         logger.Error("could not prepare block device %s on node %s"
1889                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1890         if not ignore_secondaries:
1891           disks_ok = False
1892
1893   # FIXME: race condition on drbd migration to primary
1894
1895   # 2nd pass, do only the primary node
1896   for inst_disk in instance.disks:
1897     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1898       if node != instance.primary_node:
1899         continue
1900       cfg.SetDiskID(node_disk, node)
1901       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1902       if not result:
1903         logger.Error("could not prepare block device %s on node %s"
1904                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1905         disks_ok = False
1906     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1907
1908   # leave the disks configured for the primary node
1909   # this is a workaround that would be fixed better by
1910   # improving the logical/physical id handling
1911   for disk in instance.disks:
1912     cfg.SetDiskID(disk, instance.primary_node)
1913
1914   return disks_ok, device_info
1915
1916
1917 def _StartInstanceDisks(cfg, instance, force):
1918   """Start the disks of an instance.
1919
1920   """
1921   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1922                                            ignore_secondaries=force)
1923   if not disks_ok:
1924     _ShutdownInstanceDisks(instance, cfg)
1925     if force is not None and not force:
1926       logger.Error("If the message above refers to a secondary node,"
1927                    " you can retry the operation using '--force'.")
1928     raise errors.OpExecError("Disk consistency error")
1929
1930
1931 class LUDeactivateInstanceDisks(NoHooksLU):
1932   """Shutdown an instance's disks.
1933
1934   """
1935   _OP_REQP = ["instance_name"]
1936
1937   def CheckPrereq(self):
1938     """Check prerequisites.
1939
1940     This checks that the instance is in the cluster.
1941
1942     """
1943     instance = self.cfg.GetInstanceInfo(
1944       self.cfg.ExpandInstanceName(self.op.instance_name))
1945     if instance is None:
1946       raise errors.OpPrereqError("Instance '%s' not known" %
1947                                  self.op.instance_name)
1948     self.instance = instance
1949
1950   def Exec(self, feedback_fn):
1951     """Deactivate the disks
1952
1953     """
1954     instance = self.instance
1955     ins_l = rpc.call_instance_list([instance.primary_node])
1956     ins_l = ins_l[instance.primary_node]
1957     if not type(ins_l) is list:
1958       raise errors.OpExecError("Can't contact node '%s'" %
1959                                instance.primary_node)
1960
1961     if self.instance.name in ins_l:
1962       raise errors.OpExecError("Instance is running, can't shutdown"
1963                                " block devices.")
1964
1965     _ShutdownInstanceDisks(instance, self.cfg)
1966
1967
1968 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1969   """Shutdown block devices of an instance.
1970
1971   This does the shutdown on all nodes of the instance.
1972
1973   If the ignore_primary is false, errors on the primary node are
1974   ignored.
1975
1976   """
1977   result = True
1978   for disk in instance.disks:
1979     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1980       cfg.SetDiskID(top_disk, node)
1981       if not rpc.call_blockdev_shutdown(node, top_disk):
1982         logger.Error("could not shutdown block device %s on node %s" %
1983                      (disk.iv_name, node))
1984         if not ignore_primary or node != instance.primary_node:
1985           result = False
1986   return result
1987
1988
1989 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1990   """Checks if a node has enough free memory.
1991
1992   This function check if a given node has the needed amount of free
1993   memory. In case the node has less memory or we cannot get the
1994   information from the node, this function raise an OpPrereqError
1995   exception.
1996
1997   Args:
1998     - cfg: a ConfigWriter instance
1999     - node: the node name
2000     - reason: string to use in the error message
2001     - requested: the amount of memory in MiB
2002
2003   """
2004   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2005   if not nodeinfo or not isinstance(nodeinfo, dict):
2006     raise errors.OpPrereqError("Could not contact node %s for resource"
2007                              " information" % (node,))
2008
2009   free_mem = nodeinfo[node].get('memory_free')
2010   if not isinstance(free_mem, int):
2011     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2012                              " was '%s'" % (node, free_mem))
2013   if requested > free_mem:
2014     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2015                              " needed %s MiB, available %s MiB" %
2016                              (node, reason, requested, free_mem))
2017
2018
2019 class LUStartupInstance(LogicalUnit):
2020   """Starts an instance.
2021
2022   """
2023   HPATH = "instance-start"
2024   HTYPE = constants.HTYPE_INSTANCE
2025   _OP_REQP = ["instance_name", "force"]
2026
2027   def BuildHooksEnv(self):
2028     """Build hooks env.
2029
2030     This runs on master, primary and secondary nodes of the instance.
2031
2032     """
2033     env = {
2034       "FORCE": self.op.force,
2035       }
2036     env.update(_BuildInstanceHookEnvByObject(self.instance))
2037     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2038           list(self.instance.secondary_nodes))
2039     return env, nl, nl
2040
2041   def CheckPrereq(self):
2042     """Check prerequisites.
2043
2044     This checks that the instance is in the cluster.
2045
2046     """
2047     instance = self.cfg.GetInstanceInfo(
2048       self.cfg.ExpandInstanceName(self.op.instance_name))
2049     if instance is None:
2050       raise errors.OpPrereqError("Instance '%s' not known" %
2051                                  self.op.instance_name)
2052
2053     # check bridges existance
2054     _CheckInstanceBridgesExist(instance)
2055
2056     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2057                          "starting instance %s" % instance.name,
2058                          instance.memory)
2059
2060     self.instance = instance
2061     self.op.instance_name = instance.name
2062
2063   def Exec(self, feedback_fn):
2064     """Start the instance.
2065
2066     """
2067     instance = self.instance
2068     force = self.op.force
2069     extra_args = getattr(self.op, "extra_args", "")
2070
2071     self.cfg.MarkInstanceUp(instance.name)
2072
2073     node_current = instance.primary_node
2074
2075     _StartInstanceDisks(self.cfg, instance, force)
2076
2077     if not rpc.call_instance_start(node_current, instance, extra_args):
2078       _ShutdownInstanceDisks(instance, self.cfg)
2079       raise errors.OpExecError("Could not start instance")
2080
2081
2082 class LURebootInstance(LogicalUnit):
2083   """Reboot an instance.
2084
2085   """
2086   HPATH = "instance-reboot"
2087   HTYPE = constants.HTYPE_INSTANCE
2088   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2089
2090   def BuildHooksEnv(self):
2091     """Build hooks env.
2092
2093     This runs on master, primary and secondary nodes of the instance.
2094
2095     """
2096     env = {
2097       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2098       }
2099     env.update(_BuildInstanceHookEnvByObject(self.instance))
2100     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2101           list(self.instance.secondary_nodes))
2102     return env, nl, nl
2103
2104   def CheckPrereq(self):
2105     """Check prerequisites.
2106
2107     This checks that the instance is in the cluster.
2108
2109     """
2110     instance = self.cfg.GetInstanceInfo(
2111       self.cfg.ExpandInstanceName(self.op.instance_name))
2112     if instance is None:
2113       raise errors.OpPrereqError("Instance '%s' not known" %
2114                                  self.op.instance_name)
2115
2116     # check bridges existance
2117     _CheckInstanceBridgesExist(instance)
2118
2119     self.instance = instance
2120     self.op.instance_name = instance.name
2121
2122   def Exec(self, feedback_fn):
2123     """Reboot the instance.
2124
2125     """
2126     instance = self.instance
2127     ignore_secondaries = self.op.ignore_secondaries
2128     reboot_type = self.op.reboot_type
2129     extra_args = getattr(self.op, "extra_args", "")
2130
2131     node_current = instance.primary_node
2132
2133     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2134                            constants.INSTANCE_REBOOT_HARD,
2135                            constants.INSTANCE_REBOOT_FULL]:
2136       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2137                                   (constants.INSTANCE_REBOOT_SOFT,
2138                                    constants.INSTANCE_REBOOT_HARD,
2139                                    constants.INSTANCE_REBOOT_FULL))
2140
2141     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2142                        constants.INSTANCE_REBOOT_HARD]:
2143       if not rpc.call_instance_reboot(node_current, instance,
2144                                       reboot_type, extra_args):
2145         raise errors.OpExecError("Could not reboot instance")
2146     else:
2147       if not rpc.call_instance_shutdown(node_current, instance):
2148         raise errors.OpExecError("could not shutdown instance for full reboot")
2149       _ShutdownInstanceDisks(instance, self.cfg)
2150       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2151       if not rpc.call_instance_start(node_current, instance, extra_args):
2152         _ShutdownInstanceDisks(instance, self.cfg)
2153         raise errors.OpExecError("Could not start instance for full reboot")
2154
2155     self.cfg.MarkInstanceUp(instance.name)
2156
2157
2158 class LUShutdownInstance(LogicalUnit):
2159   """Shutdown an instance.
2160
2161   """
2162   HPATH = "instance-stop"
2163   HTYPE = constants.HTYPE_INSTANCE
2164   _OP_REQP = ["instance_name"]
2165
2166   def BuildHooksEnv(self):
2167     """Build hooks env.
2168
2169     This runs on master, primary and secondary nodes of the instance.
2170
2171     """
2172     env = _BuildInstanceHookEnvByObject(self.instance)
2173     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2174           list(self.instance.secondary_nodes))
2175     return env, nl, nl
2176
2177   def CheckPrereq(self):
2178     """Check prerequisites.
2179
2180     This checks that the instance is in the cluster.
2181
2182     """
2183     instance = self.cfg.GetInstanceInfo(
2184       self.cfg.ExpandInstanceName(self.op.instance_name))
2185     if instance is None:
2186       raise errors.OpPrereqError("Instance '%s' not known" %
2187                                  self.op.instance_name)
2188     self.instance = instance
2189
2190   def Exec(self, feedback_fn):
2191     """Shutdown the instance.
2192
2193     """
2194     instance = self.instance
2195     node_current = instance.primary_node
2196     self.cfg.MarkInstanceDown(instance.name)
2197     if not rpc.call_instance_shutdown(node_current, instance):
2198       logger.Error("could not shutdown instance")
2199
2200     _ShutdownInstanceDisks(instance, self.cfg)
2201
2202
2203 class LUReinstallInstance(LogicalUnit):
2204   """Reinstall an instance.
2205
2206   """
2207   HPATH = "instance-reinstall"
2208   HTYPE = constants.HTYPE_INSTANCE
2209   _OP_REQP = ["instance_name"]
2210
2211   def BuildHooksEnv(self):
2212     """Build hooks env.
2213
2214     This runs on master, primary and secondary nodes of the instance.
2215
2216     """
2217     env = _BuildInstanceHookEnvByObject(self.instance)
2218     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2219           list(self.instance.secondary_nodes))
2220     return env, nl, nl
2221
2222   def CheckPrereq(self):
2223     """Check prerequisites.
2224
2225     This checks that the instance is in the cluster and is not running.
2226
2227     """
2228     instance = self.cfg.GetInstanceInfo(
2229       self.cfg.ExpandInstanceName(self.op.instance_name))
2230     if instance is None:
2231       raise errors.OpPrereqError("Instance '%s' not known" %
2232                                  self.op.instance_name)
2233     if instance.disk_template == constants.DT_DISKLESS:
2234       raise errors.OpPrereqError("Instance '%s' has no disks" %
2235                                  self.op.instance_name)
2236     if instance.status != "down":
2237       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2238                                  self.op.instance_name)
2239     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2240     if remote_info:
2241       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2242                                  (self.op.instance_name,
2243                                   instance.primary_node))
2244
2245     self.op.os_type = getattr(self.op, "os_type", None)
2246     if self.op.os_type is not None:
2247       # OS verification
2248       pnode = self.cfg.GetNodeInfo(
2249         self.cfg.ExpandNodeName(instance.primary_node))
2250       if pnode is None:
2251         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2252                                    self.op.pnode)
2253       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2254       if not os_obj:
2255         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2256                                    " primary node"  % self.op.os_type)
2257
2258     self.instance = instance
2259
2260   def Exec(self, feedback_fn):
2261     """Reinstall the instance.
2262
2263     """
2264     inst = self.instance
2265
2266     if self.op.os_type is not None:
2267       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2268       inst.os = self.op.os_type
2269       self.cfg.AddInstance(inst)
2270
2271     _StartInstanceDisks(self.cfg, inst, None)
2272     try:
2273       feedback_fn("Running the instance OS create scripts...")
2274       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2275         raise errors.OpExecError("Could not install OS for instance %s"
2276                                  " on node %s" %
2277                                  (inst.name, inst.primary_node))
2278     finally:
2279       _ShutdownInstanceDisks(inst, self.cfg)
2280
2281
2282 class LURenameInstance(LogicalUnit):
2283   """Rename an instance.
2284
2285   """
2286   HPATH = "instance-rename"
2287   HTYPE = constants.HTYPE_INSTANCE
2288   _OP_REQP = ["instance_name", "new_name"]
2289
2290   def BuildHooksEnv(self):
2291     """Build hooks env.
2292
2293     This runs on master, primary and secondary nodes of the instance.
2294
2295     """
2296     env = _BuildInstanceHookEnvByObject(self.instance)
2297     env["INSTANCE_NEW_NAME"] = self.op.new_name
2298     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2299           list(self.instance.secondary_nodes))
2300     return env, nl, nl
2301
2302   def CheckPrereq(self):
2303     """Check prerequisites.
2304
2305     This checks that the instance is in the cluster and is not running.
2306
2307     """
2308     instance = self.cfg.GetInstanceInfo(
2309       self.cfg.ExpandInstanceName(self.op.instance_name))
2310     if instance is None:
2311       raise errors.OpPrereqError("Instance '%s' not known" %
2312                                  self.op.instance_name)
2313     if instance.status != "down":
2314       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2315                                  self.op.instance_name)
2316     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2317     if remote_info:
2318       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2319                                  (self.op.instance_name,
2320                                   instance.primary_node))
2321     self.instance = instance
2322
2323     # new name verification
2324     name_info = utils.HostInfo(self.op.new_name)
2325
2326     self.op.new_name = new_name = name_info.name
2327     instance_list = self.cfg.GetInstanceList()
2328     if new_name in instance_list:
2329       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2330                                  instance_name)
2331
2332     if not getattr(self.op, "ignore_ip", False):
2333       command = ["fping", "-q", name_info.ip]
2334       result = utils.RunCmd(command)
2335       if not result.failed:
2336         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2337                                    (name_info.ip, new_name))
2338
2339
2340   def Exec(self, feedback_fn):
2341     """Reinstall the instance.
2342
2343     """
2344     inst = self.instance
2345     old_name = inst.name
2346
2347     self.cfg.RenameInstance(inst.name, self.op.new_name)
2348
2349     # re-read the instance from the configuration after rename
2350     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2351
2352     _StartInstanceDisks(self.cfg, inst, None)
2353     try:
2354       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2355                                           "sda", "sdb"):
2356         msg = ("Could run OS rename script for instance %s on node %s (but the"
2357                " instance has been renamed in Ganeti)" %
2358                (inst.name, inst.primary_node))
2359         logger.Error(msg)
2360     finally:
2361       _ShutdownInstanceDisks(inst, self.cfg)
2362
2363
2364 class LURemoveInstance(LogicalUnit):
2365   """Remove an instance.
2366
2367   """
2368   HPATH = "instance-remove"
2369   HTYPE = constants.HTYPE_INSTANCE
2370   _OP_REQP = ["instance_name"]
2371
2372   def BuildHooksEnv(self):
2373     """Build hooks env.
2374
2375     This runs on master, primary and secondary nodes of the instance.
2376
2377     """
2378     env = _BuildInstanceHookEnvByObject(self.instance)
2379     nl = [self.sstore.GetMasterNode()]
2380     return env, nl, nl
2381
2382   def CheckPrereq(self):
2383     """Check prerequisites.
2384
2385     This checks that the instance is in the cluster.
2386
2387     """
2388     instance = self.cfg.GetInstanceInfo(
2389       self.cfg.ExpandInstanceName(self.op.instance_name))
2390     if instance is None:
2391       raise errors.OpPrereqError("Instance '%s' not known" %
2392                                  self.op.instance_name)
2393     self.instance = instance
2394
2395   def Exec(self, feedback_fn):
2396     """Remove the instance.
2397
2398     """
2399     instance = self.instance
2400     logger.Info("shutting down instance %s on node %s" %
2401                 (instance.name, instance.primary_node))
2402
2403     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2404       if self.op.ignore_failures:
2405         feedback_fn("Warning: can't shutdown instance")
2406       else:
2407         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2408                                  (instance.name, instance.primary_node))
2409
2410     logger.Info("removing block devices for instance %s" % instance.name)
2411
2412     if not _RemoveDisks(instance, self.cfg):
2413       if self.op.ignore_failures:
2414         feedback_fn("Warning: can't remove instance's disks")
2415       else:
2416         raise errors.OpExecError("Can't remove instance's disks")
2417
2418     logger.Info("removing instance %s out of cluster config" % instance.name)
2419
2420     self.cfg.RemoveInstance(instance.name)
2421
2422
2423 class LUQueryInstances(NoHooksLU):
2424   """Logical unit for querying instances.
2425
2426   """
2427   _OP_REQP = ["output_fields", "names"]
2428
2429   def CheckPrereq(self):
2430     """Check prerequisites.
2431
2432     This checks that the fields required are valid output fields.
2433
2434     """
2435     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2436     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2437                                "admin_state", "admin_ram",
2438                                "disk_template", "ip", "mac", "bridge",
2439                                "sda_size", "sdb_size", "vcpus"],
2440                        dynamic=self.dynamic_fields,
2441                        selected=self.op.output_fields)
2442
2443     self.wanted = _GetWantedInstances(self, self.op.names)
2444
2445   def Exec(self, feedback_fn):
2446     """Computes the list of nodes and their attributes.
2447
2448     """
2449     instance_names = self.wanted
2450     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2451                      in instance_names]
2452
2453     # begin data gathering
2454
2455     nodes = frozenset([inst.primary_node for inst in instance_list])
2456
2457     bad_nodes = []
2458     if self.dynamic_fields.intersection(self.op.output_fields):
2459       live_data = {}
2460       node_data = rpc.call_all_instances_info(nodes)
2461       for name in nodes:
2462         result = node_data[name]
2463         if result:
2464           live_data.update(result)
2465         elif result == False:
2466           bad_nodes.append(name)
2467         # else no instance is alive
2468     else:
2469       live_data = dict([(name, {}) for name in instance_names])
2470
2471     # end data gathering
2472
2473     output = []
2474     for instance in instance_list:
2475       iout = []
2476       for field in self.op.output_fields:
2477         if field == "name":
2478           val = instance.name
2479         elif field == "os":
2480           val = instance.os
2481         elif field == "pnode":
2482           val = instance.primary_node
2483         elif field == "snodes":
2484           val = list(instance.secondary_nodes)
2485         elif field == "admin_state":
2486           val = (instance.status != "down")
2487         elif field == "oper_state":
2488           if instance.primary_node in bad_nodes:
2489             val = None
2490           else:
2491             val = bool(live_data.get(instance.name))
2492         elif field == "status":
2493           if instance.primary_node in bad_nodes:
2494             val = "ERROR_nodedown"
2495           else:
2496             running = bool(live_data.get(instance.name))
2497             if running:
2498               if instance.status != "down":
2499                 val = "running"
2500               else:
2501                 val = "ERROR_up"
2502             else:
2503               if instance.status != "down":
2504                 val = "ERROR_down"
2505               else:
2506                 val = "ADMIN_down"
2507         elif field == "admin_ram":
2508           val = instance.memory
2509         elif field == "oper_ram":
2510           if instance.primary_node in bad_nodes:
2511             val = None
2512           elif instance.name in live_data:
2513             val = live_data[instance.name].get("memory", "?")
2514           else:
2515             val = "-"
2516         elif field == "disk_template":
2517           val = instance.disk_template
2518         elif field == "ip":
2519           val = instance.nics[0].ip
2520         elif field == "bridge":
2521           val = instance.nics[0].bridge
2522         elif field == "mac":
2523           val = instance.nics[0].mac
2524         elif field == "sda_size" or field == "sdb_size":
2525           disk = instance.FindDisk(field[:3])
2526           if disk is None:
2527             val = None
2528           else:
2529             val = disk.size
2530         elif field == "vcpus":
2531           val = instance.vcpus
2532         else:
2533           raise errors.ParameterError(field)
2534         iout.append(val)
2535       output.append(iout)
2536
2537     return output
2538
2539
2540 class LUFailoverInstance(LogicalUnit):
2541   """Failover an instance.
2542
2543   """
2544   HPATH = "instance-failover"
2545   HTYPE = constants.HTYPE_INSTANCE
2546   _OP_REQP = ["instance_name", "ignore_consistency"]
2547
2548   def BuildHooksEnv(self):
2549     """Build hooks env.
2550
2551     This runs on master, primary and secondary nodes of the instance.
2552
2553     """
2554     env = {
2555       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2556       }
2557     env.update(_BuildInstanceHookEnvByObject(self.instance))
2558     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2559     return env, nl, nl
2560
2561   def CheckPrereq(self):
2562     """Check prerequisites.
2563
2564     This checks that the instance is in the cluster.
2565
2566     """
2567     instance = self.cfg.GetInstanceInfo(
2568       self.cfg.ExpandInstanceName(self.op.instance_name))
2569     if instance is None:
2570       raise errors.OpPrereqError("Instance '%s' not known" %
2571                                  self.op.instance_name)
2572
2573     if instance.disk_template not in constants.DTS_NET_MIRROR:
2574       raise errors.OpPrereqError("Instance's disk layout is not"
2575                                  " network mirrored, cannot failover.")
2576
2577     secondary_nodes = instance.secondary_nodes
2578     if not secondary_nodes:
2579       raise errors.ProgrammerError("no secondary node but using "
2580                                    "DT_REMOTE_RAID1 template")
2581
2582     target_node = secondary_nodes[0]
2583     # check memory requirements on the secondary node
2584     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2585                          instance.name, instance.memory)
2586
2587     # check bridge existance
2588     brlist = [nic.bridge for nic in instance.nics]
2589     if not rpc.call_bridges_exist(target_node, brlist):
2590       raise errors.OpPrereqError("One or more target bridges %s does not"
2591                                  " exist on destination node '%s'" %
2592                                  (brlist, target_node))
2593
2594     self.instance = instance
2595
2596   def Exec(self, feedback_fn):
2597     """Failover an instance.
2598
2599     The failover is done by shutting it down on its present node and
2600     starting it on the secondary.
2601
2602     """
2603     instance = self.instance
2604
2605     source_node = instance.primary_node
2606     target_node = instance.secondary_nodes[0]
2607
2608     feedback_fn("* checking disk consistency between source and target")
2609     for dev in instance.disks:
2610       # for remote_raid1, these are md over drbd
2611       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2612         if instance.status == "up" and not self.op.ignore_consistency:
2613           raise errors.OpExecError("Disk %s is degraded on target node,"
2614                                    " aborting failover." % dev.iv_name)
2615
2616     feedback_fn("* shutting down instance on source node")
2617     logger.Info("Shutting down instance %s on node %s" %
2618                 (instance.name, source_node))
2619
2620     if not rpc.call_instance_shutdown(source_node, instance):
2621       if self.op.ignore_consistency:
2622         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2623                      " anyway. Please make sure node %s is down"  %
2624                      (instance.name, source_node, source_node))
2625       else:
2626         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2627                                  (instance.name, source_node))
2628
2629     feedback_fn("* deactivating the instance's disks on source node")
2630     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2631       raise errors.OpExecError("Can't shut down the instance's disks.")
2632
2633     instance.primary_node = target_node
2634     # distribute new instance config to the other nodes
2635     self.cfg.AddInstance(instance)
2636
2637     # Only start the instance if it's marked as up
2638     if instance.status == "up":
2639       feedback_fn("* activating the instance's disks on target node")
2640       logger.Info("Starting instance %s on node %s" %
2641                   (instance.name, target_node))
2642
2643       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2644                                                ignore_secondaries=True)
2645       if not disks_ok:
2646         _ShutdownInstanceDisks(instance, self.cfg)
2647         raise errors.OpExecError("Can't activate the instance's disks")
2648
2649       feedback_fn("* starting the instance on the target node")
2650       if not rpc.call_instance_start(target_node, instance, None):
2651         _ShutdownInstanceDisks(instance, self.cfg)
2652         raise errors.OpExecError("Could not start instance %s on node %s." %
2653                                  (instance.name, target_node))
2654
2655
2656 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2657   """Create a tree of block devices on the primary node.
2658
2659   This always creates all devices.
2660
2661   """
2662   if device.children:
2663     for child in device.children:
2664       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2665         return False
2666
2667   cfg.SetDiskID(device, node)
2668   new_id = rpc.call_blockdev_create(node, device, device.size,
2669                                     instance.name, True, info)
2670   if not new_id:
2671     return False
2672   if device.physical_id is None:
2673     device.physical_id = new_id
2674   return True
2675
2676
2677 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2678   """Create a tree of block devices on a secondary node.
2679
2680   If this device type has to be created on secondaries, create it and
2681   all its children.
2682
2683   If not, just recurse to children keeping the same 'force' value.
2684
2685   """
2686   if device.CreateOnSecondary():
2687     force = True
2688   if device.children:
2689     for child in device.children:
2690       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2691                                         child, force, info):
2692         return False
2693
2694   if not force:
2695     return True
2696   cfg.SetDiskID(device, node)
2697   new_id = rpc.call_blockdev_create(node, device, device.size,
2698                                     instance.name, False, info)
2699   if not new_id:
2700     return False
2701   if device.physical_id is None:
2702     device.physical_id = new_id
2703   return True
2704
2705
2706 def _GenerateUniqueNames(cfg, exts):
2707   """Generate a suitable LV name.
2708
2709   This will generate a logical volume name for the given instance.
2710
2711   """
2712   results = []
2713   for val in exts:
2714     new_id = cfg.GenerateUniqueID()
2715     results.append("%s%s" % (new_id, val))
2716   return results
2717
2718
2719 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2720   """Generate a drbd device complete with its children.
2721
2722   """
2723   port = cfg.AllocatePort()
2724   vgname = cfg.GetVGName()
2725   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2726                           logical_id=(vgname, names[0]))
2727   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2728                           logical_id=(vgname, names[1]))
2729   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2730                           logical_id = (primary, secondary, port),
2731                           children = [dev_data, dev_meta])
2732   return drbd_dev
2733
2734
2735 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2736   """Generate a drbd8 device complete with its children.
2737
2738   """
2739   port = cfg.AllocatePort()
2740   vgname = cfg.GetVGName()
2741   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2742                           logical_id=(vgname, names[0]))
2743   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2744                           logical_id=(vgname, names[1]))
2745   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2746                           logical_id = (primary, secondary, port),
2747                           children = [dev_data, dev_meta],
2748                           iv_name=iv_name)
2749   return drbd_dev
2750
2751 def _GenerateDiskTemplate(cfg, template_name,
2752                           instance_name, primary_node,
2753                           secondary_nodes, disk_sz, swap_sz):
2754   """Generate the entire disk layout for a given template type.
2755
2756   """
2757   #TODO: compute space requirements
2758
2759   vgname = cfg.GetVGName()
2760   if template_name == constants.DT_DISKLESS:
2761     disks = []
2762   elif template_name == constants.DT_PLAIN:
2763     if len(secondary_nodes) != 0:
2764       raise errors.ProgrammerError("Wrong template configuration")
2765
2766     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2767     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2768                            logical_id=(vgname, names[0]),
2769                            iv_name = "sda")
2770     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2771                            logical_id=(vgname, names[1]),
2772                            iv_name = "sdb")
2773     disks = [sda_dev, sdb_dev]
2774   elif template_name == constants.DT_LOCAL_RAID1:
2775     if len(secondary_nodes) != 0:
2776       raise errors.ProgrammerError("Wrong template configuration")
2777
2778
2779     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2780                                        ".sdb_m1", ".sdb_m2"])
2781     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2782                               logical_id=(vgname, names[0]))
2783     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2784                               logical_id=(vgname, names[1]))
2785     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2786                               size=disk_sz,
2787                               children = [sda_dev_m1, sda_dev_m2])
2788     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2789                               logical_id=(vgname, names[2]))
2790     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2791                               logical_id=(vgname, names[3]))
2792     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2793                               size=swap_sz,
2794                               children = [sdb_dev_m1, sdb_dev_m2])
2795     disks = [md_sda_dev, md_sdb_dev]
2796   elif template_name == constants.DT_REMOTE_RAID1:
2797     if len(secondary_nodes) != 1:
2798       raise errors.ProgrammerError("Wrong template configuration")
2799     remote_node = secondary_nodes[0]
2800     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2801                                        ".sdb_data", ".sdb_meta"])
2802     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2803                                          disk_sz, names[0:2])
2804     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2805                               children = [drbd_sda_dev], size=disk_sz)
2806     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2807                                          swap_sz, names[2:4])
2808     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2809                               children = [drbd_sdb_dev], size=swap_sz)
2810     disks = [md_sda_dev, md_sdb_dev]
2811   elif template_name == constants.DT_DRBD8:
2812     if len(secondary_nodes) != 1:
2813       raise errors.ProgrammerError("Wrong template configuration")
2814     remote_node = secondary_nodes[0]
2815     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2816                                        ".sdb_data", ".sdb_meta"])
2817     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2818                                          disk_sz, names[0:2], "sda")
2819     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2820                                          swap_sz, names[2:4], "sdb")
2821     disks = [drbd_sda_dev, drbd_sdb_dev]
2822   else:
2823     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2824   return disks
2825
2826
2827 def _GetInstanceInfoText(instance):
2828   """Compute that text that should be added to the disk's metadata.
2829
2830   """
2831   return "originstname+%s" % instance.name
2832
2833
2834 def _CreateDisks(cfg, instance):
2835   """Create all disks for an instance.
2836
2837   This abstracts away some work from AddInstance.
2838
2839   Args:
2840     instance: the instance object
2841
2842   Returns:
2843     True or False showing the success of the creation process
2844
2845   """
2846   info = _GetInstanceInfoText(instance)
2847
2848   for device in instance.disks:
2849     logger.Info("creating volume %s for instance %s" %
2850               (device.iv_name, instance.name))
2851     #HARDCODE
2852     for secondary_node in instance.secondary_nodes:
2853       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2854                                         device, False, info):
2855         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2856                      (device.iv_name, device, secondary_node))
2857         return False
2858     #HARDCODE
2859     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2860                                     instance, device, info):
2861       logger.Error("failed to create volume %s on primary!" %
2862                    device.iv_name)
2863       return False
2864   return True
2865
2866
2867 def _RemoveDisks(instance, cfg):
2868   """Remove all disks for an instance.
2869
2870   This abstracts away some work from `AddInstance()` and
2871   `RemoveInstance()`. Note that in case some of the devices couldn't
2872   be removed, the removal will continue with the other ones (compare
2873   with `_CreateDisks()`).
2874
2875   Args:
2876     instance: the instance object
2877
2878   Returns:
2879     True or False showing the success of the removal proces
2880
2881   """
2882   logger.Info("removing block devices for instance %s" % instance.name)
2883
2884   result = True
2885   for device in instance.disks:
2886     for node, disk in device.ComputeNodeTree(instance.primary_node):
2887       cfg.SetDiskID(disk, node)
2888       if not rpc.call_blockdev_remove(node, disk):
2889         logger.Error("could not remove block device %s on node %s,"
2890                      " continuing anyway" %
2891                      (device.iv_name, node))
2892         result = False
2893   return result
2894
2895
2896 class LUCreateInstance(LogicalUnit):
2897   """Create an instance.
2898
2899   """
2900   HPATH = "instance-add"
2901   HTYPE = constants.HTYPE_INSTANCE
2902   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2903               "disk_template", "swap_size", "mode", "start", "vcpus",
2904               "wait_for_sync", "ip_check", "mac"]
2905
2906   def BuildHooksEnv(self):
2907     """Build hooks env.
2908
2909     This runs on master, primary and secondary nodes of the instance.
2910
2911     """
2912     env = {
2913       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2914       "INSTANCE_DISK_SIZE": self.op.disk_size,
2915       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2916       "INSTANCE_ADD_MODE": self.op.mode,
2917       }
2918     if self.op.mode == constants.INSTANCE_IMPORT:
2919       env["INSTANCE_SRC_NODE"] = self.op.src_node
2920       env["INSTANCE_SRC_PATH"] = self.op.src_path
2921       env["INSTANCE_SRC_IMAGE"] = self.src_image
2922
2923     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2924       primary_node=self.op.pnode,
2925       secondary_nodes=self.secondaries,
2926       status=self.instance_status,
2927       os_type=self.op.os_type,
2928       memory=self.op.mem_size,
2929       vcpus=self.op.vcpus,
2930       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2931     ))
2932
2933     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2934           self.secondaries)
2935     return env, nl, nl
2936
2937
2938   def CheckPrereq(self):
2939     """Check prerequisites.
2940
2941     """
2942     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2943       if not hasattr(self.op, attr):
2944         setattr(self.op, attr, None)
2945
2946     if self.op.mode not in (constants.INSTANCE_CREATE,
2947                             constants.INSTANCE_IMPORT):
2948       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2949                                  self.op.mode)
2950
2951     if self.op.mode == constants.INSTANCE_IMPORT:
2952       src_node = getattr(self.op, "src_node", None)
2953       src_path = getattr(self.op, "src_path", None)
2954       if src_node is None or src_path is None:
2955         raise errors.OpPrereqError("Importing an instance requires source"
2956                                    " node and path options")
2957       src_node_full = self.cfg.ExpandNodeName(src_node)
2958       if src_node_full is None:
2959         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2960       self.op.src_node = src_node = src_node_full
2961
2962       if not os.path.isabs(src_path):
2963         raise errors.OpPrereqError("The source path must be absolute")
2964
2965       export_info = rpc.call_export_info(src_node, src_path)
2966
2967       if not export_info:
2968         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2969
2970       if not export_info.has_section(constants.INISECT_EXP):
2971         raise errors.ProgrammerError("Corrupted export config")
2972
2973       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2974       if (int(ei_version) != constants.EXPORT_VERSION):
2975         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2976                                    (ei_version, constants.EXPORT_VERSION))
2977
2978       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2979         raise errors.OpPrereqError("Can't import instance with more than"
2980                                    " one data disk")
2981
2982       # FIXME: are the old os-es, disk sizes, etc. useful?
2983       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2984       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2985                                                          'disk0_dump'))
2986       self.src_image = diskimage
2987     else: # INSTANCE_CREATE
2988       if getattr(self.op, "os_type", None) is None:
2989         raise errors.OpPrereqError("No guest OS specified")
2990
2991     # check primary node
2992     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2993     if pnode is None:
2994       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2995                                  self.op.pnode)
2996     self.op.pnode = pnode.name
2997     self.pnode = pnode
2998     self.secondaries = []
2999     # disk template and mirror node verification
3000     if self.op.disk_template not in constants.DISK_TEMPLATES:
3001       raise errors.OpPrereqError("Invalid disk template name")
3002
3003     if self.op.disk_template in constants.DTS_NET_MIRROR:
3004       if getattr(self.op, "snode", None) is None:
3005         raise errors.OpPrereqError("The networked disk templates need"
3006                                    " a mirror node")
3007
3008       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3009       if snode_name is None:
3010         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3011                                    self.op.snode)
3012       elif snode_name == pnode.name:
3013         raise errors.OpPrereqError("The secondary node cannot be"
3014                                    " the primary node.")
3015       self.secondaries.append(snode_name)
3016
3017     # Required free disk space as a function of disk and swap space
3018     req_size_dict = {
3019       constants.DT_DISKLESS: None,
3020       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3021       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3022       # 256 MB are added for drbd metadata, 128MB for each drbd device
3023       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3024       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3025     }
3026
3027     if self.op.disk_template not in req_size_dict:
3028       raise errors.ProgrammerError("Disk template '%s' size requirement"
3029                                    " is unknown" %  self.op.disk_template)
3030
3031     req_size = req_size_dict[self.op.disk_template]
3032
3033     # Check lv size requirements
3034     if req_size is not None:
3035       nodenames = [pnode.name] + self.secondaries
3036       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3037       for node in nodenames:
3038         info = nodeinfo.get(node, None)
3039         if not info:
3040           raise errors.OpPrereqError("Cannot get current information"
3041                                      " from node '%s'" % nodeinfo)
3042         vg_free = info.get('vg_free', None)
3043         if not isinstance(vg_free, int):
3044           raise errors.OpPrereqError("Can't compute free disk space on"
3045                                      " node %s" % node)
3046         if req_size > info['vg_free']:
3047           raise errors.OpPrereqError("Not enough disk space on target node %s."
3048                                      " %d MB available, %d MB required" %
3049                                      (node, info['vg_free'], req_size))
3050
3051     # os verification
3052     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3053     if not os_obj:
3054       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3055                                  " primary node"  % self.op.os_type)
3056
3057     if self.op.kernel_path == constants.VALUE_NONE:
3058       raise errors.OpPrereqError("Can't set instance kernel to none")
3059
3060     # instance verification
3061     hostname1 = utils.HostInfo(self.op.instance_name)
3062
3063     self.op.instance_name = instance_name = hostname1.name
3064     instance_list = self.cfg.GetInstanceList()
3065     if instance_name in instance_list:
3066       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3067                                  instance_name)
3068
3069     ip = getattr(self.op, "ip", None)
3070     if ip is None or ip.lower() == "none":
3071       inst_ip = None
3072     elif ip.lower() == "auto":
3073       inst_ip = hostname1.ip
3074     else:
3075       if not utils.IsValidIP(ip):
3076         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3077                                    " like a valid IP" % ip)
3078       inst_ip = ip
3079     self.inst_ip = inst_ip
3080
3081     if self.op.start and not self.op.ip_check:
3082       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3083                                  " adding an instance in start mode")
3084
3085     if self.op.ip_check:
3086       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3087         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3088                                    (hostname1.ip, instance_name))
3089
3090     # MAC address verification
3091     if self.op.mac != "auto":
3092       if not utils.IsValidMac(self.op.mac.lower()):
3093         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3094                                    self.op.mac)
3095
3096     # bridge verification
3097     bridge = getattr(self.op, "bridge", None)
3098     if bridge is None:
3099       self.op.bridge = self.cfg.GetDefBridge()
3100     else:
3101       self.op.bridge = bridge
3102
3103     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3104       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3105                                  " destination node '%s'" %
3106                                  (self.op.bridge, pnode.name))
3107
3108     # boot order verification
3109     if self.op.hvm_boot_order is not None:
3110       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3111         raise errors.OpPrereqError("invalid boot order specified,"
3112                                    " must be one or more of [acdn]")
3113
3114     if self.op.start:
3115       self.instance_status = 'up'
3116     else:
3117       self.instance_status = 'down'
3118
3119   def Exec(self, feedback_fn):
3120     """Create and add the instance to the cluster.
3121
3122     """
3123     instance = self.op.instance_name
3124     pnode_name = self.pnode.name
3125
3126     if self.op.mac == "auto":
3127       mac_address = self.cfg.GenerateMAC()
3128     else:
3129       mac_address = self.op.mac
3130
3131     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3132     if self.inst_ip is not None:
3133       nic.ip = self.inst_ip
3134
3135     ht_kind = self.sstore.GetHypervisorType()
3136     if ht_kind in constants.HTS_REQ_PORT:
3137       network_port = self.cfg.AllocatePort()
3138     else:
3139       network_port = None
3140
3141     disks = _GenerateDiskTemplate(self.cfg,
3142                                   self.op.disk_template,
3143                                   instance, pnode_name,
3144                                   self.secondaries, self.op.disk_size,
3145                                   self.op.swap_size)
3146
3147     iobj = objects.Instance(name=instance, os=self.op.os_type,
3148                             primary_node=pnode_name,
3149                             memory=self.op.mem_size,
3150                             vcpus=self.op.vcpus,
3151                             nics=[nic], disks=disks,
3152                             disk_template=self.op.disk_template,
3153                             status=self.instance_status,
3154                             network_port=network_port,
3155                             kernel_path=self.op.kernel_path,
3156                             initrd_path=self.op.initrd_path,
3157                             hvm_boot_order=self.op.hvm_boot_order,
3158                             )
3159
3160     feedback_fn("* creating instance disks...")
3161     if not _CreateDisks(self.cfg, iobj):
3162       _RemoveDisks(iobj, self.cfg)
3163       raise errors.OpExecError("Device creation failed, reverting...")
3164
3165     feedback_fn("adding instance %s to cluster config" % instance)
3166
3167     self.cfg.AddInstance(iobj)
3168
3169     if self.op.wait_for_sync:
3170       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3171     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3172       # make sure the disks are not degraded (still sync-ing is ok)
3173       time.sleep(15)
3174       feedback_fn("* checking mirrors status")
3175       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3176     else:
3177       disk_abort = False
3178
3179     if disk_abort:
3180       _RemoveDisks(iobj, self.cfg)
3181       self.cfg.RemoveInstance(iobj.name)
3182       raise errors.OpExecError("There are some degraded disks for"
3183                                " this instance")
3184
3185     feedback_fn("creating os for instance %s on node %s" %
3186                 (instance, pnode_name))
3187
3188     if iobj.disk_template != constants.DT_DISKLESS:
3189       if self.op.mode == constants.INSTANCE_CREATE:
3190         feedback_fn("* running the instance OS create scripts...")
3191         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3192           raise errors.OpExecError("could not add os for instance %s"
3193                                    " on node %s" %
3194                                    (instance, pnode_name))
3195
3196       elif self.op.mode == constants.INSTANCE_IMPORT:
3197         feedback_fn("* running the instance OS import scripts...")
3198         src_node = self.op.src_node
3199         src_image = self.src_image
3200         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3201                                                 src_node, src_image):
3202           raise errors.OpExecError("Could not import os for instance"
3203                                    " %s on node %s" %
3204                                    (instance, pnode_name))
3205       else:
3206         # also checked in the prereq part
3207         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3208                                      % self.op.mode)
3209
3210     if self.op.start:
3211       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3212       feedback_fn("* starting instance...")
3213       if not rpc.call_instance_start(pnode_name, iobj, None):
3214         raise errors.OpExecError("Could not start instance")
3215
3216
3217 class LUConnectConsole(NoHooksLU):
3218   """Connect to an instance's console.
3219
3220   This is somewhat special in that it returns the command line that
3221   you need to run on the master node in order to connect to the
3222   console.
3223
3224   """
3225   _OP_REQP = ["instance_name"]
3226
3227   def CheckPrereq(self):
3228     """Check prerequisites.
3229
3230     This checks that the instance is in the cluster.
3231
3232     """
3233     instance = self.cfg.GetInstanceInfo(
3234       self.cfg.ExpandInstanceName(self.op.instance_name))
3235     if instance is None:
3236       raise errors.OpPrereqError("Instance '%s' not known" %
3237                                  self.op.instance_name)
3238     self.instance = instance
3239
3240   def Exec(self, feedback_fn):
3241     """Connect to the console of an instance
3242
3243     """
3244     instance = self.instance
3245     node = instance.primary_node
3246
3247     node_insts = rpc.call_instance_list([node])[node]
3248     if node_insts is False:
3249       raise errors.OpExecError("Can't connect to node %s." % node)
3250
3251     if instance.name not in node_insts:
3252       raise errors.OpExecError("Instance %s is not running." % instance.name)
3253
3254     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3255
3256     hyper = hypervisor.GetHypervisor()
3257     console_cmd = hyper.GetShellCommandForConsole(instance)
3258     # build ssh cmdline
3259     argv = ["ssh", "-q", "-t"]
3260     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3261     argv.extend(ssh.BATCH_MODE_OPTS)
3262     argv.append(node)
3263     argv.append(console_cmd)
3264     return "ssh", argv
3265
3266
3267 class LUAddMDDRBDComponent(LogicalUnit):
3268   """Adda new mirror member to an instance's disk.
3269
3270   """
3271   HPATH = "mirror-add"
3272   HTYPE = constants.HTYPE_INSTANCE
3273   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3274
3275   def BuildHooksEnv(self):
3276     """Build hooks env.
3277
3278     This runs on the master, the primary and all the secondaries.
3279
3280     """
3281     env = {
3282       "NEW_SECONDARY": self.op.remote_node,
3283       "DISK_NAME": self.op.disk_name,
3284       }
3285     env.update(_BuildInstanceHookEnvByObject(self.instance))
3286     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3287           self.op.remote_node,] + list(self.instance.secondary_nodes)
3288     return env, nl, nl
3289
3290   def CheckPrereq(self):
3291     """Check prerequisites.
3292
3293     This checks that the instance is in the cluster.
3294
3295     """
3296     instance = self.cfg.GetInstanceInfo(
3297       self.cfg.ExpandInstanceName(self.op.instance_name))
3298     if instance is None:
3299       raise errors.OpPrereqError("Instance '%s' not known" %
3300                                  self.op.instance_name)
3301     self.instance = instance
3302
3303     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3304     if remote_node is None:
3305       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3306     self.remote_node = remote_node
3307
3308     if remote_node == instance.primary_node:
3309       raise errors.OpPrereqError("The specified node is the primary node of"
3310                                  " the instance.")
3311
3312     if instance.disk_template != constants.DT_REMOTE_RAID1:
3313       raise errors.OpPrereqError("Instance's disk layout is not"
3314                                  " remote_raid1.")
3315     for disk in instance.disks:
3316       if disk.iv_name == self.op.disk_name:
3317         break
3318     else:
3319       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3320                                  " instance." % self.op.disk_name)
3321     if len(disk.children) > 1:
3322       raise errors.OpPrereqError("The device already has two slave devices."
3323                                  " This would create a 3-disk raid1 which we"
3324                                  " don't allow.")
3325     self.disk = disk
3326
3327   def Exec(self, feedback_fn):
3328     """Add the mirror component
3329
3330     """
3331     disk = self.disk
3332     instance = self.instance
3333
3334     remote_node = self.remote_node
3335     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3336     names = _GenerateUniqueNames(self.cfg, lv_names)
3337     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3338                                      remote_node, disk.size, names)
3339
3340     logger.Info("adding new mirror component on secondary")
3341     #HARDCODE
3342     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3343                                       new_drbd, False,
3344                                       _GetInstanceInfoText(instance)):
3345       raise errors.OpExecError("Failed to create new component on secondary"
3346                                " node %s" % remote_node)
3347
3348     logger.Info("adding new mirror component on primary")
3349     #HARDCODE
3350     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3351                                     instance, new_drbd,
3352                                     _GetInstanceInfoText(instance)):
3353       # remove secondary dev
3354       self.cfg.SetDiskID(new_drbd, remote_node)
3355       rpc.call_blockdev_remove(remote_node, new_drbd)
3356       raise errors.OpExecError("Failed to create volume on primary")
3357
3358     # the device exists now
3359     # call the primary node to add the mirror to md
3360     logger.Info("adding new mirror component to md")
3361     if not rpc.call_blockdev_addchildren(instance.primary_node,
3362                                          disk, [new_drbd]):
3363       logger.Error("Can't add mirror compoment to md!")
3364       self.cfg.SetDiskID(new_drbd, remote_node)
3365       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3366         logger.Error("Can't rollback on secondary")
3367       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3368       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3369         logger.Error("Can't rollback on primary")
3370       raise errors.OpExecError("Can't add mirror component to md array")
3371
3372     disk.children.append(new_drbd)
3373
3374     self.cfg.AddInstance(instance)
3375
3376     _WaitForSync(self.cfg, instance, self.proc)
3377
3378     return 0
3379
3380
3381 class LURemoveMDDRBDComponent(LogicalUnit):
3382   """Remove a component from a remote_raid1 disk.
3383
3384   """
3385   HPATH = "mirror-remove"
3386   HTYPE = constants.HTYPE_INSTANCE
3387   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3388
3389   def BuildHooksEnv(self):
3390     """Build hooks env.
3391
3392     This runs on the master, the primary and all the secondaries.
3393
3394     """
3395     env = {
3396       "DISK_NAME": self.op.disk_name,
3397       "DISK_ID": self.op.disk_id,
3398       "OLD_SECONDARY": self.old_secondary,
3399       }
3400     env.update(_BuildInstanceHookEnvByObject(self.instance))
3401     nl = [self.sstore.GetMasterNode(),
3402           self.instance.primary_node] + list(self.instance.secondary_nodes)
3403     return env, nl, nl
3404
3405   def CheckPrereq(self):
3406     """Check prerequisites.
3407
3408     This checks that the instance is in the cluster.
3409
3410     """
3411     instance = self.cfg.GetInstanceInfo(
3412       self.cfg.ExpandInstanceName(self.op.instance_name))
3413     if instance is None:
3414       raise errors.OpPrereqError("Instance '%s' not known" %
3415                                  self.op.instance_name)
3416     self.instance = instance
3417
3418     if instance.disk_template != constants.DT_REMOTE_RAID1:
3419       raise errors.OpPrereqError("Instance's disk layout is not"
3420                                  " remote_raid1.")
3421     for disk in instance.disks:
3422       if disk.iv_name == self.op.disk_name:
3423         break
3424     else:
3425       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3426                                  " instance." % self.op.disk_name)
3427     for child in disk.children:
3428       if (child.dev_type == constants.LD_DRBD7 and
3429           child.logical_id[2] == self.op.disk_id):
3430         break
3431     else:
3432       raise errors.OpPrereqError("Can't find the device with this port.")
3433
3434     if len(disk.children) < 2:
3435       raise errors.OpPrereqError("Cannot remove the last component from"
3436                                  " a mirror.")
3437     self.disk = disk
3438     self.child = child
3439     if self.child.logical_id[0] == instance.primary_node:
3440       oid = 1
3441     else:
3442       oid = 0
3443     self.old_secondary = self.child.logical_id[oid]
3444
3445   def Exec(self, feedback_fn):
3446     """Remove the mirror component
3447
3448     """
3449     instance = self.instance
3450     disk = self.disk
3451     child = self.child
3452     logger.Info("remove mirror component")
3453     self.cfg.SetDiskID(disk, instance.primary_node)
3454     if not rpc.call_blockdev_removechildren(instance.primary_node,
3455                                             disk, [child]):
3456       raise errors.OpExecError("Can't remove child from mirror.")
3457
3458     for node in child.logical_id[:2]:
3459       self.cfg.SetDiskID(child, node)
3460       if not rpc.call_blockdev_remove(node, child):
3461         logger.Error("Warning: failed to remove device from node %s,"
3462                      " continuing operation." % node)
3463
3464     disk.children.remove(child)
3465     self.cfg.AddInstance(instance)
3466
3467
3468 class LUReplaceDisks(LogicalUnit):
3469   """Replace the disks of an instance.
3470
3471   """
3472   HPATH = "mirrors-replace"
3473   HTYPE = constants.HTYPE_INSTANCE
3474   _OP_REQP = ["instance_name", "mode", "disks"]
3475
3476   def BuildHooksEnv(self):
3477     """Build hooks env.
3478
3479     This runs on the master, the primary and all the secondaries.
3480
3481     """
3482     env = {
3483       "MODE": self.op.mode,
3484       "NEW_SECONDARY": self.op.remote_node,
3485       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3486       }
3487     env.update(_BuildInstanceHookEnvByObject(self.instance))
3488     nl = [
3489       self.sstore.GetMasterNode(),
3490       self.instance.primary_node,
3491       ]
3492     if self.op.remote_node is not None:
3493       nl.append(self.op.remote_node)
3494     return env, nl, nl
3495
3496   def CheckPrereq(self):
3497     """Check prerequisites.
3498
3499     This checks that the instance is in the cluster.
3500
3501     """
3502     instance = self.cfg.GetInstanceInfo(
3503       self.cfg.ExpandInstanceName(self.op.instance_name))
3504     if instance is None:
3505       raise errors.OpPrereqError("Instance '%s' not known" %
3506                                  self.op.instance_name)
3507     self.instance = instance
3508     self.op.instance_name = instance.name
3509
3510     if instance.disk_template not in constants.DTS_NET_MIRROR:
3511       raise errors.OpPrereqError("Instance's disk layout is not"
3512                                  " network mirrored.")
3513
3514     if len(instance.secondary_nodes) != 1:
3515       raise errors.OpPrereqError("The instance has a strange layout,"
3516                                  " expected one secondary but found %d" %
3517                                  len(instance.secondary_nodes))
3518
3519     self.sec_node = instance.secondary_nodes[0]
3520
3521     remote_node = getattr(self.op, "remote_node", None)
3522     if remote_node is not None:
3523       remote_node = self.cfg.ExpandNodeName(remote_node)
3524       if remote_node is None:
3525         raise errors.OpPrereqError("Node '%s' not known" %
3526                                    self.op.remote_node)
3527       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3528     else:
3529       self.remote_node_info = None
3530     if remote_node == instance.primary_node:
3531       raise errors.OpPrereqError("The specified node is the primary node of"
3532                                  " the instance.")
3533     elif remote_node == self.sec_node:
3534       if self.op.mode == constants.REPLACE_DISK_SEC:
3535         # this is for DRBD8, where we can't execute the same mode of
3536         # replacement as for drbd7 (no different port allocated)
3537         raise errors.OpPrereqError("Same secondary given, cannot execute"
3538                                    " replacement")
3539       # the user gave the current secondary, switch to
3540       # 'no-replace-secondary' mode for drbd7
3541       remote_node = None
3542     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3543         self.op.mode != constants.REPLACE_DISK_ALL):
3544       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3545                                  " disks replacement, not individual ones")
3546     if instance.disk_template == constants.DT_DRBD8:
3547       if (self.op.mode == constants.REPLACE_DISK_ALL and
3548           remote_node is not None):
3549         # switch to replace secondary mode
3550         self.op.mode = constants.REPLACE_DISK_SEC
3551
3552       if self.op.mode == constants.REPLACE_DISK_ALL:
3553         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3554                                    " secondary disk replacement, not"
3555                                    " both at once")
3556       elif self.op.mode == constants.REPLACE_DISK_PRI:
3557         if remote_node is not None:
3558           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3559                                      " the secondary while doing a primary"
3560                                      " node disk replacement")
3561         self.tgt_node = instance.primary_node
3562         self.oth_node = instance.secondary_nodes[0]
3563       elif self.op.mode == constants.REPLACE_DISK_SEC:
3564         self.new_node = remote_node # this can be None, in which case
3565                                     # we don't change the secondary
3566         self.tgt_node = instance.secondary_nodes[0]
3567         self.oth_node = instance.primary_node
3568       else:
3569         raise errors.ProgrammerError("Unhandled disk replace mode")
3570
3571     for name in self.op.disks:
3572       if instance.FindDisk(name) is None:
3573         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3574                                    (name, instance.name))
3575     self.op.remote_node = remote_node
3576
3577   def _ExecRR1(self, feedback_fn):
3578     """Replace the disks of an instance.
3579
3580     """
3581     instance = self.instance
3582     iv_names = {}
3583     # start of work
3584     if self.op.remote_node is None:
3585       remote_node = self.sec_node
3586     else:
3587       remote_node = self.op.remote_node
3588     cfg = self.cfg
3589     for dev in instance.disks:
3590       size = dev.size
3591       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3592       names = _GenerateUniqueNames(cfg, lv_names)
3593       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3594                                        remote_node, size, names)
3595       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3596       logger.Info("adding new mirror component on secondary for %s" %
3597                   dev.iv_name)
3598       #HARDCODE
3599       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3600                                         new_drbd, False,
3601                                         _GetInstanceInfoText(instance)):
3602         raise errors.OpExecError("Failed to create new component on secondary"
3603                                  " node %s. Full abort, cleanup manually!" %
3604                                  remote_node)
3605
3606       logger.Info("adding new mirror component on primary")
3607       #HARDCODE
3608       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3609                                       instance, new_drbd,
3610                                       _GetInstanceInfoText(instance)):
3611         # remove secondary dev
3612         cfg.SetDiskID(new_drbd, remote_node)
3613         rpc.call_blockdev_remove(remote_node, new_drbd)
3614         raise errors.OpExecError("Failed to create volume on primary!"
3615                                  " Full abort, cleanup manually!!")
3616
3617       # the device exists now
3618       # call the primary node to add the mirror to md
3619       logger.Info("adding new mirror component to md")
3620       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3621                                            [new_drbd]):
3622         logger.Error("Can't add mirror compoment to md!")
3623         cfg.SetDiskID(new_drbd, remote_node)
3624         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3625           logger.Error("Can't rollback on secondary")
3626         cfg.SetDiskID(new_drbd, instance.primary_node)
3627         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3628           logger.Error("Can't rollback on primary")
3629         raise errors.OpExecError("Full abort, cleanup manually!!")
3630
3631       dev.children.append(new_drbd)
3632       cfg.AddInstance(instance)
3633
3634     # this can fail as the old devices are degraded and _WaitForSync
3635     # does a combined result over all disks, so we don't check its
3636     # return value
3637     _WaitForSync(cfg, instance, self.proc, unlock=True)
3638
3639     # so check manually all the devices
3640     for name in iv_names:
3641       dev, child, new_drbd = iv_names[name]
3642       cfg.SetDiskID(dev, instance.primary_node)
3643       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3644       if is_degr:
3645         raise errors.OpExecError("MD device %s is degraded!" % name)
3646       cfg.SetDiskID(new_drbd, instance.primary_node)
3647       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3648       if is_degr:
3649         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3650
3651     for name in iv_names:
3652       dev, child, new_drbd = iv_names[name]
3653       logger.Info("remove mirror %s component" % name)
3654       cfg.SetDiskID(dev, instance.primary_node)
3655       if not rpc.call_blockdev_removechildren(instance.primary_node,
3656                                               dev, [child]):
3657         logger.Error("Can't remove child from mirror, aborting"
3658                      " *this device cleanup*.\nYou need to cleanup manually!!")
3659         continue
3660
3661       for node in child.logical_id[:2]:
3662         logger.Info("remove child device on %s" % node)
3663         cfg.SetDiskID(child, node)
3664         if not rpc.call_blockdev_remove(node, child):
3665           logger.Error("Warning: failed to remove device from node %s,"
3666                        " continuing operation." % node)
3667
3668       dev.children.remove(child)
3669
3670       cfg.AddInstance(instance)
3671
3672   def _ExecD8DiskOnly(self, feedback_fn):
3673     """Replace a disk on the primary or secondary for dbrd8.
3674
3675     The algorithm for replace is quite complicated:
3676       - for each disk to be replaced:
3677         - create new LVs on the target node with unique names
3678         - detach old LVs from the drbd device
3679         - rename old LVs to name_replaced.<time_t>
3680         - rename new LVs to old LVs
3681         - attach the new LVs (with the old names now) to the drbd device
3682       - wait for sync across all devices
3683       - for each modified disk:
3684         - remove old LVs (which have the name name_replaces.<time_t>)
3685
3686     Failures are not very well handled.
3687
3688     """
3689     steps_total = 6
3690     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3691     instance = self.instance
3692     iv_names = {}
3693     vgname = self.cfg.GetVGName()
3694     # start of work
3695     cfg = self.cfg
3696     tgt_node = self.tgt_node
3697     oth_node = self.oth_node
3698
3699     # Step: check device activation
3700     self.proc.LogStep(1, steps_total, "check device existence")
3701     info("checking volume groups")
3702     my_vg = cfg.GetVGName()
3703     results = rpc.call_vg_list([oth_node, tgt_node])
3704     if not results:
3705       raise errors.OpExecError("Can't list volume groups on the nodes")
3706     for node in oth_node, tgt_node:
3707       res = results.get(node, False)
3708       if not res or my_vg not in res:
3709         raise errors.OpExecError("Volume group '%s' not found on %s" %
3710                                  (my_vg, node))
3711     for dev in instance.disks:
3712       if not dev.iv_name in self.op.disks:
3713         continue
3714       for node in tgt_node, oth_node:
3715         info("checking %s on %s" % (dev.iv_name, node))
3716         cfg.SetDiskID(dev, node)
3717         if not rpc.call_blockdev_find(node, dev):
3718           raise errors.OpExecError("Can't find device %s on node %s" %
3719                                    (dev.iv_name, node))
3720
3721     # Step: check other node consistency
3722     self.proc.LogStep(2, steps_total, "check peer consistency")
3723     for dev in instance.disks:
3724       if not dev.iv_name in self.op.disks:
3725         continue
3726       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3727       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3728                                    oth_node==instance.primary_node):
3729         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3730                                  " to replace disks on this node (%s)" %
3731                                  (oth_node, tgt_node))
3732
3733     # Step: create new storage
3734     self.proc.LogStep(3, steps_total, "allocate new storage")
3735     for dev in instance.disks:
3736       if not dev.iv_name in self.op.disks:
3737         continue
3738       size = dev.size
3739       cfg.SetDiskID(dev, tgt_node)
3740       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3741       names = _GenerateUniqueNames(cfg, lv_names)
3742       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3743                              logical_id=(vgname, names[0]))
3744       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3745                              logical_id=(vgname, names[1]))
3746       new_lvs = [lv_data, lv_meta]
3747       old_lvs = dev.children
3748       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3749       info("creating new local storage on %s for %s" %
3750            (tgt_node, dev.iv_name))
3751       # since we *always* want to create this LV, we use the
3752       # _Create...OnPrimary (which forces the creation), even if we
3753       # are talking about the secondary node
3754       for new_lv in new_lvs:
3755         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3756                                         _GetInstanceInfoText(instance)):
3757           raise errors.OpExecError("Failed to create new LV named '%s' on"
3758                                    " node '%s'" %
3759                                    (new_lv.logical_id[1], tgt_node))
3760
3761     # Step: for each lv, detach+rename*2+attach
3762     self.proc.LogStep(4, steps_total, "change drbd configuration")
3763     for dev, old_lvs, new_lvs in iv_names.itervalues():
3764       info("detaching %s drbd from local storage" % dev.iv_name)
3765       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3766         raise errors.OpExecError("Can't detach drbd from local storage on node"
3767                                  " %s for device %s" % (tgt_node, dev.iv_name))
3768       #dev.children = []
3769       #cfg.Update(instance)
3770
3771       # ok, we created the new LVs, so now we know we have the needed
3772       # storage; as such, we proceed on the target node to rename
3773       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3774       # using the assumption that logical_id == physical_id (which in
3775       # turn is the unique_id on that node)
3776
3777       # FIXME(iustin): use a better name for the replaced LVs
3778       temp_suffix = int(time.time())
3779       ren_fn = lambda d, suff: (d.physical_id[0],
3780                                 d.physical_id[1] + "_replaced-%s" % suff)
3781       # build the rename list based on what LVs exist on the node
3782       rlist = []
3783       for to_ren in old_lvs:
3784         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3785         if find_res is not None: # device exists
3786           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3787
3788       info("renaming the old LVs on the target node")
3789       if not rpc.call_blockdev_rename(tgt_node, rlist):
3790         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3791       # now we rename the new LVs to the old LVs
3792       info("renaming the new LVs on the target node")
3793       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3794       if not rpc.call_blockdev_rename(tgt_node, rlist):
3795         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3796
3797       for old, new in zip(old_lvs, new_lvs):
3798         new.logical_id = old.logical_id
3799         cfg.SetDiskID(new, tgt_node)
3800
3801       for disk in old_lvs:
3802         disk.logical_id = ren_fn(disk, temp_suffix)
3803         cfg.SetDiskID(disk, tgt_node)
3804
3805       # now that the new lvs have the old name, we can add them to the device
3806       info("adding new mirror component on %s" % tgt_node)
3807       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3808         for new_lv in new_lvs:
3809           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3810             warning("Can't rollback device %s", hint="manually cleanup unused"
3811                     " logical volumes")
3812         raise errors.OpExecError("Can't add local storage to drbd")
3813
3814       dev.children = new_lvs
3815       cfg.Update(instance)
3816
3817     # Step: wait for sync
3818
3819     # this can fail as the old devices are degraded and _WaitForSync
3820     # does a combined result over all disks, so we don't check its
3821     # return value
3822     self.proc.LogStep(5, steps_total, "sync devices")
3823     _WaitForSync(cfg, instance, self.proc, unlock=True)
3824
3825     # so check manually all the devices
3826     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3827       cfg.SetDiskID(dev, instance.primary_node)
3828       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3829       if is_degr:
3830         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3831
3832     # Step: remove old storage
3833     self.proc.LogStep(6, steps_total, "removing old storage")
3834     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3835       info("remove logical volumes for %s" % name)
3836       for lv in old_lvs:
3837         cfg.SetDiskID(lv, tgt_node)
3838         if not rpc.call_blockdev_remove(tgt_node, lv):
3839           warning("Can't remove old LV", hint="manually remove unused LVs")
3840           continue
3841
3842   def _ExecD8Secondary(self, feedback_fn):
3843     """Replace the secondary node for drbd8.
3844
3845     The algorithm for replace is quite complicated:
3846       - for all disks of the instance:
3847         - create new LVs on the new node with same names
3848         - shutdown the drbd device on the old secondary
3849         - disconnect the drbd network on the primary
3850         - create the drbd device on the new secondary
3851         - network attach the drbd on the primary, using an artifice:
3852           the drbd code for Attach() will connect to the network if it
3853           finds a device which is connected to the good local disks but
3854           not network enabled
3855       - wait for sync across all devices
3856       - remove all disks from the old secondary
3857
3858     Failures are not very well handled.
3859
3860     """
3861     steps_total = 6
3862     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3863     instance = self.instance
3864     iv_names = {}
3865     vgname = self.cfg.GetVGName()
3866     # start of work
3867     cfg = self.cfg
3868     old_node = self.tgt_node
3869     new_node = self.new_node
3870     pri_node = instance.primary_node
3871
3872     # Step: check device activation
3873     self.proc.LogStep(1, steps_total, "check device existence")
3874     info("checking volume groups")
3875     my_vg = cfg.GetVGName()
3876     results = rpc.call_vg_list([pri_node, new_node])
3877     if not results:
3878       raise errors.OpExecError("Can't list volume groups on the nodes")
3879     for node in pri_node, new_node:
3880       res = results.get(node, False)
3881       if not res or my_vg not in res:
3882         raise errors.OpExecError("Volume group '%s' not found on %s" %
3883                                  (my_vg, node))
3884     for dev in instance.disks:
3885       if not dev.iv_name in self.op.disks:
3886         continue
3887       info("checking %s on %s" % (dev.iv_name, pri_node))
3888       cfg.SetDiskID(dev, pri_node)
3889       if not rpc.call_blockdev_find(pri_node, dev):
3890         raise errors.OpExecError("Can't find device %s on node %s" %
3891                                  (dev.iv_name, pri_node))
3892
3893     # Step: check other node consistency
3894     self.proc.LogStep(2, steps_total, "check peer consistency")
3895     for dev in instance.disks:
3896       if not dev.iv_name in self.op.disks:
3897         continue
3898       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3899       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3900         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3901                                  " unsafe to replace the secondary" %
3902                                  pri_node)
3903
3904     # Step: create new storage
3905     self.proc.LogStep(3, steps_total, "allocate new storage")
3906     for dev in instance.disks:
3907       size = dev.size
3908       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3909       # since we *always* want to create this LV, we use the
3910       # _Create...OnPrimary (which forces the creation), even if we
3911       # are talking about the secondary node
3912       for new_lv in dev.children:
3913         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3914                                         _GetInstanceInfoText(instance)):
3915           raise errors.OpExecError("Failed to create new LV named '%s' on"
3916                                    " node '%s'" %
3917                                    (new_lv.logical_id[1], new_node))
3918
3919       iv_names[dev.iv_name] = (dev, dev.children)
3920
3921     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3922     for dev in instance.disks:
3923       size = dev.size
3924       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3925       # create new devices on new_node
3926       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3927                               logical_id=(pri_node, new_node,
3928                                           dev.logical_id[2]),
3929                               children=dev.children)
3930       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3931                                         new_drbd, False,
3932                                       _GetInstanceInfoText(instance)):
3933         raise errors.OpExecError("Failed to create new DRBD on"
3934                                  " node '%s'" % new_node)
3935
3936     for dev in instance.disks:
3937       # we have new devices, shutdown the drbd on the old secondary
3938       info("shutting down drbd for %s on old node" % dev.iv_name)
3939       cfg.SetDiskID(dev, old_node)
3940       if not rpc.call_blockdev_shutdown(old_node, dev):
3941         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3942                 hint="Please cleanup this device manually as soon as possible")
3943
3944     info("detaching primary drbds from the network (=> standalone)")
3945     done = 0
3946     for dev in instance.disks:
3947       cfg.SetDiskID(dev, pri_node)
3948       # set the physical (unique in bdev terms) id to None, meaning
3949       # detach from network
3950       dev.physical_id = (None,) * len(dev.physical_id)
3951       # and 'find' the device, which will 'fix' it to match the
3952       # standalone state
3953       if rpc.call_blockdev_find(pri_node, dev):
3954         done += 1
3955       else:
3956         warning("Failed to detach drbd %s from network, unusual case" %
3957                 dev.iv_name)
3958
3959     if not done:
3960       # no detaches succeeded (very unlikely)
3961       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3962
3963     # if we managed to detach at least one, we update all the disks of
3964     # the instance to point to the new secondary
3965     info("updating instance configuration")
3966     for dev in instance.disks:
3967       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3968       cfg.SetDiskID(dev, pri_node)
3969     cfg.Update(instance)
3970
3971     # and now perform the drbd attach
3972     info("attaching primary drbds to new secondary (standalone => connected)")
3973     failures = []
3974     for dev in instance.disks:
3975       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3976       # since the attach is smart, it's enough to 'find' the device,
3977       # it will automatically activate the network, if the physical_id
3978       # is correct
3979       cfg.SetDiskID(dev, pri_node)
3980       if not rpc.call_blockdev_find(pri_node, dev):
3981         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3982                 "please do a gnt-instance info to see the status of disks")
3983
3984     # this can fail as the old devices are degraded and _WaitForSync
3985     # does a combined result over all disks, so we don't check its
3986     # return value
3987     self.proc.LogStep(5, steps_total, "sync devices")
3988     _WaitForSync(cfg, instance, self.proc, unlock=True)
3989
3990     # so check manually all the devices
3991     for name, (dev, old_lvs) in iv_names.iteritems():
3992       cfg.SetDiskID(dev, pri_node)
3993       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3994       if is_degr:
3995         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3996
3997     self.proc.LogStep(6, steps_total, "removing old storage")
3998     for name, (dev, old_lvs) in iv_names.iteritems():
3999       info("remove logical volumes for %s" % name)
4000       for lv in old_lvs:
4001         cfg.SetDiskID(lv, old_node)
4002         if not rpc.call_blockdev_remove(old_node, lv):
4003           warning("Can't remove LV on old secondary",
4004                   hint="Cleanup stale volumes by hand")
4005
4006   def Exec(self, feedback_fn):
4007     """Execute disk replacement.
4008
4009     This dispatches the disk replacement to the appropriate handler.
4010
4011     """
4012     instance = self.instance
4013     if instance.disk_template == constants.DT_REMOTE_RAID1:
4014       fn = self._ExecRR1
4015     elif instance.disk_template == constants.DT_DRBD8:
4016       if self.op.remote_node is None:
4017         fn = self._ExecD8DiskOnly
4018       else:
4019         fn = self._ExecD8Secondary
4020     else:
4021       raise errors.ProgrammerError("Unhandled disk replacement case")
4022     return fn(feedback_fn)
4023
4024
4025 class LUQueryInstanceData(NoHooksLU):
4026   """Query runtime instance data.
4027
4028   """
4029   _OP_REQP = ["instances"]
4030
4031   def CheckPrereq(self):
4032     """Check prerequisites.
4033
4034     This only checks the optional instance list against the existing names.
4035
4036     """
4037     if not isinstance(self.op.instances, list):
4038       raise errors.OpPrereqError("Invalid argument type 'instances'")
4039     if self.op.instances:
4040       self.wanted_instances = []
4041       names = self.op.instances
4042       for name in names:
4043         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4044         if instance is None:
4045           raise errors.OpPrereqError("No such instance name '%s'" % name)
4046         self.wanted_instances.append(instance)
4047     else:
4048       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4049                                in self.cfg.GetInstanceList()]
4050     return
4051
4052
4053   def _ComputeDiskStatus(self, instance, snode, dev):
4054     """Compute block device status.
4055
4056     """
4057     self.cfg.SetDiskID(dev, instance.primary_node)
4058     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4059     if dev.dev_type in constants.LDS_DRBD:
4060       # we change the snode then (otherwise we use the one passed in)
4061       if dev.logical_id[0] == instance.primary_node:
4062         snode = dev.logical_id[1]
4063       else:
4064         snode = dev.logical_id[0]
4065
4066     if snode:
4067       self.cfg.SetDiskID(dev, snode)
4068       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4069     else:
4070       dev_sstatus = None
4071
4072     if dev.children:
4073       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4074                       for child in dev.children]
4075     else:
4076       dev_children = []
4077
4078     data = {
4079       "iv_name": dev.iv_name,
4080       "dev_type": dev.dev_type,
4081       "logical_id": dev.logical_id,
4082       "physical_id": dev.physical_id,
4083       "pstatus": dev_pstatus,
4084       "sstatus": dev_sstatus,
4085       "children": dev_children,
4086       }
4087
4088     return data
4089
4090   def Exec(self, feedback_fn):
4091     """Gather and return data"""
4092     result = {}
4093     for instance in self.wanted_instances:
4094       remote_info = rpc.call_instance_info(instance.primary_node,
4095                                                 instance.name)
4096       if remote_info and "state" in remote_info:
4097         remote_state = "up"
4098       else:
4099         remote_state = "down"
4100       if instance.status == "down":
4101         config_state = "down"
4102       else:
4103         config_state = "up"
4104
4105       disks = [self._ComputeDiskStatus(instance, None, device)
4106                for device in instance.disks]
4107
4108       idict = {
4109         "name": instance.name,
4110         "config_state": config_state,
4111         "run_state": remote_state,
4112         "pnode": instance.primary_node,
4113         "snodes": instance.secondary_nodes,
4114         "os": instance.os,
4115         "memory": instance.memory,
4116         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4117         "disks": disks,
4118         "network_port": instance.network_port,
4119         "vcpus": instance.vcpus,
4120         "kernel_path": instance.kernel_path,
4121         "initrd_path": instance.initrd_path,
4122         "hvm_boot_order": instance.hvm_boot_order,
4123         }
4124
4125       result[instance.name] = idict
4126
4127     return result
4128
4129
4130 class LUSetInstanceParms(LogicalUnit):
4131   """Modifies an instances's parameters.
4132
4133   """
4134   HPATH = "instance-modify"
4135   HTYPE = constants.HTYPE_INSTANCE
4136   _OP_REQP = ["instance_name"]
4137
4138   def BuildHooksEnv(self):
4139     """Build hooks env.
4140
4141     This runs on the master, primary and secondaries.
4142
4143     """
4144     args = dict()
4145     if self.mem:
4146       args['memory'] = self.mem
4147     if self.vcpus:
4148       args['vcpus'] = self.vcpus
4149     if self.do_ip or self.do_bridge or self.mac:
4150       if self.do_ip:
4151         ip = self.ip
4152       else:
4153         ip = self.instance.nics[0].ip
4154       if self.bridge:
4155         bridge = self.bridge
4156       else:
4157         bridge = self.instance.nics[0].bridge
4158       if self.mac:
4159         mac = self.mac
4160       else:
4161         mac = self.instance.nics[0].mac
4162       args['nics'] = [(ip, bridge, mac)]
4163     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4164     nl = [self.sstore.GetMasterNode(),
4165           self.instance.primary_node] + list(self.instance.secondary_nodes)
4166     return env, nl, nl
4167
4168   def CheckPrereq(self):
4169     """Check prerequisites.
4170
4171     This only checks the instance list against the existing names.
4172
4173     """
4174     self.mem = getattr(self.op, "mem", None)
4175     self.vcpus = getattr(self.op, "vcpus", None)
4176     self.ip = getattr(self.op, "ip", None)
4177     self.mac = getattr(self.op, "mac", None)
4178     self.bridge = getattr(self.op, "bridge", None)
4179     self.kernel_path = getattr(self.op, "kernel_path", None)
4180     self.initrd_path = getattr(self.op, "initrd_path", None)
4181     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4182     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4183                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4184     if all_parms.count(None) == len(all_parms):
4185       raise errors.OpPrereqError("No changes submitted")
4186     if self.mem is not None:
4187       try:
4188         self.mem = int(self.mem)
4189       except ValueError, err:
4190         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4191     if self.vcpus is not None:
4192       try:
4193         self.vcpus = int(self.vcpus)
4194       except ValueError, err:
4195         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4196     if self.ip is not None:
4197       self.do_ip = True
4198       if self.ip.lower() == "none":
4199         self.ip = None
4200       else:
4201         if not utils.IsValidIP(self.ip):
4202           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4203     else:
4204       self.do_ip = False
4205     self.do_bridge = (self.bridge is not None)
4206     if self.mac is not None:
4207       if self.cfg.IsMacInUse(self.mac):
4208         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4209                                    self.mac)
4210       if not utils.IsValidMac(self.mac):
4211         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4212
4213     if self.kernel_path is not None:
4214       self.do_kernel_path = True
4215       if self.kernel_path == constants.VALUE_NONE:
4216         raise errors.OpPrereqError("Can't set instance to no kernel")
4217
4218       if self.kernel_path != constants.VALUE_DEFAULT:
4219         if not os.path.isabs(self.kernel_path):
4220           raise errors.OpPrereqError("The kernel path must be an absolute"
4221                                     " filename")
4222     else:
4223       self.do_kernel_path = False
4224
4225     if self.initrd_path is not None:
4226       self.do_initrd_path = True
4227       if self.initrd_path not in (constants.VALUE_NONE,
4228                                   constants.VALUE_DEFAULT):
4229         if not os.path.isabs(self.initrd_path):
4230           raise errors.OpPrereqError("The initrd path must be an absolute"
4231                                     " filename")
4232     else:
4233       self.do_initrd_path = False
4234
4235     # boot order verification
4236     if self.hvm_boot_order is not None:
4237       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4238         if len(self.hvm_boot_order.strip("acdn")) != 0:
4239           raise errors.OpPrereqError("invalid boot order specified,"
4240                                      " must be one or more of [acdn]"
4241                                      " or 'default'")
4242
4243     instance = self.cfg.GetInstanceInfo(
4244       self.cfg.ExpandInstanceName(self.op.instance_name))
4245     if instance is None:
4246       raise errors.OpPrereqError("No such instance name '%s'" %
4247                                  self.op.instance_name)
4248     self.op.instance_name = instance.name
4249     self.instance = instance
4250     return
4251
4252   def Exec(self, feedback_fn):
4253     """Modifies an instance.
4254
4255     All parameters take effect only at the next restart of the instance.
4256     """
4257     result = []
4258     instance = self.instance
4259     if self.mem:
4260       instance.memory = self.mem
4261       result.append(("mem", self.mem))
4262     if self.vcpus:
4263       instance.vcpus = self.vcpus
4264       result.append(("vcpus",  self.vcpus))
4265     if self.do_ip:
4266       instance.nics[0].ip = self.ip
4267       result.append(("ip", self.ip))
4268     if self.bridge:
4269       instance.nics[0].bridge = self.bridge
4270       result.append(("bridge", self.bridge))
4271     if self.mac:
4272       instance.nics[0].mac = self.mac
4273       result.append(("mac", self.mac))
4274     if self.do_kernel_path:
4275       instance.kernel_path = self.kernel_path
4276       result.append(("kernel_path", self.kernel_path))
4277     if self.do_initrd_path:
4278       instance.initrd_path = self.initrd_path
4279       result.append(("initrd_path", self.initrd_path))
4280     if self.hvm_boot_order:
4281       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4282         instance.hvm_boot_order = None
4283       else:
4284         instance.hvm_boot_order = self.hvm_boot_order
4285       result.append(("hvm_boot_order", self.hvm_boot_order))
4286
4287     self.cfg.AddInstance(instance)
4288
4289     return result
4290
4291
4292 class LUQueryExports(NoHooksLU):
4293   """Query the exports list
4294
4295   """
4296   _OP_REQP = []
4297
4298   def CheckPrereq(self):
4299     """Check that the nodelist contains only existing nodes.
4300
4301     """
4302     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4303
4304   def Exec(self, feedback_fn):
4305     """Compute the list of all the exported system images.
4306
4307     Returns:
4308       a dictionary with the structure node->(export-list)
4309       where export-list is a list of the instances exported on
4310       that node.
4311
4312     """
4313     return rpc.call_export_list(self.nodes)
4314
4315
4316 class LUExportInstance(LogicalUnit):
4317   """Export an instance to an image in the cluster.
4318
4319   """
4320   HPATH = "instance-export"
4321   HTYPE = constants.HTYPE_INSTANCE
4322   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4323
4324   def BuildHooksEnv(self):
4325     """Build hooks env.
4326
4327     This will run on the master, primary node and target node.
4328
4329     """
4330     env = {
4331       "EXPORT_NODE": self.op.target_node,
4332       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4333       }
4334     env.update(_BuildInstanceHookEnvByObject(self.instance))
4335     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4336           self.op.target_node]
4337     return env, nl, nl
4338
4339   def CheckPrereq(self):
4340     """Check prerequisites.
4341
4342     This checks that the instance name is a valid one.
4343
4344     """
4345     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4346     self.instance = self.cfg.GetInstanceInfo(instance_name)
4347     if self.instance is None:
4348       raise errors.OpPrereqError("Instance '%s' not found" %
4349                                  self.op.instance_name)
4350
4351     # node verification
4352     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4353     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4354
4355     if self.dst_node is None:
4356       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4357                                  self.op.target_node)
4358     self.op.target_node = self.dst_node.name
4359
4360   def Exec(self, feedback_fn):
4361     """Export an instance to an image in the cluster.
4362
4363     """
4364     instance = self.instance
4365     dst_node = self.dst_node
4366     src_node = instance.primary_node
4367     if self.op.shutdown:
4368       # shutdown the instance, but not the disks
4369       if not rpc.call_instance_shutdown(src_node, instance):
4370          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4371                                  (instance.name, source_node))
4372
4373     vgname = self.cfg.GetVGName()
4374
4375     snap_disks = []
4376
4377     try:
4378       for disk in instance.disks:
4379         if disk.iv_name == "sda":
4380           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4381           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4382
4383           if not new_dev_name:
4384             logger.Error("could not snapshot block device %s on node %s" %
4385                          (disk.logical_id[1], src_node))
4386           else:
4387             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4388                                       logical_id=(vgname, new_dev_name),
4389                                       physical_id=(vgname, new_dev_name),
4390                                       iv_name=disk.iv_name)
4391             snap_disks.append(new_dev)
4392
4393     finally:
4394       if self.op.shutdown and instance.status == "up":
4395         if not rpc.call_instance_start(src_node, instance, None):
4396           _ShutdownInstanceDisks(instance, self.cfg)
4397           raise errors.OpExecError("Could not start instance")
4398
4399     # TODO: check for size
4400
4401     for dev in snap_disks:
4402       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4403                                            instance):
4404         logger.Error("could not export block device %s from node"
4405                      " %s to node %s" %
4406                      (dev.logical_id[1], src_node, dst_node.name))
4407       if not rpc.call_blockdev_remove(src_node, dev):
4408         logger.Error("could not remove snapshot block device %s from"
4409                      " node %s" % (dev.logical_id[1], src_node))
4410
4411     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4412       logger.Error("could not finalize export for instance %s on node %s" %
4413                    (instance.name, dst_node.name))
4414
4415     nodelist = self.cfg.GetNodeList()
4416     nodelist.remove(dst_node.name)
4417
4418     # on one-node clusters nodelist will be empty after the removal
4419     # if we proceed the backup would be removed because OpQueryExports
4420     # substitutes an empty list with the full cluster node list.
4421     if nodelist:
4422       op = opcodes.OpQueryExports(nodes=nodelist)
4423       exportlist = self.proc.ChainOpCode(op)
4424       for node in exportlist:
4425         if instance.name in exportlist[node]:
4426           if not rpc.call_export_remove(node, instance.name):
4427             logger.Error("could not remove older export for instance %s"
4428                          " on node %s" % (instance.name, node))
4429
4430
4431 class TagsLU(NoHooksLU):
4432   """Generic tags LU.
4433
4434   This is an abstract class which is the parent of all the other tags LUs.
4435
4436   """
4437   def CheckPrereq(self):
4438     """Check prerequisites.
4439
4440     """
4441     if self.op.kind == constants.TAG_CLUSTER:
4442       self.target = self.cfg.GetClusterInfo()
4443     elif self.op.kind == constants.TAG_NODE:
4444       name = self.cfg.ExpandNodeName(self.op.name)
4445       if name is None:
4446         raise errors.OpPrereqError("Invalid node name (%s)" %
4447                                    (self.op.name,))
4448       self.op.name = name
4449       self.target = self.cfg.GetNodeInfo(name)
4450     elif self.op.kind == constants.TAG_INSTANCE:
4451       name = self.cfg.ExpandInstanceName(self.op.name)
4452       if name is None:
4453         raise errors.OpPrereqError("Invalid instance name (%s)" %
4454                                    (self.op.name,))
4455       self.op.name = name
4456       self.target = self.cfg.GetInstanceInfo(name)
4457     else:
4458       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4459                                  str(self.op.kind))
4460
4461
4462 class LUGetTags(TagsLU):
4463   """Returns the tags of a given object.
4464
4465   """
4466   _OP_REQP = ["kind", "name"]
4467
4468   def Exec(self, feedback_fn):
4469     """Returns the tag list.
4470
4471     """
4472     return self.target.GetTags()
4473
4474
4475 class LUSearchTags(NoHooksLU):
4476   """Searches the tags for a given pattern.
4477
4478   """
4479   _OP_REQP = ["pattern"]
4480
4481   def CheckPrereq(self):
4482     """Check prerequisites.
4483
4484     This checks the pattern passed for validity by compiling it.
4485
4486     """
4487     try:
4488       self.re = re.compile(self.op.pattern)
4489     except re.error, err:
4490       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4491                                  (self.op.pattern, err))
4492
4493   def Exec(self, feedback_fn):
4494     """Returns the tag list.
4495
4496     """
4497     cfg = self.cfg
4498     tgts = [("/cluster", cfg.GetClusterInfo())]
4499     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4500     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4501     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4502     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4503     results = []
4504     for path, target in tgts:
4505       for tag in target.GetTags():
4506         if self.re.search(tag):
4507           results.append((path, tag))
4508     return results
4509
4510
4511 class LUAddTags(TagsLU):
4512   """Sets a tag on a given object.
4513
4514   """
4515   _OP_REQP = ["kind", "name", "tags"]
4516
4517   def CheckPrereq(self):
4518     """Check prerequisites.
4519
4520     This checks the type and length of the tag name and value.
4521
4522     """
4523     TagsLU.CheckPrereq(self)
4524     for tag in self.op.tags:
4525       objects.TaggableObject.ValidateTag(tag)
4526
4527   def Exec(self, feedback_fn):
4528     """Sets the tag.
4529
4530     """
4531     try:
4532       for tag in self.op.tags:
4533         self.target.AddTag(tag)
4534     except errors.TagError, err:
4535       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4536     try:
4537       self.cfg.Update(self.target)
4538     except errors.ConfigurationError:
4539       raise errors.OpRetryError("There has been a modification to the"
4540                                 " config file and the operation has been"
4541                                 " aborted. Please retry.")
4542
4543
4544 class LUDelTags(TagsLU):
4545   """Delete a list of tags from a given object.
4546
4547   """
4548   _OP_REQP = ["kind", "name", "tags"]
4549
4550   def CheckPrereq(self):
4551     """Check prerequisites.
4552
4553     This checks that we have the given tag.
4554
4555     """
4556     TagsLU.CheckPrereq(self)
4557     for tag in self.op.tags:
4558       objects.TaggableObject.ValidateTag(tag)
4559     del_tags = frozenset(self.op.tags)
4560     cur_tags = self.target.GetTags()
4561     if not del_tags <= cur_tags:
4562       diff_tags = del_tags - cur_tags
4563       diff_names = ["'%s'" % tag for tag in diff_tags]
4564       diff_names.sort()
4565       raise errors.OpPrereqError("Tag(s) %s not found" %
4566                                  (",".join(diff_names)))
4567
4568   def Exec(self, feedback_fn):
4569     """Remove the tag from the object.
4570
4571     """
4572     for tag in self.op.tags:
4573       self.target.RemoveTag(tag)
4574     try:
4575       self.cfg.Update(self.target)
4576     except errors.ConfigurationError:
4577       raise errors.OpRetryError("There has been a modification to the"
4578                                 " config file and the operation has been"
4579                                 " aborted. Please retry.")
4580
4581 class LUTestDelay(NoHooksLU):
4582   """Sleep for a specified amount of time.
4583
4584   This LU sleeps on the master and/or nodes for a specified amoutn of
4585   time.
4586
4587   """
4588   _OP_REQP = ["duration", "on_master", "on_nodes"]
4589
4590   def CheckPrereq(self):
4591     """Check prerequisites.
4592
4593     This checks that we have a good list of nodes and/or the duration
4594     is valid.
4595
4596     """
4597
4598     if self.op.on_nodes:
4599       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4600
4601   def Exec(self, feedback_fn):
4602     """Do the actual sleep.
4603
4604     """
4605     if self.op.on_master:
4606       if not utils.TestDelay(self.op.duration):
4607         raise errors.OpExecError("Error during master delay test")
4608     if self.op.on_nodes:
4609       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4610       if not result:
4611         raise errors.OpExecError("Complete failure from rpc call")
4612       for node, node_result in result.items():
4613         if not node_result:
4614           raise errors.OpExecError("Failure during rpc call to node %s,"
4615                                    " result: %s" % (node, node_result))