Export the cpu nodes and sockets from Xen
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145     """Notify the LU about the results of its hooks.
146
147     This method is called every time a hooks phase is executed, and notifies
148     the Logical Unit about the hooks' result. The LU can then use it to alter
149     its result based on the hooks.  By default the method does nothing and the
150     previous result is passed back unchanged but any LU can define it if it
151     wants to use the local cluster hook-scripts somehow.
152
153     Args:
154       phase: the hooks phase that has just been run
155       hooks_results: the results of the multi-node hooks rpc call
156       feedback_fn: function to send feedback back to the caller
157       lu_result: the previous result this LU had, or None in the PRE phase.
158
159     """
160     return lu_result
161
162
163 class NoHooksLU(LogicalUnit):
164   """Simple LU which runs no hooks.
165
166   This LU is intended as a parent for other LogicalUnits which will
167   run no hooks, in order to reduce duplicate code.
168
169   """
170   HPATH = None
171   HTYPE = None
172
173
174 def _AddHostToEtcHosts(hostname):
175   """Wrapper around utils.SetEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
180
181
182 def _RemoveHostFromEtcHosts(hostname):
183   """Wrapper around utils.RemoveEtcHostsEntry.
184
185   """
186   hi = utils.HostInfo(name=hostname)
187   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315   """Ensure a node has a correct known_hosts entry.
316
317   Args:
318     fullnode - Fully qualified domain name of host. (str)
319     ip       - IPv4 address of host (str)
320     pubkey   - the public key of the cluster
321
322   """
323   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
325   else:
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
327
328   inthere = False
329
330   save_lines = []
331   add_lines = []
332   removed = False
333
334   for rawline in f:
335     logger.Debug('read %s' % (repr(rawline),))
336
337     parts = rawline.rstrip('\r\n').split()
338
339     # Ignore unwanted lines
340     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341       fields = parts[0].split(',')
342       key = parts[2]
343
344       haveall = True
345       havesome = False
346       for spec in [ ip, fullnode ]:
347         if spec not in fields:
348           haveall = False
349         if spec in fields:
350           havesome = True
351
352       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353       if haveall and key == pubkey:
354         inthere = True
355         save_lines.append(rawline)
356         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
357         continue
358
359       if havesome and (not haveall or key != pubkey):
360         removed = True
361         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
362         continue
363
364     save_lines.append(rawline)
365
366   if not inthere:
367     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
369
370   if removed:
371     save_lines = save_lines + add_lines
372
373     # Write a new file and replace old.
374     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
375                                    constants.DATA_DIR)
376     newfile = os.fdopen(fd, 'w')
377     try:
378       newfile.write(''.join(save_lines))
379     finally:
380       newfile.close()
381     logger.Debug("Wrote new known_hosts.")
382     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
383
384   elif add_lines:
385     # Simply appending a new line will do the trick.
386     f.seek(0, 2)
387     for add in add_lines:
388       f.write(add)
389
390   f.close()
391
392
393 def _HasValidVG(vglist, vgname):
394   """Checks if the volume group list is valid.
395
396   A non-None return value means there's an error, and the return value
397   is the error message.
398
399   """
400   vgsize = vglist.get(vgname, None)
401   if vgsize is None:
402     return "volume group '%s' missing" % vgname
403   elif vgsize < 20480:
404     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
405             (vgname, vgsize))
406   return None
407
408
409 def _InitSSHSetup(node):
410   """Setup the SSH configuration for the cluster.
411
412
413   This generates a dsa keypair for root, adds the pub key to the
414   permitted hosts and adds the hostkey to its own known hosts.
415
416   Args:
417     node: the name of this host as a fqdn
418
419   """
420   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
421
422   for name in priv_key, pub_key:
423     if os.path.exists(name):
424       utils.CreateBackup(name)
425     utils.RemoveFile(name)
426
427   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
428                          "-f", priv_key,
429                          "-q", "-N", ""])
430   if result.failed:
431     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
432                              result.output)
433
434   f = open(pub_key, 'r')
435   try:
436     utils.AddAuthorizedKey(auth_keys, f.read(8192))
437   finally:
438     f.close()
439
440
441 def _InitGanetiServerSetup(ss):
442   """Setup the necessary configuration for the initial node daemon.
443
444   This creates the nodepass file containing the shared password for
445   the cluster and also generates the SSL certificate.
446
447   """
448   # Create pseudo random password
449   randpass = sha.new(os.urandom(64)).hexdigest()
450   # and write it into sstore
451   ss.SetKey(ss.SS_NODED_PASS, randpass)
452
453   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454                          "-days", str(365*5), "-nodes", "-x509",
455                          "-keyout", constants.SSL_CERT_FILE,
456                          "-out", constants.SSL_CERT_FILE, "-batch"])
457   if result.failed:
458     raise errors.OpExecError("could not generate server ssl cert, command"
459                              " %s had exitcode %s and error message %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462   os.chmod(constants.SSL_CERT_FILE, 0400)
463
464   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
465
466   if result.failed:
467     raise errors.OpExecError("Could not start the node daemon, command %s"
468                              " had exitcode %s and error %s" %
469                              (result.cmd, result.exit_code, result.output))
470
471
472 def _CheckInstanceBridgesExist(instance):
473   """Check that the brigdes needed by an instance exist.
474
475   """
476   # check bridges existance
477   brlist = [nic.bridge for nic in instance.nics]
478   if not rpc.call_bridges_exist(instance.primary_node, brlist):
479     raise errors.OpPrereqError("one or more target bridges %s does not"
480                                " exist on destination node '%s'" %
481                                (brlist, instance.primary_node))
482
483
484 class LUInitCluster(LogicalUnit):
485   """Initialise the cluster.
486
487   """
488   HPATH = "cluster-init"
489   HTYPE = constants.HTYPE_CLUSTER
490   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491               "def_bridge", "master_netdev"]
492   REQ_CLUSTER = False
493
494   def BuildHooksEnv(self):
495     """Build hooks env.
496
497     Notes: Since we don't require a cluster, we must manually add
498     ourselves in the post-run node list.
499
500     """
501     env = {"OP_TARGET": self.op.cluster_name}
502     return env, [], [self.hostname.name]
503
504   def CheckPrereq(self):
505     """Verify that the passed name is a valid one.
506
507     """
508     if config.ConfigWriter.IsCluster():
509       raise errors.OpPrereqError("Cluster is already initialised")
510
511     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512       if not os.path.exists(constants.VNC_PASSWORD_FILE):
513         raise errors.OpPrereqError("Please prepare the cluster VNC"
514                                    "password file %s" %
515                                    constants.VNC_PASSWORD_FILE)
516
517     self.hostname = hostname = utils.HostInfo()
518
519     if hostname.ip.startswith("127."):
520       raise errors.OpPrereqError("This host's IP resolves to the private"
521                                  " range (%s). Please fix DNS or %s." %
522                                  (hostname.ip, constants.ETC_HOSTS))
523
524     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525                          source=constants.LOCALHOST_IP_ADDRESS):
526       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527                                  " to %s,\nbut this ip address does not"
528                                  " belong to this host."
529                                  " Aborting." % hostname.ip)
530
531     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
532
533     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
534                      timeout=5):
535       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
536
537     secondary_ip = getattr(self.op, "secondary_ip", None)
538     if secondary_ip and not utils.IsValidIP(secondary_ip):
539       raise errors.OpPrereqError("Invalid secondary ip given")
540     if (secondary_ip and
541         secondary_ip != hostname.ip and
542         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543                            source=constants.LOCALHOST_IP_ADDRESS))):
544       raise errors.OpPrereqError("You gave %s as secondary IP,"
545                                  " but it does not belong to this host." %
546                                  secondary_ip)
547     self.secondary_ip = secondary_ip
548
549     # checks presence of the volume group given
550     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
551
552     if vgstatus:
553       raise errors.OpPrereqError("Error: %s" % vgstatus)
554
555     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
556                     self.op.mac_prefix):
557       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
558                                  self.op.mac_prefix)
559
560     if self.op.hypervisor_type not in constants.HYPER_TYPES:
561       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562                                  self.op.hypervisor_type)
563
564     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
565     if result.failed:
566       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567                                  (self.op.master_netdev,
568                                   result.output.strip()))
569
570     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572       raise errors.OpPrereqError("Init.d script '%s' missing or not"
573                                  " executable." % constants.NODE_INITD_SCRIPT)
574
575   def Exec(self, feedback_fn):
576     """Initialize the cluster.
577
578     """
579     clustername = self.clustername
580     hostname = self.hostname
581
582     # set up the simple store
583     self.sstore = ss = ssconf.SimpleStore()
584     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
589
590     # set up the inter-node password and certificate
591     _InitGanetiServerSetup(ss)
592
593     # start the master ip
594     rpc.call_node_start_master(hostname.name)
595
596     # set up ssh config and /etc/hosts
597     f = open(constants.SSH_HOST_RSA_PUB, 'r')
598     try:
599       sshline = f.read()
600     finally:
601       f.close()
602     sshkey = sshline.split(" ")[1]
603
604     _AddHostToEtcHosts(hostname.name)
605
606     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
607
608     _InitSSHSetup(hostname.name)
609
610     # init of cluster config file
611     self.cfg = cfgw = config.ConfigWriter()
612     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613                     sshkey, self.op.mac_prefix,
614                     self.op.vg_name, self.op.def_bridge)
615
616
617 class LUDestroyCluster(NoHooksLU):
618   """Logical unit for destroying the cluster.
619
620   """
621   _OP_REQP = []
622
623   def CheckPrereq(self):
624     """Check prerequisites.
625
626     This checks whether the cluster is empty.
627
628     Any errors are signalled by raising errors.OpPrereqError.
629
630     """
631     master = self.sstore.GetMasterNode()
632
633     nodelist = self.cfg.GetNodeList()
634     if len(nodelist) != 1 or nodelist[0] != master:
635       raise errors.OpPrereqError("There are still %d node(s) in"
636                                  " this cluster." % (len(nodelist) - 1))
637     instancelist = self.cfg.GetInstanceList()
638     if instancelist:
639       raise errors.OpPrereqError("There are still %d instance(s) in"
640                                  " this cluster." % len(instancelist))
641
642   def Exec(self, feedback_fn):
643     """Destroys the cluster.
644
645     """
646     master = self.sstore.GetMasterNode()
647     if not rpc.call_node_stop_master(master):
648       raise errors.OpExecError("Could not disable the master role")
649     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650     utils.CreateBackup(priv_key)
651     utils.CreateBackup(pub_key)
652     rpc.call_node_leave_cluster(master)
653
654
655 class LUVerifyCluster(LogicalUnit):
656   """Verifies the cluster status.
657
658   """
659   HPATH = "cluster-verify"
660   HTYPE = constants.HTYPE_CLUSTER
661   _OP_REQP = ["skip_checks"]
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     if 'node-net-test' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result['node-net-test']:
733         bad = True
734         nlist = utils.NiceSort(node_result['node-net-test'].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result['node-net-test'][node]))
738
739     hyp_result = node_result.get('hypervisor', None)
740     if hyp_result is not None:
741       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742     return bad
743
744   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745                       node_instance, feedback_fn):
746     """Verify an instance.
747
748     This function checks to see if the required block devices are
749     available on the instance's node.
750
751     """
752     bad = False
753
754     node_current = instanceconfig.primary_node
755
756     node_vol_should = {}
757     instanceconfig.MapLVsByNode(node_vol_should)
758
759     for node in node_vol_should:
760       for volume in node_vol_should[node]:
761         if node not in node_vol_is or volume not in node_vol_is[node]:
762           feedback_fn("  - ERROR: volume %s missing on node %s" %
763                           (volume, node))
764           bad = True
765
766     if not instanceconfig.status == 'down':
767       if (node_current not in node_instance or
768           not instance in node_instance[node_current]):
769         feedback_fn("  - ERROR: instance %s not running on node %s" %
770                         (instance, node_current))
771         bad = True
772
773     for node in node_instance:
774       if (not node == node_current):
775         if instance in node_instance[node]:
776           feedback_fn("  - ERROR: instance %s should not run on node %s" %
777                           (instance, node))
778           bad = True
779
780     return bad
781
782   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783     """Verify if there are any unknown volumes in the cluster.
784
785     The .os, .swap and backup volumes are ignored. All other volumes are
786     reported as unknown.
787
788     """
789     bad = False
790
791     for node in node_vol_is:
792       for volume in node_vol_is[node]:
793         if node not in node_vol_should or volume not in node_vol_should[node]:
794           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
795                       (volume, node))
796           bad = True
797     return bad
798
799   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800     """Verify the list of running instances.
801
802     This checks what instances are running but unknown to the cluster.
803
804     """
805     bad = False
806     for node in node_instance:
807       for runninginstance in node_instance[node]:
808         if runninginstance not in instancelist:
809           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
810                           (runninginstance, node))
811           bad = True
812     return bad
813
814   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815     """Verify N+1 Memory Resilience.
816
817     Check that if one single node dies we can still start all the instances it
818     was primary for.
819
820     """
821     bad = False
822
823     for node, nodeinfo in node_info.iteritems():
824       # This code checks that every node which is now listed as secondary has
825       # enough memory to host all instances it is supposed to should a single
826       # other node in the cluster fail.
827       # FIXME: not ready for failover to an arbitrary node
828       # FIXME: does not support file-backed instances
829       # WARNING: we currently take into account down instances as well as up
830       # ones, considering that even if they're down someone might want to start
831       # them even in the event of a node failure.
832       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
833         needed_mem = 0
834         for instance in instances:
835           if instance_cfg[instance].auto_balance:
836             needed_mem += instance_cfg[instance].memory
837         if nodeinfo['mfree'] < needed_mem:
838           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
839                       " failovers should node %s fail" % (node, prinode))
840           bad = True
841     return bad
842
843   def CheckPrereq(self):
844     """Check prerequisites.
845
846     Transform the list of checks we're going to skip into a set and check that
847     all its members are valid.
848
849     """
850     self.skip_set = frozenset(self.op.skip_checks)
851     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
852       raise errors.OpPrereqError("Invalid checks to be skipped specified")
853
854   def BuildHooksEnv(self):
855     """Build hooks env.
856
857     Cluster-Verify hooks just rone in the post phase and their failure makes
858     the output be logged in the verify output and the verification to fail.
859
860     """
861     all_nodes = self.cfg.GetNodeList()
862     tags = self.cfg.GetClusterInfo().GetTags()
863     # TODO: populate the environment with useful information for verify hooks
864     env = {
865       "CLUSTER_TAGS": " ".join(tags),
866       }
867     return env, [], all_nodes
868
869   def Exec(self, feedback_fn):
870     """Verify integrity of cluster, performing various test on nodes.
871
872     """
873     bad = False
874     feedback_fn("* Verifying global settings")
875     for msg in self.cfg.VerifyConfig():
876       feedback_fn("  - ERROR: %s" % msg)
877
878     vg_name = self.cfg.GetVGName()
879     nodelist = utils.NiceSort(self.cfg.GetNodeList())
880     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
881     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
882     i_non_redundant = [] # Non redundant instances
883     i_non_a_balanced = [] # Non auto-balanced instances
884     node_volume = {}
885     node_instance = {}
886     node_info = {}
887     instance_cfg = {}
888
889     # FIXME: verify OS list
890     # do local checksums
891     file_names = list(self.sstore.GetFileList())
892     file_names.append(constants.SSL_CERT_FILE)
893     file_names.append(constants.CLUSTER_CONF_FILE)
894     local_checksums = utils.FingerprintFiles(file_names)
895
896     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
897     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
898     all_instanceinfo = rpc.call_instance_list(nodelist)
899     all_vglist = rpc.call_vg_list(nodelist)
900     node_verify_param = {
901       'filelist': file_names,
902       'nodelist': nodelist,
903       'hypervisor': None,
904       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
905                         for node in nodeinfo]
906       }
907     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
908     all_rversion = rpc.call_version(nodelist)
909     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
910
911     incomplete_nodeinfo = False
912
913     for node in nodelist:
914       feedback_fn("* Verifying node %s" % node)
915       result = self._VerifyNode(node, file_names, local_checksums,
916                                 all_vglist[node], all_nvinfo[node],
917                                 all_rversion[node], feedback_fn)
918       bad = bad or result
919
920       # node_volume
921       volumeinfo = all_volumeinfo[node]
922
923       if isinstance(volumeinfo, basestring):
924         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
925                     (node, volumeinfo[-400:].encode('string_escape')))
926         bad = True
927         node_volume[node] = {}
928       elif not isinstance(volumeinfo, dict):
929         feedback_fn("  - ERROR: connection to %s failed" % (node,))
930         bad = True
931         incomplete_nodeinfo = True
932         continue
933       else:
934         node_volume[node] = volumeinfo
935
936       # node_instance
937       nodeinstance = all_instanceinfo[node]
938       if type(nodeinstance) != list:
939         feedback_fn("  - ERROR: connection to %s failed" % (node,))
940         bad = True
941         incomplete_nodeinfo = True
942         continue
943
944       node_instance[node] = nodeinstance
945
946       # node_info
947       nodeinfo = all_ninfo[node]
948       if not isinstance(nodeinfo, dict):
949         feedback_fn("  - ERROR: connection to %s failed" % (node,))
950         bad = True
951         incomplete_nodeinfo = True
952         continue
953
954       try:
955         node_info[node] = {
956           "mfree": int(nodeinfo['memory_free']),
957           "dfree": int(nodeinfo['vg_free']),
958           "pinst": [],
959           "sinst": [],
960           # dictionary holding all instances this node is secondary for,
961           # grouped by their primary node. Each key is a cluster node, and each
962           # value is a list of instances which have the key as primary and the
963           # current node as secondary.  this is handy to calculate N+1 memory
964           # availability if you can only failover from a primary to its
965           # secondary.
966           "sinst-by-pnode": {},
967         }
968       except (ValueError, TypeError):
969         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
970         bad = True
971         incomplete_nodeinfo = True
972         continue
973
974     node_vol_should = {}
975
976     for instance in instancelist:
977       feedback_fn("* Verifying instance %s" % instance)
978       inst_config = self.cfg.GetInstanceInfo(instance)
979       result =  self._VerifyInstance(instance, inst_config, node_volume,
980                                      node_instance, feedback_fn)
981       bad = bad or result
982
983       inst_config.MapLVsByNode(node_vol_should)
984
985       instance_cfg[instance] = inst_config
986
987       pnode = inst_config.primary_node
988       if pnode in node_info:
989         node_info[pnode]['pinst'].append(instance)
990       else:
991         feedback_fn("  - ERROR: instance %s, connection to primary node"
992                     " %s failed" % (instance, pnode))
993         bad = True
994
995       # If the instance is non-redundant we cannot survive losing its primary
996       # node, so we are not N+1 compliant. On the other hand we have no disk
997       # templates with more than one secondary so that situation is not well
998       # supported either.
999       # FIXME: does not support file-backed instances
1000       if len(inst_config.secondary_nodes) == 0:
1001         i_non_redundant.append(instance)
1002       elif len(inst_config.secondary_nodes) > 1:
1003         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1004                     % instance)
1005
1006       if not inst_config.auto_balance:
1007         i_non_a_balanced.append(instance)
1008
1009       for snode in inst_config.secondary_nodes:
1010         if snode in node_info:
1011           node_info[snode]['sinst'].append(instance)
1012           if pnode not in node_info[snode]['sinst-by-pnode']:
1013             node_info[snode]['sinst-by-pnode'][pnode] = []
1014           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1015         else:
1016           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1017                       " %s failed" % (instance, snode))
1018
1019     feedback_fn("* Verifying orphan volumes")
1020     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1021                                        feedback_fn)
1022     bad = bad or result
1023
1024     feedback_fn("* Verifying remaining instances")
1025     result = self._VerifyOrphanInstances(instancelist, node_instance,
1026                                          feedback_fn)
1027     bad = bad or result
1028
1029     if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1030         not incomplete_nodeinfo):
1031       feedback_fn("* Verifying N+1 Memory redundancy")
1032       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1033       bad = bad or result
1034
1035     feedback_fn("* Other Notes")
1036     if i_non_redundant:
1037       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1038                   % len(i_non_redundant))
1039
1040     if i_non_a_balanced:
1041       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1042                   % len(i_non_a_balanced))
1043
1044     return int(bad)
1045
1046   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1047     """Analize the post-hooks' result, handle it, and send some
1048     nicely-formatted feedback back to the user.
1049
1050     Args:
1051       phase: the hooks phase that has just been run
1052       hooks_results: the results of the multi-node hooks rpc call
1053       feedback_fn: function to send feedback back to the caller
1054       lu_result: previous Exec result
1055
1056     """
1057     # We only really run POST phase hooks, and are only interested in their results
1058     if phase == constants.HOOKS_PHASE_POST:
1059       # Used to change hooks' output to proper indentation
1060       indent_re = re.compile('^', re.M)
1061       feedback_fn("* Hooks Results")
1062       if not hooks_results:
1063         feedback_fn("  - ERROR: general communication failure")
1064         lu_result = 1
1065       else:
1066         for node_name in hooks_results:
1067           show_node_header = True
1068           res = hooks_results[node_name]
1069           if res is False or not isinstance(res, list):
1070             feedback_fn("    Communication failure")
1071             lu_result = 1
1072             continue
1073           for script, hkr, output in res:
1074             if hkr == constants.HKR_FAIL:
1075               # The node header is only shown once, if there are
1076               # failing hooks on that node
1077               if show_node_header:
1078                 feedback_fn("  Node %s:" % node_name)
1079                 show_node_header = False
1080               feedback_fn("    ERROR: Script %s failed, output:" % script)
1081               output = indent_re.sub('      ', output)
1082               feedback_fn("%s" % output)
1083               lu_result = 1
1084
1085       return lu_result
1086
1087
1088 class LUVerifyDisks(NoHooksLU):
1089   """Verifies the cluster disks status.
1090
1091   """
1092   _OP_REQP = []
1093
1094   def CheckPrereq(self):
1095     """Check prerequisites.
1096
1097     This has no prerequisites.
1098
1099     """
1100     pass
1101
1102   def Exec(self, feedback_fn):
1103     """Verify integrity of cluster disks.
1104
1105     """
1106     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1107
1108     vg_name = self.cfg.GetVGName()
1109     nodes = utils.NiceSort(self.cfg.GetNodeList())
1110     instances = [self.cfg.GetInstanceInfo(name)
1111                  for name in self.cfg.GetInstanceList()]
1112
1113     nv_dict = {}
1114     for inst in instances:
1115       inst_lvs = {}
1116       if (inst.status != "up" or
1117           inst.disk_template not in constants.DTS_NET_MIRROR):
1118         continue
1119       inst.MapLVsByNode(inst_lvs)
1120       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1121       for node, vol_list in inst_lvs.iteritems():
1122         for vol in vol_list:
1123           nv_dict[(node, vol)] = inst
1124
1125     if not nv_dict:
1126       return result
1127
1128     node_lvs = rpc.call_volume_list(nodes, vg_name)
1129
1130     to_act = set()
1131     for node in nodes:
1132       # node_volume
1133       lvs = node_lvs[node]
1134
1135       if isinstance(lvs, basestring):
1136         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1137         res_nlvm[node] = lvs
1138       elif not isinstance(lvs, dict):
1139         logger.Info("connection to node %s failed or invalid data returned" %
1140                     (node,))
1141         res_nodes.append(node)
1142         continue
1143
1144       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1145         inst = nv_dict.pop((node, lv_name), None)
1146         if (not lv_online and inst is not None
1147             and inst.name not in res_instances):
1148           res_instances.append(inst.name)
1149
1150     # any leftover items in nv_dict are missing LVs, let's arrange the
1151     # data better
1152     for key, inst in nv_dict.iteritems():
1153       if inst.name not in res_missing:
1154         res_missing[inst.name] = []
1155       res_missing[inst.name].append(key)
1156
1157     return result
1158
1159
1160 class LURenameCluster(LogicalUnit):
1161   """Rename the cluster.
1162
1163   """
1164   HPATH = "cluster-rename"
1165   HTYPE = constants.HTYPE_CLUSTER
1166   _OP_REQP = ["name"]
1167
1168   def BuildHooksEnv(self):
1169     """Build hooks env.
1170
1171     """
1172     env = {
1173       "OP_TARGET": self.sstore.GetClusterName(),
1174       "NEW_NAME": self.op.name,
1175       }
1176     mn = self.sstore.GetMasterNode()
1177     return env, [mn], [mn]
1178
1179   def CheckPrereq(self):
1180     """Verify that the passed name is a valid one.
1181
1182     """
1183     hostname = utils.HostInfo(self.op.name)
1184
1185     new_name = hostname.name
1186     self.ip = new_ip = hostname.ip
1187     old_name = self.sstore.GetClusterName()
1188     old_ip = self.sstore.GetMasterIP()
1189     if new_name == old_name and new_ip == old_ip:
1190       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1191                                  " cluster has changed")
1192     if new_ip != old_ip:
1193       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1194         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1195                                    " reachable on the network. Aborting." %
1196                                    new_ip)
1197
1198     self.op.name = new_name
1199
1200   def Exec(self, feedback_fn):
1201     """Rename the cluster.
1202
1203     """
1204     clustername = self.op.name
1205     ip = self.ip
1206     ss = self.sstore
1207
1208     # shutdown the master IP
1209     master = ss.GetMasterNode()
1210     if not rpc.call_node_stop_master(master):
1211       raise errors.OpExecError("Could not disable the master role")
1212
1213     try:
1214       # modify the sstore
1215       ss.SetKey(ss.SS_MASTER_IP, ip)
1216       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1217
1218       # Distribute updated ss config to all nodes
1219       myself = self.cfg.GetNodeInfo(master)
1220       dist_nodes = self.cfg.GetNodeList()
1221       if myself.name in dist_nodes:
1222         dist_nodes.remove(myself.name)
1223
1224       logger.Debug("Copying updated ssconf data to all nodes")
1225       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1226         fname = ss.KeyToFilename(keyname)
1227         result = rpc.call_upload_file(dist_nodes, fname)
1228         for to_node in dist_nodes:
1229           if not result[to_node]:
1230             logger.Error("copy of file %s to node %s failed" %
1231                          (fname, to_node))
1232     finally:
1233       if not rpc.call_node_start_master(master):
1234         logger.Error("Could not re-enable the master role on the master,"
1235                      " please restart manually.")
1236
1237
1238 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1239   """Sleep and poll for an instance's disk to sync.
1240
1241   """
1242   if not instance.disks:
1243     return True
1244
1245   if not oneshot:
1246     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1247
1248   node = instance.primary_node
1249
1250   for dev in instance.disks:
1251     cfgw.SetDiskID(dev, node)
1252
1253   retries = 0
1254   while True:
1255     max_time = 0
1256     done = True
1257     cumul_degraded = False
1258     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1259     if not rstats:
1260       proc.LogWarning("Can't get any data from node %s" % node)
1261       retries += 1
1262       if retries >= 10:
1263         raise errors.RemoteError("Can't contact node %s for mirror data,"
1264                                  " aborting." % node)
1265       time.sleep(6)
1266       continue
1267     retries = 0
1268     for i in range(len(rstats)):
1269       mstat = rstats[i]
1270       if mstat is None:
1271         proc.LogWarning("Can't compute data for node %s/%s" %
1272                         (node, instance.disks[i].iv_name))
1273         continue
1274       # we ignore the ldisk parameter
1275       perc_done, est_time, is_degraded, _ = mstat
1276       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1277       if perc_done is not None:
1278         done = False
1279         if est_time is not None:
1280           rem_time = "%d estimated seconds remaining" % est_time
1281           max_time = est_time
1282         else:
1283           rem_time = "no time estimate"
1284         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1285                      (instance.disks[i].iv_name, perc_done, rem_time))
1286     if done or oneshot:
1287       break
1288
1289     if unlock:
1290       utils.Unlock('cmd')
1291     try:
1292       time.sleep(min(60, max_time))
1293     finally:
1294       if unlock:
1295         utils.Lock('cmd')
1296
1297   if done:
1298     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1299   return not cumul_degraded
1300
1301
1302 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1303   """Check that mirrors are not degraded.
1304
1305   The ldisk parameter, if True, will change the test from the
1306   is_degraded attribute (which represents overall non-ok status for
1307   the device(s)) to the ldisk (representing the local storage status).
1308
1309   """
1310   cfgw.SetDiskID(dev, node)
1311   if ldisk:
1312     idx = 6
1313   else:
1314     idx = 5
1315
1316   result = True
1317   if on_primary or dev.AssembleOnSecondary():
1318     rstats = rpc.call_blockdev_find(node, dev)
1319     if not rstats:
1320       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1321       result = False
1322     else:
1323       result = result and (not rstats[idx])
1324   if dev.children:
1325     for child in dev.children:
1326       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1327
1328   return result
1329
1330
1331 class LUDiagnoseOS(NoHooksLU):
1332   """Logical unit for OS diagnose/query.
1333
1334   """
1335   _OP_REQP = ["output_fields", "names"]
1336
1337   def CheckPrereq(self):
1338     """Check prerequisites.
1339
1340     This always succeeds, since this is a pure query LU.
1341
1342     """
1343     if self.op.names:
1344       raise errors.OpPrereqError("Selective OS query not supported")
1345
1346     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1347     _CheckOutputFields(static=[],
1348                        dynamic=self.dynamic_fields,
1349                        selected=self.op.output_fields)
1350
1351   @staticmethod
1352   def _DiagnoseByOS(node_list, rlist):
1353     """Remaps a per-node return list into an a per-os per-node dictionary
1354
1355       Args:
1356         node_list: a list with the names of all nodes
1357         rlist: a map with node names as keys and OS objects as values
1358
1359       Returns:
1360         map: a map with osnames as keys and as value another map, with
1361              nodes as
1362              keys and list of OS objects as values
1363              e.g. {"debian-etch": {"node1": [<object>,...],
1364                                    "node2": [<object>,]}
1365                   }
1366
1367     """
1368     all_os = {}
1369     for node_name, nr in rlist.iteritems():
1370       if not nr:
1371         continue
1372       for os in nr:
1373         if os.name not in all_os:
1374           # build a list of nodes for this os containing empty lists
1375           # for each node in node_list
1376           all_os[os.name] = {}
1377           for nname in node_list:
1378             all_os[os.name][nname] = []
1379         all_os[os.name][node_name].append(os)
1380     return all_os
1381
1382   def Exec(self, feedback_fn):
1383     """Compute the list of OSes.
1384
1385     """
1386     node_list = self.cfg.GetNodeList()
1387     node_data = rpc.call_os_diagnose(node_list)
1388     if node_data == False:
1389       raise errors.OpExecError("Can't gather the list of OSes")
1390     pol = self._DiagnoseByOS(node_list, node_data)
1391     output = []
1392     for os_name, os_data in pol.iteritems():
1393       row = []
1394       for field in self.op.output_fields:
1395         if field == "name":
1396           val = os_name
1397         elif field == "valid":
1398           val = utils.all([osl and osl[0] for osl in os_data.values()])
1399         elif field == "node_status":
1400           val = {}
1401           for node_name, nos_list in os_data.iteritems():
1402             val[node_name] = [(v.status, v.path) for v in nos_list]
1403         else:
1404           raise errors.ParameterError(field)
1405         row.append(val)
1406       output.append(row)
1407
1408     return output
1409
1410
1411 class LURemoveNode(LogicalUnit):
1412   """Logical unit for removing a node.
1413
1414   """
1415   HPATH = "node-remove"
1416   HTYPE = constants.HTYPE_NODE
1417   _OP_REQP = ["node_name"]
1418
1419   def BuildHooksEnv(self):
1420     """Build hooks env.
1421
1422     This doesn't run on the target node in the pre phase as a failed
1423     node would not allows itself to run.
1424
1425     """
1426     env = {
1427       "OP_TARGET": self.op.node_name,
1428       "NODE_NAME": self.op.node_name,
1429       }
1430     all_nodes = self.cfg.GetNodeList()
1431     all_nodes.remove(self.op.node_name)
1432     return env, all_nodes, all_nodes
1433
1434   def CheckPrereq(self):
1435     """Check prerequisites.
1436
1437     This checks:
1438      - the node exists in the configuration
1439      - it does not have primary or secondary instances
1440      - it's not the master
1441
1442     Any errors are signalled by raising errors.OpPrereqError.
1443
1444     """
1445     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1446     if node is None:
1447       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1448
1449     instance_list = self.cfg.GetInstanceList()
1450
1451     masternode = self.sstore.GetMasterNode()
1452     if node.name == masternode:
1453       raise errors.OpPrereqError("Node is the master node,"
1454                                  " you need to failover first.")
1455
1456     for instance_name in instance_list:
1457       instance = self.cfg.GetInstanceInfo(instance_name)
1458       if node.name == instance.primary_node:
1459         raise errors.OpPrereqError("Instance %s still running on the node,"
1460                                    " please remove first." % instance_name)
1461       if node.name in instance.secondary_nodes:
1462         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1463                                    " please remove first." % instance_name)
1464     self.op.node_name = node.name
1465     self.node = node
1466
1467   def Exec(self, feedback_fn):
1468     """Removes the node from the cluster.
1469
1470     """
1471     node = self.node
1472     logger.Info("stopping the node daemon and removing configs from node %s" %
1473                 node.name)
1474
1475     rpc.call_node_leave_cluster(node.name)
1476
1477     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1478
1479     logger.Info("Removing node %s from config" % node.name)
1480
1481     self.cfg.RemoveNode(node.name)
1482
1483     _RemoveHostFromEtcHosts(node.name)
1484
1485
1486 class LUQueryNodes(NoHooksLU):
1487   """Logical unit for querying nodes.
1488
1489   """
1490   _OP_REQP = ["output_fields", "names"]
1491
1492   def CheckPrereq(self):
1493     """Check prerequisites.
1494
1495     This checks that the fields required are valid output fields.
1496
1497     """
1498     self.dynamic_fields = frozenset([
1499       "dtotal", "dfree",
1500       "mtotal", "mnode", "mfree",
1501       "bootid",
1502       "ctotal", "cnodes", "csockets",
1503       ])
1504
1505     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1506                                "pinst_list", "sinst_list",
1507                                "pip", "sip", "tags"],
1508                        dynamic=self.dynamic_fields,
1509                        selected=self.op.output_fields)
1510
1511     self.wanted = _GetWantedNodes(self, self.op.names)
1512
1513   def Exec(self, feedback_fn):
1514     """Computes the list of nodes and their attributes.
1515
1516     """
1517     nodenames = self.wanted
1518     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1519
1520     # begin data gathering
1521
1522     if self.dynamic_fields.intersection(self.op.output_fields):
1523       live_data = {}
1524       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1525       for name in nodenames:
1526         nodeinfo = node_data.get(name, None)
1527         if nodeinfo:
1528           fn = utils.TryConvert
1529           live_data[name] = {
1530             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1531             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1532             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1533             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1534             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1535             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1536             "bootid": nodeinfo.get('bootid', None),
1537             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1538             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1539             }
1540         else:
1541           live_data[name] = {}
1542     else:
1543       live_data = dict.fromkeys(nodenames, {})
1544
1545     node_to_primary = dict([(name, set()) for name in nodenames])
1546     node_to_secondary = dict([(name, set()) for name in nodenames])
1547
1548     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1549                              "sinst_cnt", "sinst_list"))
1550     if inst_fields & frozenset(self.op.output_fields):
1551       instancelist = self.cfg.GetInstanceList()
1552
1553       for instance_name in instancelist:
1554         inst = self.cfg.GetInstanceInfo(instance_name)
1555         if inst.primary_node in node_to_primary:
1556           node_to_primary[inst.primary_node].add(inst.name)
1557         for secnode in inst.secondary_nodes:
1558           if secnode in node_to_secondary:
1559             node_to_secondary[secnode].add(inst.name)
1560
1561     # end data gathering
1562
1563     output = []
1564     for node in nodelist:
1565       node_output = []
1566       for field in self.op.output_fields:
1567         if field == "name":
1568           val = node.name
1569         elif field == "pinst_list":
1570           val = list(node_to_primary[node.name])
1571         elif field == "sinst_list":
1572           val = list(node_to_secondary[node.name])
1573         elif field == "pinst_cnt":
1574           val = len(node_to_primary[node.name])
1575         elif field == "sinst_cnt":
1576           val = len(node_to_secondary[node.name])
1577         elif field == "pip":
1578           val = node.primary_ip
1579         elif field == "sip":
1580           val = node.secondary_ip
1581         elif field == "tags":
1582           val = list(node.GetTags())
1583         elif field in self.dynamic_fields:
1584           val = live_data[node.name].get(field, None)
1585         else:
1586           raise errors.ParameterError(field)
1587         node_output.append(val)
1588       output.append(node_output)
1589
1590     return output
1591
1592
1593 class LUQueryNodeVolumes(NoHooksLU):
1594   """Logical unit for getting volumes on node(s).
1595
1596   """
1597   _OP_REQP = ["nodes", "output_fields"]
1598
1599   def CheckPrereq(self):
1600     """Check prerequisites.
1601
1602     This checks that the fields required are valid output fields.
1603
1604     """
1605     self.nodes = _GetWantedNodes(self, self.op.nodes)
1606
1607     _CheckOutputFields(static=["node"],
1608                        dynamic=["phys", "vg", "name", "size", "instance"],
1609                        selected=self.op.output_fields)
1610
1611
1612   def Exec(self, feedback_fn):
1613     """Computes the list of nodes and their attributes.
1614
1615     """
1616     nodenames = self.nodes
1617     volumes = rpc.call_node_volumes(nodenames)
1618
1619     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1620              in self.cfg.GetInstanceList()]
1621
1622     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1623
1624     output = []
1625     for node in nodenames:
1626       if node not in volumes or not volumes[node]:
1627         continue
1628
1629       node_vols = volumes[node][:]
1630       node_vols.sort(key=lambda vol: vol['dev'])
1631
1632       for vol in node_vols:
1633         node_output = []
1634         for field in self.op.output_fields:
1635           if field == "node":
1636             val = node
1637           elif field == "phys":
1638             val = vol['dev']
1639           elif field == "vg":
1640             val = vol['vg']
1641           elif field == "name":
1642             val = vol['name']
1643           elif field == "size":
1644             val = int(float(vol['size']))
1645           elif field == "instance":
1646             for inst in ilist:
1647               if node not in lv_by_node[inst]:
1648                 continue
1649               if vol['name'] in lv_by_node[inst][node]:
1650                 val = inst.name
1651                 break
1652             else:
1653               val = '-'
1654           else:
1655             raise errors.ParameterError(field)
1656           node_output.append(str(val))
1657
1658         output.append(node_output)
1659
1660     return output
1661
1662
1663 class LUAddNode(LogicalUnit):
1664   """Logical unit for adding node to the cluster.
1665
1666   """
1667   HPATH = "node-add"
1668   HTYPE = constants.HTYPE_NODE
1669   _OP_REQP = ["node_name"]
1670
1671   def BuildHooksEnv(self):
1672     """Build hooks env.
1673
1674     This will run on all nodes before, and on all nodes + the new node after.
1675
1676     """
1677     env = {
1678       "OP_TARGET": self.op.node_name,
1679       "NODE_NAME": self.op.node_name,
1680       "NODE_PIP": self.op.primary_ip,
1681       "NODE_SIP": self.op.secondary_ip,
1682       }
1683     nodes_0 = self.cfg.GetNodeList()
1684     nodes_1 = nodes_0 + [self.op.node_name, ]
1685     return env, nodes_0, nodes_1
1686
1687   def CheckPrereq(self):
1688     """Check prerequisites.
1689
1690     This checks:
1691      - the new node is not already in the config
1692      - it is resolvable
1693      - its parameters (single/dual homed) matches the cluster
1694
1695     Any errors are signalled by raising errors.OpPrereqError.
1696
1697     """
1698     node_name = self.op.node_name
1699     cfg = self.cfg
1700
1701     dns_data = utils.HostInfo(node_name)
1702
1703     node = dns_data.name
1704     primary_ip = self.op.primary_ip = dns_data.ip
1705     secondary_ip = getattr(self.op, "secondary_ip", None)
1706     if secondary_ip is None:
1707       secondary_ip = primary_ip
1708     if not utils.IsValidIP(secondary_ip):
1709       raise errors.OpPrereqError("Invalid secondary IP given")
1710     self.op.secondary_ip = secondary_ip
1711
1712     node_list = cfg.GetNodeList()
1713     if not self.op.readd and node in node_list:
1714       raise errors.OpPrereqError("Node %s is already in the configuration" %
1715                                  node)
1716     elif self.op.readd and node not in node_list:
1717       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1718
1719     for existing_node_name in node_list:
1720       existing_node = cfg.GetNodeInfo(existing_node_name)
1721
1722       if self.op.readd and node == existing_node_name:
1723         if (existing_node.primary_ip != primary_ip or
1724             existing_node.secondary_ip != secondary_ip):
1725           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1726                                      " address configuration as before")
1727         continue
1728
1729       if (existing_node.primary_ip == primary_ip or
1730           existing_node.secondary_ip == primary_ip or
1731           existing_node.primary_ip == secondary_ip or
1732           existing_node.secondary_ip == secondary_ip):
1733         raise errors.OpPrereqError("New node ip address(es) conflict with"
1734                                    " existing node %s" % existing_node.name)
1735
1736     # check that the type of the node (single versus dual homed) is the
1737     # same as for the master
1738     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1739     master_singlehomed = myself.secondary_ip == myself.primary_ip
1740     newbie_singlehomed = secondary_ip == primary_ip
1741     if master_singlehomed != newbie_singlehomed:
1742       if master_singlehomed:
1743         raise errors.OpPrereqError("The master has no private ip but the"
1744                                    " new node has one")
1745       else:
1746         raise errors.OpPrereqError("The master has a private ip but the"
1747                                    " new node doesn't have one")
1748
1749     # checks reachablity
1750     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1751       raise errors.OpPrereqError("Node not reachable by ping")
1752
1753     if not newbie_singlehomed:
1754       # check reachability from my secondary ip to newbie's secondary ip
1755       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1756                            source=myself.secondary_ip):
1757         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1758                                    " based ping to noded port")
1759
1760     self.new_node = objects.Node(name=node,
1761                                  primary_ip=primary_ip,
1762                                  secondary_ip=secondary_ip)
1763
1764     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1765       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1766         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1767                                    constants.VNC_PASSWORD_FILE)
1768
1769   def Exec(self, feedback_fn):
1770     """Adds the new node to the cluster.
1771
1772     """
1773     new_node = self.new_node
1774     node = new_node.name
1775
1776     # set up inter-node password and certificate and restarts the node daemon
1777     gntpass = self.sstore.GetNodeDaemonPassword()
1778     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1779       raise errors.OpExecError("ganeti password corruption detected")
1780     f = open(constants.SSL_CERT_FILE)
1781     try:
1782       gntpem = f.read(8192)
1783     finally:
1784       f.close()
1785     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1786     # so we use this to detect an invalid certificate; as long as the
1787     # cert doesn't contain this, the here-document will be correctly
1788     # parsed by the shell sequence below
1789     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1790       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1791     if not gntpem.endswith("\n"):
1792       raise errors.OpExecError("PEM must end with newline")
1793     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1794
1795     # and then connect with ssh to set password and start ganeti-noded
1796     # note that all the below variables are sanitized at this point,
1797     # either by being constants or by the checks above
1798     ss = self.sstore
1799     mycommand = ("umask 077 && "
1800                  "echo '%s' > '%s' && "
1801                  "cat > '%s' << '!EOF.' && \n"
1802                  "%s!EOF.\n%s restart" %
1803                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1804                   constants.SSL_CERT_FILE, gntpem,
1805                   constants.NODE_INITD_SCRIPT))
1806
1807     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1808     if result.failed:
1809       raise errors.OpExecError("Remote command on node %s, error: %s,"
1810                                " output: %s" %
1811                                (node, result.fail_reason, result.output))
1812
1813     # check connectivity
1814     time.sleep(4)
1815
1816     result = rpc.call_version([node])[node]
1817     if result:
1818       if constants.PROTOCOL_VERSION == result:
1819         logger.Info("communication to node %s fine, sw version %s match" %
1820                     (node, result))
1821       else:
1822         raise errors.OpExecError("Version mismatch master version %s,"
1823                                  " node version %s" %
1824                                  (constants.PROTOCOL_VERSION, result))
1825     else:
1826       raise errors.OpExecError("Cannot get version from the new node")
1827
1828     # setup ssh on node
1829     logger.Info("copy ssh key to node %s" % node)
1830     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1831     keyarray = []
1832     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1833                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1834                 priv_key, pub_key]
1835
1836     for i in keyfiles:
1837       f = open(i, 'r')
1838       try:
1839         keyarray.append(f.read())
1840       finally:
1841         f.close()
1842
1843     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1844                                keyarray[3], keyarray[4], keyarray[5])
1845
1846     if not result:
1847       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1848
1849     # Add node to our /etc/hosts, and add key to known_hosts
1850     _AddHostToEtcHosts(new_node.name)
1851
1852     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1853                       self.cfg.GetHostKey())
1854
1855     if new_node.secondary_ip != new_node.primary_ip:
1856       if not rpc.call_node_tcp_ping(new_node.name,
1857                                     constants.LOCALHOST_IP_ADDRESS,
1858                                     new_node.secondary_ip,
1859                                     constants.DEFAULT_NODED_PORT,
1860                                     10, False):
1861         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1862                                  " you gave (%s). Please fix and re-run this"
1863                                  " command." % new_node.secondary_ip)
1864
1865     success, msg = ssh.VerifyNodeHostname(node)
1866     if not success:
1867       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1868                                " than the one the resolver gives: %s."
1869                                " Please fix and re-run this command." %
1870                                (node, msg))
1871
1872     # Distribute updated /etc/hosts and known_hosts to all nodes,
1873     # including the node just added
1874     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1875     dist_nodes = self.cfg.GetNodeList()
1876     if not self.op.readd:
1877       dist_nodes.append(node)
1878     if myself.name in dist_nodes:
1879       dist_nodes.remove(myself.name)
1880
1881     logger.Debug("Copying hosts and known_hosts to all nodes")
1882     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1883       result = rpc.call_upload_file(dist_nodes, fname)
1884       for to_node in dist_nodes:
1885         if not result[to_node]:
1886           logger.Error("copy of file %s to node %s failed" %
1887                        (fname, to_node))
1888
1889     to_copy = ss.GetFileList()
1890     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1891       to_copy.append(constants.VNC_PASSWORD_FILE)
1892     for fname in to_copy:
1893       if not ssh.CopyFileToNode(node, fname):
1894         logger.Error("could not copy file %s to node %s" % (fname, node))
1895
1896     if not self.op.readd:
1897       logger.Info("adding node %s to cluster.conf" % node)
1898       self.cfg.AddNode(new_node)
1899
1900
1901 class LUMasterFailover(LogicalUnit):
1902   """Failover the master node to the current node.
1903
1904   This is a special LU in that it must run on a non-master node.
1905
1906   """
1907   HPATH = "master-failover"
1908   HTYPE = constants.HTYPE_CLUSTER
1909   REQ_MASTER = False
1910   _OP_REQP = []
1911
1912   def BuildHooksEnv(self):
1913     """Build hooks env.
1914
1915     This will run on the new master only in the pre phase, and on all
1916     the nodes in the post phase.
1917
1918     """
1919     env = {
1920       "OP_TARGET": self.new_master,
1921       "NEW_MASTER": self.new_master,
1922       "OLD_MASTER": self.old_master,
1923       }
1924     return env, [self.new_master], self.cfg.GetNodeList()
1925
1926   def CheckPrereq(self):
1927     """Check prerequisites.
1928
1929     This checks that we are not already the master.
1930
1931     """
1932     self.new_master = utils.HostInfo().name
1933     self.old_master = self.sstore.GetMasterNode()
1934
1935     if self.old_master == self.new_master:
1936       raise errors.OpPrereqError("This commands must be run on the node"
1937                                  " where you want the new master to be."
1938                                  " %s is already the master" %
1939                                  self.old_master)
1940
1941   def Exec(self, feedback_fn):
1942     """Failover the master node.
1943
1944     This command, when run on a non-master node, will cause the current
1945     master to cease being master, and the non-master to become new
1946     master.
1947
1948     """
1949     #TODO: do not rely on gethostname returning the FQDN
1950     logger.Info("setting master to %s, old master: %s" %
1951                 (self.new_master, self.old_master))
1952
1953     if not rpc.call_node_stop_master(self.old_master):
1954       logger.Error("could disable the master role on the old master"
1955                    " %s, please disable manually" % self.old_master)
1956
1957     ss = self.sstore
1958     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1959     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1960                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1961       logger.Error("could not distribute the new simple store master file"
1962                    " to the other nodes, please check.")
1963
1964     if not rpc.call_node_start_master(self.new_master):
1965       logger.Error("could not start the master role on the new master"
1966                    " %s, please check" % self.new_master)
1967       feedback_fn("Error in activating the master IP on the new master,"
1968                   " please fix manually.")
1969
1970
1971
1972 class LUQueryClusterInfo(NoHooksLU):
1973   """Query cluster configuration.
1974
1975   """
1976   _OP_REQP = []
1977   REQ_MASTER = False
1978
1979   def CheckPrereq(self):
1980     """No prerequsites needed for this LU.
1981
1982     """
1983     pass
1984
1985   def Exec(self, feedback_fn):
1986     """Return cluster config.
1987
1988     """
1989     result = {
1990       "name": self.sstore.GetClusterName(),
1991       "software_version": constants.RELEASE_VERSION,
1992       "protocol_version": constants.PROTOCOL_VERSION,
1993       "config_version": constants.CONFIG_VERSION,
1994       "os_api_version": constants.OS_API_VERSION,
1995       "export_version": constants.EXPORT_VERSION,
1996       "master": self.sstore.GetMasterNode(),
1997       "architecture": (platform.architecture()[0], platform.machine()),
1998       "hypervisor_type": self.sstore.GetHypervisorType(),
1999       }
2000
2001     return result
2002
2003
2004 class LUClusterCopyFile(NoHooksLU):
2005   """Copy file to cluster.
2006
2007   """
2008   _OP_REQP = ["nodes", "filename"]
2009
2010   def CheckPrereq(self):
2011     """Check prerequisites.
2012
2013     It should check that the named file exists and that the given list
2014     of nodes is valid.
2015
2016     """
2017     if not os.path.exists(self.op.filename):
2018       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2019
2020     self.nodes = _GetWantedNodes(self, self.op.nodes)
2021
2022   def Exec(self, feedback_fn):
2023     """Copy a file from master to some nodes.
2024
2025     Args:
2026       opts - class with options as members
2027       args - list containing a single element, the file name
2028     Opts used:
2029       nodes - list containing the name of target nodes; if empty, all nodes
2030
2031     """
2032     filename = self.op.filename
2033
2034     myname = utils.HostInfo().name
2035
2036     for node in self.nodes:
2037       if node == myname:
2038         continue
2039       if not ssh.CopyFileToNode(node, filename):
2040         logger.Error("Copy of file %s to node %s failed" % (filename, node))
2041
2042
2043 class LUDumpClusterConfig(NoHooksLU):
2044   """Return a text-representation of the cluster-config.
2045
2046   """
2047   _OP_REQP = []
2048
2049   def CheckPrereq(self):
2050     """No prerequisites.
2051
2052     """
2053     pass
2054
2055   def Exec(self, feedback_fn):
2056     """Dump a representation of the cluster config to the standard output.
2057
2058     """
2059     return self.cfg.DumpConfig()
2060
2061
2062 class LURunClusterCommand(NoHooksLU):
2063   """Run a command on some nodes.
2064
2065   """
2066   _OP_REQP = ["command", "nodes"]
2067
2068   def CheckPrereq(self):
2069     """Check prerequisites.
2070
2071     It checks that the given list of nodes is valid.
2072
2073     """
2074     self.nodes = _GetWantedNodes(self, self.op.nodes)
2075
2076   def Exec(self, feedback_fn):
2077     """Run a command on some nodes.
2078
2079     """
2080     # put the master at the end of the nodes list
2081     master_node = self.sstore.GetMasterNode()
2082     if master_node in self.nodes:
2083       self.nodes.remove(master_node)
2084       self.nodes.append(master_node)
2085
2086     data = []
2087     for node in self.nodes:
2088       result = ssh.SSHCall(node, "root", self.op.command)
2089       data.append((node, result.output, result.exit_code))
2090
2091     return data
2092
2093
2094 class LUActivateInstanceDisks(NoHooksLU):
2095   """Bring up an instance's disks.
2096
2097   """
2098   _OP_REQP = ["instance_name"]
2099
2100   def CheckPrereq(self):
2101     """Check prerequisites.
2102
2103     This checks that the instance is in the cluster.
2104
2105     """
2106     instance = self.cfg.GetInstanceInfo(
2107       self.cfg.ExpandInstanceName(self.op.instance_name))
2108     if instance is None:
2109       raise errors.OpPrereqError("Instance '%s' not known" %
2110                                  self.op.instance_name)
2111     self.instance = instance
2112
2113
2114   def Exec(self, feedback_fn):
2115     """Activate the disks.
2116
2117     """
2118     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2119     if not disks_ok:
2120       raise errors.OpExecError("Cannot activate block devices")
2121
2122     return disks_info
2123
2124
2125 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2126   """Prepare the block devices for an instance.
2127
2128   This sets up the block devices on all nodes.
2129
2130   Args:
2131     instance: a ganeti.objects.Instance object
2132     ignore_secondaries: if true, errors on secondary nodes won't result
2133                         in an error return from the function
2134
2135   Returns:
2136     false if the operation failed
2137     list of (host, instance_visible_name, node_visible_name) if the operation
2138          suceeded with the mapping from node devices to instance devices
2139   """
2140   device_info = []
2141   disks_ok = True
2142   iname = instance.name
2143   # With the two passes mechanism we try to reduce the window of
2144   # opportunity for the race condition of switching DRBD to primary
2145   # before handshaking occured, but we do not eliminate it
2146
2147   # The proper fix would be to wait (with some limits) until the
2148   # connection has been made and drbd transitions from WFConnection
2149   # into any other network-connected state (Connected, SyncTarget,
2150   # SyncSource, etc.)
2151
2152   # 1st pass, assemble on all nodes in secondary mode
2153   for inst_disk in instance.disks:
2154     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2155       cfg.SetDiskID(node_disk, node)
2156       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2157       if not result:
2158         logger.Error("could not prepare block device %s on node %s"
2159                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2160         if not ignore_secondaries:
2161           disks_ok = False
2162
2163   # FIXME: race condition on drbd migration to primary
2164
2165   # 2nd pass, do only the primary node
2166   for inst_disk in instance.disks:
2167     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2168       if node != instance.primary_node:
2169         continue
2170       cfg.SetDiskID(node_disk, node)
2171       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2172       if not result:
2173         logger.Error("could not prepare block device %s on node %s"
2174                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2175         disks_ok = False
2176     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2177
2178   # leave the disks configured for the primary node
2179   # this is a workaround that would be fixed better by
2180   # improving the logical/physical id handling
2181   for disk in instance.disks:
2182     cfg.SetDiskID(disk, instance.primary_node)
2183
2184   return disks_ok, device_info
2185
2186
2187 def _StartInstanceDisks(cfg, instance, force):
2188   """Start the disks of an instance.
2189
2190   """
2191   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2192                                            ignore_secondaries=force)
2193   if not disks_ok:
2194     _ShutdownInstanceDisks(instance, cfg)
2195     if force is not None and not force:
2196       logger.Error("If the message above refers to a secondary node,"
2197                    " you can retry the operation using '--force'.")
2198     raise errors.OpExecError("Disk consistency error")
2199
2200
2201 class LUDeactivateInstanceDisks(NoHooksLU):
2202   """Shutdown an instance's disks.
2203
2204   """
2205   _OP_REQP = ["instance_name"]
2206
2207   def CheckPrereq(self):
2208     """Check prerequisites.
2209
2210     This checks that the instance is in the cluster.
2211
2212     """
2213     instance = self.cfg.GetInstanceInfo(
2214       self.cfg.ExpandInstanceName(self.op.instance_name))
2215     if instance is None:
2216       raise errors.OpPrereqError("Instance '%s' not known" %
2217                                  self.op.instance_name)
2218     self.instance = instance
2219
2220   def Exec(self, feedback_fn):
2221     """Deactivate the disks
2222
2223     """
2224     instance = self.instance
2225     ins_l = rpc.call_instance_list([instance.primary_node])
2226     ins_l = ins_l[instance.primary_node]
2227     if not type(ins_l) is list:
2228       raise errors.OpExecError("Can't contact node '%s'" %
2229                                instance.primary_node)
2230
2231     if self.instance.name in ins_l:
2232       raise errors.OpExecError("Instance is running, can't shutdown"
2233                                " block devices.")
2234
2235     _ShutdownInstanceDisks(instance, self.cfg)
2236
2237
2238 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2239   """Shutdown block devices of an instance.
2240
2241   This does the shutdown on all nodes of the instance.
2242
2243   If the ignore_primary is false, errors on the primary node are
2244   ignored.
2245
2246   """
2247   result = True
2248   for disk in instance.disks:
2249     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2250       cfg.SetDiskID(top_disk, node)
2251       if not rpc.call_blockdev_shutdown(node, top_disk):
2252         logger.Error("could not shutdown block device %s on node %s" %
2253                      (disk.iv_name, node))
2254         if not ignore_primary or node != instance.primary_node:
2255           result = False
2256   return result
2257
2258
2259 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2260   """Checks if a node has enough free memory.
2261
2262   This function check if a given node has the needed amount of free
2263   memory. In case the node has less memory or we cannot get the
2264   information from the node, this function raise an OpPrereqError
2265   exception.
2266
2267   Args:
2268     - cfg: a ConfigWriter instance
2269     - node: the node name
2270     - reason: string to use in the error message
2271     - requested: the amount of memory in MiB
2272
2273   """
2274   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2275   if not (nodeinfo and isinstance(nodeinfo, dict) and
2276           node in nodeinfo and isinstance(nodeinfo[node], dict)):
2277     raise errors.OpPrereqError("Could not contact node %s for resource"
2278                              " information" % (node,))
2279
2280   free_mem = nodeinfo[node].get('memory_free')
2281   if not isinstance(free_mem, int):
2282     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2283                              " was '%s'" % (node, free_mem))
2284   if requested > free_mem:
2285     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2286                              " needed %s MiB, available %s MiB" %
2287                              (node, reason, requested, free_mem))
2288
2289
2290 class LUStartupInstance(LogicalUnit):
2291   """Starts an instance.
2292
2293   """
2294   HPATH = "instance-start"
2295   HTYPE = constants.HTYPE_INSTANCE
2296   _OP_REQP = ["instance_name", "force"]
2297
2298   def BuildHooksEnv(self):
2299     """Build hooks env.
2300
2301     This runs on master, primary and secondary nodes of the instance.
2302
2303     """
2304     env = {
2305       "FORCE": self.op.force,
2306       }
2307     env.update(_BuildInstanceHookEnvByObject(self.instance))
2308     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2309           list(self.instance.secondary_nodes))
2310     return env, nl, nl
2311
2312   def CheckPrereq(self):
2313     """Check prerequisites.
2314
2315     This checks that the instance is in the cluster.
2316
2317     """
2318     instance = self.cfg.GetInstanceInfo(
2319       self.cfg.ExpandInstanceName(self.op.instance_name))
2320     if instance is None:
2321       raise errors.OpPrereqError("Instance '%s' not known" %
2322                                  self.op.instance_name)
2323
2324     # check bridges existance
2325     _CheckInstanceBridgesExist(instance)
2326
2327     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2328                          "starting instance %s" % instance.name,
2329                          instance.memory)
2330
2331     self.instance = instance
2332     self.op.instance_name = instance.name
2333
2334   def Exec(self, feedback_fn):
2335     """Start the instance.
2336
2337     """
2338     instance = self.instance
2339     force = self.op.force
2340     extra_args = getattr(self.op, "extra_args", "")
2341
2342     self.cfg.MarkInstanceUp(instance.name)
2343
2344     node_current = instance.primary_node
2345
2346     _StartInstanceDisks(self.cfg, instance, force)
2347
2348     if not rpc.call_instance_start(node_current, instance, extra_args):
2349       _ShutdownInstanceDisks(instance, self.cfg)
2350       raise errors.OpExecError("Could not start instance")
2351
2352
2353 class LURebootInstance(LogicalUnit):
2354   """Reboot an instance.
2355
2356   """
2357   HPATH = "instance-reboot"
2358   HTYPE = constants.HTYPE_INSTANCE
2359   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2360
2361   def BuildHooksEnv(self):
2362     """Build hooks env.
2363
2364     This runs on master, primary and secondary nodes of the instance.
2365
2366     """
2367     env = {
2368       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2369       }
2370     env.update(_BuildInstanceHookEnvByObject(self.instance))
2371     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2372           list(self.instance.secondary_nodes))
2373     return env, nl, nl
2374
2375   def CheckPrereq(self):
2376     """Check prerequisites.
2377
2378     This checks that the instance is in the cluster.
2379
2380     """
2381     instance = self.cfg.GetInstanceInfo(
2382       self.cfg.ExpandInstanceName(self.op.instance_name))
2383     if instance is None:
2384       raise errors.OpPrereqError("Instance '%s' not known" %
2385                                  self.op.instance_name)
2386
2387     # check bridges existance
2388     _CheckInstanceBridgesExist(instance)
2389
2390     self.instance = instance
2391     self.op.instance_name = instance.name
2392
2393   def Exec(self, feedback_fn):
2394     """Reboot the instance.
2395
2396     """
2397     instance = self.instance
2398     ignore_secondaries = self.op.ignore_secondaries
2399     reboot_type = self.op.reboot_type
2400     extra_args = getattr(self.op, "extra_args", "")
2401
2402     node_current = instance.primary_node
2403
2404     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2405                            constants.INSTANCE_REBOOT_HARD,
2406                            constants.INSTANCE_REBOOT_FULL]:
2407       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2408                                   (constants.INSTANCE_REBOOT_SOFT,
2409                                    constants.INSTANCE_REBOOT_HARD,
2410                                    constants.INSTANCE_REBOOT_FULL))
2411
2412     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2413                        constants.INSTANCE_REBOOT_HARD]:
2414       if not rpc.call_instance_reboot(node_current, instance,
2415                                       reboot_type, extra_args):
2416         raise errors.OpExecError("Could not reboot instance")
2417     else:
2418       if not rpc.call_instance_shutdown(node_current, instance):
2419         raise errors.OpExecError("could not shutdown instance for full reboot")
2420       _ShutdownInstanceDisks(instance, self.cfg)
2421       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2422       if not rpc.call_instance_start(node_current, instance, extra_args):
2423         _ShutdownInstanceDisks(instance, self.cfg)
2424         raise errors.OpExecError("Could not start instance for full reboot")
2425
2426     self.cfg.MarkInstanceUp(instance.name)
2427
2428
2429 class LUShutdownInstance(LogicalUnit):
2430   """Shutdown an instance.
2431
2432   """
2433   HPATH = "instance-stop"
2434   HTYPE = constants.HTYPE_INSTANCE
2435   _OP_REQP = ["instance_name"]
2436
2437   def BuildHooksEnv(self):
2438     """Build hooks env.
2439
2440     This runs on master, primary and secondary nodes of the instance.
2441
2442     """
2443     env = _BuildInstanceHookEnvByObject(self.instance)
2444     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2445           list(self.instance.secondary_nodes))
2446     return env, nl, nl
2447
2448   def CheckPrereq(self):
2449     """Check prerequisites.
2450
2451     This checks that the instance is in the cluster.
2452
2453     """
2454     instance = self.cfg.GetInstanceInfo(
2455       self.cfg.ExpandInstanceName(self.op.instance_name))
2456     if instance is None:
2457       raise errors.OpPrereqError("Instance '%s' not known" %
2458                                  self.op.instance_name)
2459     self.instance = instance
2460
2461   def Exec(self, feedback_fn):
2462     """Shutdown the instance.
2463
2464     """
2465     instance = self.instance
2466     node_current = instance.primary_node
2467     self.cfg.MarkInstanceDown(instance.name)
2468     if not rpc.call_instance_shutdown(node_current, instance):
2469       logger.Error("could not shutdown instance")
2470
2471     _ShutdownInstanceDisks(instance, self.cfg)
2472
2473
2474 class LUReinstallInstance(LogicalUnit):
2475   """Reinstall an instance.
2476
2477   """
2478   HPATH = "instance-reinstall"
2479   HTYPE = constants.HTYPE_INSTANCE
2480   _OP_REQP = ["instance_name"]
2481
2482   def BuildHooksEnv(self):
2483     """Build hooks env.
2484
2485     This runs on master, primary and secondary nodes of the instance.
2486
2487     """
2488     env = _BuildInstanceHookEnvByObject(self.instance)
2489     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2490           list(self.instance.secondary_nodes))
2491     return env, nl, nl
2492
2493   def CheckPrereq(self):
2494     """Check prerequisites.
2495
2496     This checks that the instance is in the cluster and is not running.
2497
2498     """
2499     instance = self.cfg.GetInstanceInfo(
2500       self.cfg.ExpandInstanceName(self.op.instance_name))
2501     if instance is None:
2502       raise errors.OpPrereqError("Instance '%s' not known" %
2503                                  self.op.instance_name)
2504     if instance.disk_template == constants.DT_DISKLESS:
2505       raise errors.OpPrereqError("Instance '%s' has no disks" %
2506                                  self.op.instance_name)
2507     if instance.status != "down":
2508       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2509                                  self.op.instance_name)
2510     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2511     if remote_info:
2512       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2513                                  (self.op.instance_name,
2514                                   instance.primary_node))
2515
2516     self.op.os_type = getattr(self.op, "os_type", None)
2517     if self.op.os_type is not None:
2518       # OS verification
2519       pnode = self.cfg.GetNodeInfo(
2520         self.cfg.ExpandNodeName(instance.primary_node))
2521       if pnode is None:
2522         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2523                                    self.op.pnode)
2524       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2525       if not os_obj:
2526         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2527                                    " primary node"  % self.op.os_type)
2528
2529     self.instance = instance
2530
2531   def Exec(self, feedback_fn):
2532     """Reinstall the instance.
2533
2534     """
2535     inst = self.instance
2536
2537     if self.op.os_type is not None:
2538       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2539       inst.os = self.op.os_type
2540       self.cfg.AddInstance(inst)
2541
2542     _StartInstanceDisks(self.cfg, inst, None)
2543     try:
2544       feedback_fn("Running the instance OS create scripts...")
2545       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2546         raise errors.OpExecError("Could not install OS for instance %s"
2547                                  " on node %s" %
2548                                  (inst.name, inst.primary_node))
2549     finally:
2550       _ShutdownInstanceDisks(inst, self.cfg)
2551
2552
2553 class LURenameInstance(LogicalUnit):
2554   """Rename an instance.
2555
2556   """
2557   HPATH = "instance-rename"
2558   HTYPE = constants.HTYPE_INSTANCE
2559   _OP_REQP = ["instance_name", "new_name"]
2560
2561   def BuildHooksEnv(self):
2562     """Build hooks env.
2563
2564     This runs on master, primary and secondary nodes of the instance.
2565
2566     """
2567     env = _BuildInstanceHookEnvByObject(self.instance)
2568     env["INSTANCE_NEW_NAME"] = self.op.new_name
2569     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2570           list(self.instance.secondary_nodes))
2571     return env, nl, nl
2572
2573   def CheckPrereq(self):
2574     """Check prerequisites.
2575
2576     This checks that the instance is in the cluster and is not running.
2577
2578     """
2579     instance = self.cfg.GetInstanceInfo(
2580       self.cfg.ExpandInstanceName(self.op.instance_name))
2581     if instance is None:
2582       raise errors.OpPrereqError("Instance '%s' not known" %
2583                                  self.op.instance_name)
2584     if instance.status != "down":
2585       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2586                                  self.op.instance_name)
2587     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2588     if remote_info:
2589       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2590                                  (self.op.instance_name,
2591                                   instance.primary_node))
2592     self.instance = instance
2593
2594     # new name verification
2595     name_info = utils.HostInfo(self.op.new_name)
2596
2597     self.op.new_name = new_name = name_info.name
2598     instance_list = self.cfg.GetInstanceList()
2599     if new_name in instance_list:
2600       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2601                                  new_name)
2602
2603     if not getattr(self.op, "ignore_ip", False):
2604       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2605         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2606                                    (name_info.ip, new_name))
2607
2608
2609   def Exec(self, feedback_fn):
2610     """Reinstall the instance.
2611
2612     """
2613     inst = self.instance
2614     old_name = inst.name
2615
2616     self.cfg.RenameInstance(inst.name, self.op.new_name)
2617
2618     # re-read the instance from the configuration after rename
2619     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2620
2621     _StartInstanceDisks(self.cfg, inst, None)
2622     try:
2623       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2624                                           "sda", "sdb"):
2625         msg = ("Could not run OS rename script for instance %s on node %s"
2626                " (but the instance has been renamed in Ganeti)" %
2627                (inst.name, inst.primary_node))
2628         logger.Error(msg)
2629     finally:
2630       _ShutdownInstanceDisks(inst, self.cfg)
2631
2632
2633 class LURemoveInstance(LogicalUnit):
2634   """Remove an instance.
2635
2636   """
2637   HPATH = "instance-remove"
2638   HTYPE = constants.HTYPE_INSTANCE
2639   _OP_REQP = ["instance_name", "ignore_failures"]
2640
2641   def BuildHooksEnv(self):
2642     """Build hooks env.
2643
2644     This runs on master, primary and secondary nodes of the instance.
2645
2646     """
2647     env = _BuildInstanceHookEnvByObject(self.instance)
2648     nl = [self.sstore.GetMasterNode()]
2649     return env, nl, nl
2650
2651   def CheckPrereq(self):
2652     """Check prerequisites.
2653
2654     This checks that the instance is in the cluster.
2655
2656     """
2657     instance = self.cfg.GetInstanceInfo(
2658       self.cfg.ExpandInstanceName(self.op.instance_name))
2659     if instance is None:
2660       raise errors.OpPrereqError("Instance '%s' not known" %
2661                                  self.op.instance_name)
2662     self.instance = instance
2663
2664   def Exec(self, feedback_fn):
2665     """Remove the instance.
2666
2667     """
2668     instance = self.instance
2669     logger.Info("shutting down instance %s on node %s" %
2670                 (instance.name, instance.primary_node))
2671
2672     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2673       if self.op.ignore_failures:
2674         feedback_fn("Warning: can't shutdown instance")
2675       else:
2676         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2677                                  (instance.name, instance.primary_node))
2678
2679     logger.Info("removing block devices for instance %s" % instance.name)
2680
2681     if not _RemoveDisks(instance, self.cfg):
2682       if self.op.ignore_failures:
2683         feedback_fn("Warning: can't remove instance's disks")
2684       else:
2685         raise errors.OpExecError("Can't remove instance's disks")
2686
2687     logger.Info("removing instance %s out of cluster config" % instance.name)
2688
2689     self.cfg.RemoveInstance(instance.name)
2690
2691
2692 class LUQueryInstances(NoHooksLU):
2693   """Logical unit for querying instances.
2694
2695   """
2696   _OP_REQP = ["output_fields", "names"]
2697
2698   def CheckPrereq(self):
2699     """Check prerequisites.
2700
2701     This checks that the fields required are valid output fields.
2702
2703     """
2704     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2705     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2706                                "admin_state", "admin_ram",
2707                                "disk_template", "ip", "mac", "bridge",
2708                                "sda_size", "sdb_size", "vcpus", "tags",
2709                                "auto_balance",
2710                                "network_port", "kernel_path", "initrd_path",
2711                                "hvm_boot_order", "hvm_acpi", "hvm_pae",
2712                                "hvm_cdrom_image_path", "hvm_nic_type",
2713                                "hvm_disk_type", "vnc_bind_address"],
2714                        dynamic=self.dynamic_fields,
2715                        selected=self.op.output_fields)
2716
2717     self.wanted = _GetWantedInstances(self, self.op.names)
2718
2719   def Exec(self, feedback_fn):
2720     """Computes the list of nodes and their attributes.
2721
2722     """
2723     instance_names = self.wanted
2724     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2725                      in instance_names]
2726
2727     # begin data gathering
2728
2729     nodes = frozenset([inst.primary_node for inst in instance_list])
2730
2731     bad_nodes = []
2732     if self.dynamic_fields.intersection(self.op.output_fields):
2733       live_data = {}
2734       node_data = rpc.call_all_instances_info(nodes)
2735       for name in nodes:
2736         result = node_data[name]
2737         if result:
2738           live_data.update(result)
2739         elif result == False:
2740           bad_nodes.append(name)
2741         # else no instance is alive
2742     else:
2743       live_data = dict([(name, {}) for name in instance_names])
2744
2745     # end data gathering
2746
2747     output = []
2748     for instance in instance_list:
2749       iout = []
2750       for field in self.op.output_fields:
2751         if field == "name":
2752           val = instance.name
2753         elif field == "os":
2754           val = instance.os
2755         elif field == "pnode":
2756           val = instance.primary_node
2757         elif field == "snodes":
2758           val = list(instance.secondary_nodes)
2759         elif field == "admin_state":
2760           val = (instance.status != "down")
2761         elif field == "oper_state":
2762           if instance.primary_node in bad_nodes:
2763             val = None
2764           else:
2765             val = bool(live_data.get(instance.name))
2766         elif field == "status":
2767           if instance.primary_node in bad_nodes:
2768             val = "ERROR_nodedown"
2769           else:
2770             running = bool(live_data.get(instance.name))
2771             if running:
2772               if instance.status != "down":
2773                 val = "running"
2774               else:
2775                 val = "ERROR_up"
2776             else:
2777               if instance.status != "down":
2778                 val = "ERROR_down"
2779               else:
2780                 val = "ADMIN_down"
2781         elif field == "admin_ram":
2782           val = instance.memory
2783         elif field == "oper_ram":
2784           if instance.primary_node in bad_nodes:
2785             val = None
2786           elif instance.name in live_data:
2787             val = live_data[instance.name].get("memory", "?")
2788           else:
2789             val = "-"
2790         elif field == "disk_template":
2791           val = instance.disk_template
2792         elif field == "ip":
2793           val = instance.nics[0].ip
2794         elif field == "bridge":
2795           val = instance.nics[0].bridge
2796         elif field == "mac":
2797           val = instance.nics[0].mac
2798         elif field == "sda_size" or field == "sdb_size":
2799           disk = instance.FindDisk(field[:3])
2800           if disk is None:
2801             val = None
2802           else:
2803             val = disk.size
2804         elif field == "vcpus":
2805           val = instance.vcpus
2806         elif field == "tags":
2807           val = list(instance.GetTags())
2808         elif field == "auto_balance":
2809           val = instance.auto_balance
2810         elif field in ("network_port", "kernel_path", "initrd_path",
2811                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2812                        "hvm_cdrom_image_path", "hvm_nic_type",
2813                        "hvm_disk_type", "vnc_bind_address"):
2814           val = getattr(instance, field, None)
2815           if val is None:
2816             if field in ("hvm_nic_type", "hvm_disk_type",
2817                          "kernel_path", "initrd_path"):
2818               val = "default"
2819             else:
2820               val = "-"
2821         else:
2822           raise errors.ParameterError(field)
2823         iout.append(val)
2824       output.append(iout)
2825
2826     return output
2827
2828
2829 class LUFailoverInstance(LogicalUnit):
2830   """Failover an instance.
2831
2832   """
2833   HPATH = "instance-failover"
2834   HTYPE = constants.HTYPE_INSTANCE
2835   _OP_REQP = ["instance_name", "ignore_consistency"]
2836
2837   def BuildHooksEnv(self):
2838     """Build hooks env.
2839
2840     This runs on master, primary and secondary nodes of the instance.
2841
2842     """
2843     env = {
2844       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2845       }
2846     env.update(_BuildInstanceHookEnvByObject(self.instance))
2847     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2848     return env, nl, nl
2849
2850   def CheckPrereq(self):
2851     """Check prerequisites.
2852
2853     This checks that the instance is in the cluster.
2854
2855     """
2856     instance = self.cfg.GetInstanceInfo(
2857       self.cfg.ExpandInstanceName(self.op.instance_name))
2858     if instance is None:
2859       raise errors.OpPrereqError("Instance '%s' not known" %
2860                                  self.op.instance_name)
2861
2862     if instance.disk_template not in constants.DTS_NET_MIRROR:
2863       raise errors.OpPrereqError("Instance's disk layout is not"
2864                                  " network mirrored, cannot failover.")
2865
2866     secondary_nodes = instance.secondary_nodes
2867     if not secondary_nodes:
2868       raise errors.ProgrammerError("no secondary node but using "
2869                                    "DT_REMOTE_RAID1 template")
2870
2871     target_node = secondary_nodes[0]
2872     # check memory requirements on the secondary node
2873     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2874                          instance.name, instance.memory)
2875
2876     # check bridge existance
2877     brlist = [nic.bridge for nic in instance.nics]
2878     if not rpc.call_bridges_exist(target_node, brlist):
2879       raise errors.OpPrereqError("One or more target bridges %s does not"
2880                                  " exist on destination node '%s'" %
2881                                  (brlist, target_node))
2882
2883     self.instance = instance
2884
2885   def Exec(self, feedback_fn):
2886     """Failover an instance.
2887
2888     The failover is done by shutting it down on its present node and
2889     starting it on the secondary.
2890
2891     """
2892     instance = self.instance
2893
2894     source_node = instance.primary_node
2895     target_node = instance.secondary_nodes[0]
2896
2897     feedback_fn("* checking disk consistency between source and target")
2898     for dev in instance.disks:
2899       # for remote_raid1, these are md over drbd
2900       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2901         if instance.status == "up" and not self.op.ignore_consistency:
2902           raise errors.OpExecError("Disk %s is degraded on target node,"
2903                                    " aborting failover." % dev.iv_name)
2904
2905     feedback_fn("* shutting down instance on source node")
2906     logger.Info("Shutting down instance %s on node %s" %
2907                 (instance.name, source_node))
2908
2909     if not rpc.call_instance_shutdown(source_node, instance):
2910       if self.op.ignore_consistency:
2911         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2912                      " anyway. Please make sure node %s is down"  %
2913                      (instance.name, source_node, source_node))
2914       else:
2915         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2916                                  (instance.name, source_node))
2917
2918     feedback_fn("* deactivating the instance's disks on source node")
2919     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2920       raise errors.OpExecError("Can't shut down the instance's disks.")
2921
2922     instance.primary_node = target_node
2923     # distribute new instance config to the other nodes
2924     self.cfg.Update(instance)
2925
2926     # Only start the instance if it's marked as up
2927     if instance.status == "up":
2928       feedback_fn("* activating the instance's disks on target node")
2929       logger.Info("Starting instance %s on node %s" %
2930                   (instance.name, target_node))
2931
2932       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2933                                                ignore_secondaries=True)
2934       if not disks_ok:
2935         _ShutdownInstanceDisks(instance, self.cfg)
2936         raise errors.OpExecError("Can't activate the instance's disks")
2937
2938       feedback_fn("* starting the instance on the target node")
2939       if not rpc.call_instance_start(target_node, instance, None):
2940         _ShutdownInstanceDisks(instance, self.cfg)
2941         raise errors.OpExecError("Could not start instance %s on node %s." %
2942                                  (instance.name, target_node))
2943
2944
2945 class LUMigrateInstance(LogicalUnit):
2946   """Migrate an instance.
2947
2948   This is migration without shutting down, compared to the failover,
2949   which is done with shutdown.
2950
2951   """
2952   HPATH = "instance-migrate"
2953   HTYPE = constants.HTYPE_INSTANCE
2954   _OP_REQP = ["instance_name", "live", "cleanup"]
2955
2956   def BuildHooksEnv(self):
2957     """Build hooks env.
2958
2959     This runs on master, primary and secondary nodes of the instance.
2960
2961     """
2962     env = _BuildInstanceHookEnvByObject(self.instance)
2963     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2964     return env, nl, nl
2965
2966   def CheckPrereq(self):
2967     """Check prerequisites.
2968
2969     This checks that the instance is in the cluster.
2970
2971     """
2972     instance = self.cfg.GetInstanceInfo(
2973       self.cfg.ExpandInstanceName(self.op.instance_name))
2974     if instance is None:
2975       raise errors.OpPrereqError("Instance '%s' not known" %
2976                                  self.op.instance_name)
2977
2978     if instance.disk_template != constants.DT_DRBD8:
2979       raise errors.OpPrereqError("Instance's disk layout is not"
2980                                  " drbd8, cannot migrate.")
2981
2982     secondary_nodes = instance.secondary_nodes
2983     if not secondary_nodes:
2984       raise errors.ProgrammerError("no secondary node but using "
2985                                    "drbd8 disk template")
2986
2987     target_node = secondary_nodes[0]
2988     # check memory requirements on the secondary node
2989     _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
2990                          instance.name, instance.memory)
2991
2992     # check bridge existance
2993     brlist = [nic.bridge for nic in instance.nics]
2994     if not rpc.call_bridges_exist(target_node, brlist):
2995       raise errors.OpPrereqError("One or more target bridges %s does not"
2996                                  " exist on destination node '%s'" %
2997                                  (brlist, target_node))
2998
2999     if not self.op.cleanup:
3000       migratable = rpc.call_instance_migratable(instance.primary_node,
3001                                                 instance)
3002       if not migratable:
3003         raise errors.OpPrereqError("Can't contact node '%s'" %
3004                                    instance.primary_node)
3005       if not migratable[0]:
3006         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3007                                    migratable[1])
3008
3009     self.instance = instance
3010
3011   def _WaitUntilSync(self):
3012     """Poll with custom rpc for disk sync.
3013
3014     This uses our own step-based rpc call.
3015
3016     """
3017     self.feedback_fn("* wait until resync is done")
3018     all_done = False
3019     while not all_done:
3020       all_done = True
3021       result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3022                                           self.instance.disks,
3023                                           self.nodes_ip, False,
3024                                           constants.DRBD_RECONF_RPC_WFSYNC)
3025       min_percent = 100
3026       for node in self.all_nodes:
3027         if not result[node] or not result[node][0]:
3028           raise errors.OpExecError("Cannot resync disks on node %s" % (node,))
3029         node_done, node_percent = result[node][1]
3030         all_done = all_done and node_done
3031         if node_percent is not None:
3032           min_percent = min(min_percent, node_percent)
3033       if not all_done:
3034         if min_percent < 100:
3035           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3036         time.sleep(2)
3037
3038   def _EnsureSecondary(self, node):
3039     """Demote a node to secondary.
3040
3041     """
3042     self.feedback_fn("* switching node %s to secondary mode" % node)
3043     result = rpc.call_drbd_reconfig_net([node], self.instance.name,
3044                                         self.instance.disks,
3045                                         self.nodes_ip, False,
3046                                         constants.DRBD_RECONF_RPC_SECONDARY)
3047     if not result[node] or not result[node][0]:
3048         raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3049                                  " error %s" %
3050                                  (node, result[node][1]))
3051
3052   def _GoStandalone(self):
3053     """Disconnect from the network.
3054
3055     """
3056     self.feedback_fn("* changing into standalone mode")
3057     result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3058                                         self.instance.disks,
3059                                         self.nodes_ip, True,
3060                                         constants.DRBD_RECONF_RPC_DISCONNECT)
3061     for node in self.all_nodes:
3062       if not result[node] or not result[node][0]:
3063         raise errors.OpExecError("Cannot disconnect disks node %s,"
3064                                  " error %s" % (node, result[node][1]))
3065
3066   def _GoReconnect(self, multimaster):
3067     """Reconnect to the network.
3068
3069     """
3070     if multimaster:
3071       msg = "dual-master"
3072     else:
3073       msg = "single-master"
3074     self.feedback_fn("* changing disks into %s mode" % msg)
3075     result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3076                                         self.instance.disks,
3077                                         self.nodes_ip,
3078                                         multimaster,
3079                                         constants.DRBD_RECONF_RPC_RECONNECT)
3080     for node in self.all_nodes:
3081       if not result[node] or not result[node][0]:
3082         raise errors.OpExecError("Cannot change disks config on node %s,"
3083                                  " error %s" % (node, result[node][1]))
3084
3085   def _IdentifyDisks(self):
3086     """Start the migration RPC sequence.
3087
3088     """
3089     self.feedback_fn("* identifying disks")
3090     result = rpc.call_drbd_reconfig_net(self.all_nodes,
3091                                         self.instance.name,
3092                                         self.instance.disks,
3093                                         self.nodes_ip, True,
3094                                         constants.DRBD_RECONF_RPC_INIT)
3095     for node in self.all_nodes:
3096       if not result[node] or not result[node][0]:
3097         raise errors.OpExecError("Cannot identify disks node %s,"
3098                                  " error %s" % (node, result[node][1]))
3099
3100   def _ExecCleanup(self):
3101     """Try to cleanup after a failed migration.
3102
3103     The cleanup is done by:
3104       - check that the instance is running only on one node
3105         (and update the config if needed)
3106       - change disks on its secondary node to secondary
3107       - wait until disks are fully synchronized
3108       - disconnect from the network
3109       - change disks into single-master mode
3110       - wait again until disks are fully synchronized
3111
3112     """
3113     instance = self.instance
3114     target_node = self.target_node
3115     source_node = self.source_node
3116
3117     # check running on only one node
3118     self.feedback_fn("* checking where the instance actually runs"
3119                      " (if this hangs, the hypervisor might be in"
3120                      " a bad state)")
3121     ins_l = rpc.call_instance_list(self.all_nodes)
3122     for node in self.all_nodes:
3123       if not type(ins_l[node]) is list:
3124         raise errors.OpExecError("Can't contact node '%s'" % node)
3125
3126     runningon_source = instance.name in ins_l[source_node]
3127     runningon_target = instance.name in ins_l[target_node]
3128
3129     if runningon_source and runningon_target:
3130       raise errors.OpExecError("Instance seems to be running on two nodes,"
3131                                " or the hypervisor is confused. You will have"
3132                                " to ensure manually that it runs only on one"
3133                                " and restart this operation.")
3134
3135     if not (runningon_source or runningon_target):
3136       raise errors.OpExecError("Instance does not seem to be running at all."
3137                                " In this case, it's safer to repair by"
3138                                " running 'gnt-instance stop' to ensure disk"
3139                                " shutdown, and then restarting it.")
3140
3141     if runningon_target:
3142       # the migration has actually succeeded, we need to update the config
3143       self.feedback_fn("* instance running on secondary node (%s),"
3144                        " updating config" % target_node)
3145       instance.primary_node = target_node
3146       self.cfg.Update(instance)
3147       demoted_node = source_node
3148     else:
3149       self.feedback_fn("* instance confirmed to be running on its"
3150                        " primary node (%s)" % source_node)
3151       demoted_node = target_node
3152
3153     self._IdentifyDisks()
3154
3155     self._EnsureSecondary(demoted_node)
3156     self._WaitUntilSync()
3157     self._GoStandalone()
3158     self._GoReconnect(False)
3159     self._WaitUntilSync()
3160
3161     self.feedback_fn("* done")
3162
3163   def _ExecMigration(self):
3164     """Migrate an instance.
3165
3166     The migrate is done by:
3167       - change the disks into dual-master mode
3168       - wait until disks are fully synchronized again
3169       - migrate the instance
3170       - change disks on the new secondary node (the old primary) to secondary
3171       - wait until disks are fully synchronized
3172       - change disks into single-master mode
3173
3174     """
3175     instance = self.instance
3176     target_node = self.target_node
3177     source_node = self.source_node
3178
3179     self.feedback_fn("* checking disk consistency between source and target")
3180     for dev in instance.disks:
3181       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3182         raise errors.OpExecError("Disk %s is degraded or not fully"
3183                                  " synchronized on target node,"
3184                                  " aborting migrate." % dev.iv_name)
3185
3186     self._IdentifyDisks()
3187
3188     self._EnsureSecondary(target_node)
3189     self._GoStandalone()
3190     self._GoReconnect(True)
3191     self._WaitUntilSync()
3192
3193     self.feedback_fn("* migrating instance to %s" % target_node)
3194     time.sleep(10)
3195     result = rpc.call_instance_migrate(source_node, instance,
3196                                        self.nodes_ip[target_node],
3197                                        self.op.live)
3198     if not result or not result[0]:
3199       logger.Error("Instance migration failed, trying to revert disk status")
3200       try:
3201         self._EnsureSecondary(target_node)
3202         self._GoStandalone()
3203         self._GoReconnect(False)
3204         self._WaitUntilSync()
3205       except errors.OpExecError, err:
3206         logger.Error("Can't reconnect the drives: error '%s'\n"
3207                      "Please look and recover the instance status" % str(err))
3208
3209       raise errors.OpExecError("Could not migrate instance %s: %s" %
3210                                (instance.name, result[1]))
3211     time.sleep(10)
3212
3213     instance.primary_node = target_node
3214     # distribute new instance config to the other nodes
3215     self.cfg.Update(instance)
3216
3217     self._EnsureSecondary(source_node)
3218     self._WaitUntilSync()
3219     self._GoStandalone()
3220     self._GoReconnect(False)
3221     self._WaitUntilSync()
3222
3223     self.feedback_fn("* done")
3224
3225   def Exec(self, feedback_fn):
3226     """Perform the migration.
3227
3228     """
3229     self.feedback_fn = feedback_fn
3230
3231     self.source_node = self.instance.primary_node
3232     self.target_node = self.instance.secondary_nodes[0]
3233     self.all_nodes = [self.source_node, self.target_node]
3234     self.nodes_ip = {
3235       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3236       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3237       }
3238     if self.op.cleanup:
3239       return self._ExecCleanup()
3240     else:
3241       return self._ExecMigration()
3242
3243
3244 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3245   """Create a tree of block devices on the primary node.
3246
3247   This always creates all devices.
3248
3249   """
3250   if device.children:
3251     for child in device.children:
3252       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3253         return False
3254
3255   cfg.SetDiskID(device, node)
3256   new_id = rpc.call_blockdev_create(node, device, device.size,
3257                                     instance.name, True, info)
3258   if not new_id:
3259     return False
3260   if device.physical_id is None:
3261     device.physical_id = new_id
3262   return True
3263
3264
3265 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3266   """Create a tree of block devices on a secondary node.
3267
3268   If this device type has to be created on secondaries, create it and
3269   all its children.
3270
3271   If not, just recurse to children keeping the same 'force' value.
3272
3273   """
3274   if device.CreateOnSecondary():
3275     force = True
3276   if device.children:
3277     for child in device.children:
3278       if not _CreateBlockDevOnSecondary(cfg, node, instance,
3279                                         child, force, info):
3280         return False
3281
3282   if not force:
3283     return True
3284   cfg.SetDiskID(device, node)
3285   new_id = rpc.call_blockdev_create(node, device, device.size,
3286                                     instance.name, False, info)
3287   if not new_id:
3288     return False
3289   if device.physical_id is None:
3290     device.physical_id = new_id
3291   return True
3292
3293
3294 def _GenerateUniqueNames(cfg, exts):
3295   """Generate a suitable LV name.
3296
3297   This will generate a logical volume name for the given instance.
3298
3299   """
3300   results = []
3301   for val in exts:
3302     new_id = cfg.GenerateUniqueID()
3303     results.append("%s%s" % (new_id, val))
3304   return results
3305
3306
3307 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3308   """Generate a drbd device complete with its children.
3309
3310   """
3311   port = cfg.AllocatePort()
3312   vgname = cfg.GetVGName()
3313   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3314                           logical_id=(vgname, names[0]))
3315   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3316                           logical_id=(vgname, names[1]))
3317   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3318                           logical_id = (primary, secondary, port),
3319                           children = [dev_data, dev_meta])
3320   return drbd_dev
3321
3322
3323 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3324   """Generate a drbd8 device complete with its children.
3325
3326   """
3327   port = cfg.AllocatePort()
3328   vgname = cfg.GetVGName()
3329   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3330                           logical_id=(vgname, names[0]))
3331   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3332                           logical_id=(vgname, names[1]))
3333   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3334                           logical_id = (primary, secondary, port),
3335                           children = [dev_data, dev_meta],
3336                           iv_name=iv_name)
3337   return drbd_dev
3338
3339 def _GenerateDiskTemplate(cfg, template_name,
3340                           instance_name, primary_node,
3341                           secondary_nodes, disk_sz, swap_sz):
3342   """Generate the entire disk layout for a given template type.
3343
3344   """
3345   #TODO: compute space requirements
3346
3347   vgname = cfg.GetVGName()
3348   if template_name == constants.DT_DISKLESS:
3349     disks = []
3350   elif template_name == constants.DT_PLAIN:
3351     if len(secondary_nodes) != 0:
3352       raise errors.ProgrammerError("Wrong template configuration")
3353
3354     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3355     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3356                            logical_id=(vgname, names[0]),
3357                            iv_name = "sda")
3358     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3359                            logical_id=(vgname, names[1]),
3360                            iv_name = "sdb")
3361     disks = [sda_dev, sdb_dev]
3362   elif template_name == constants.DT_LOCAL_RAID1:
3363     if len(secondary_nodes) != 0:
3364       raise errors.ProgrammerError("Wrong template configuration")
3365
3366
3367     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3368                                        ".sdb_m1", ".sdb_m2"])
3369     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3370                               logical_id=(vgname, names[0]))
3371     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3372                               logical_id=(vgname, names[1]))
3373     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3374                               size=disk_sz,
3375                               children = [sda_dev_m1, sda_dev_m2])
3376     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3377                               logical_id=(vgname, names[2]))
3378     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3379                               logical_id=(vgname, names[3]))
3380     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3381                               size=swap_sz,
3382                               children = [sdb_dev_m1, sdb_dev_m2])
3383     disks = [md_sda_dev, md_sdb_dev]
3384   elif template_name == constants.DT_REMOTE_RAID1:
3385     if len(secondary_nodes) != 1:
3386       raise errors.ProgrammerError("Wrong template configuration")
3387     remote_node = secondary_nodes[0]
3388     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3389                                        ".sdb_data", ".sdb_meta"])
3390     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3391                                          disk_sz, names[0:2])
3392     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3393                               children = [drbd_sda_dev], size=disk_sz)
3394     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3395                                          swap_sz, names[2:4])
3396     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3397                               children = [drbd_sdb_dev], size=swap_sz)
3398     disks = [md_sda_dev, md_sdb_dev]
3399   elif template_name == constants.DT_DRBD8:
3400     if len(secondary_nodes) != 1:
3401       raise errors.ProgrammerError("Wrong template configuration")
3402     remote_node = secondary_nodes[0]
3403     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3404                                        ".sdb_data", ".sdb_meta"])
3405     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3406                                          disk_sz, names[0:2], "sda")
3407     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3408                                          swap_sz, names[2:4], "sdb")
3409     disks = [drbd_sda_dev, drbd_sdb_dev]
3410   else:
3411     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3412   return disks
3413
3414
3415 def _GetInstanceInfoText(instance):
3416   """Compute that text that should be added to the disk's metadata.
3417
3418   """
3419   return "originstname+%s" % instance.name
3420
3421
3422 def _CreateDisks(cfg, instance):
3423   """Create all disks for an instance.
3424
3425   This abstracts away some work from AddInstance.
3426
3427   Args:
3428     instance: the instance object
3429
3430   Returns:
3431     True or False showing the success of the creation process
3432
3433   """
3434   info = _GetInstanceInfoText(instance)
3435
3436   for device in instance.disks:
3437     logger.Info("creating volume %s for instance %s" %
3438               (device.iv_name, instance.name))
3439     #HARDCODE
3440     for secondary_node in instance.secondary_nodes:
3441       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3442                                         device, False, info):
3443         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3444                      (device.iv_name, device, secondary_node))
3445         return False
3446     #HARDCODE
3447     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3448                                     instance, device, info):
3449       logger.Error("failed to create volume %s on primary!" %
3450                    device.iv_name)
3451       return False
3452   return True
3453
3454
3455 def _RemoveDisks(instance, cfg):
3456   """Remove all disks for an instance.
3457
3458   This abstracts away some work from `AddInstance()` and
3459   `RemoveInstance()`. Note that in case some of the devices couldn't
3460   be removed, the removal will continue with the other ones (compare
3461   with `_CreateDisks()`).
3462
3463   Args:
3464     instance: the instance object
3465
3466   Returns:
3467     True or False showing the success of the removal proces
3468
3469   """
3470   logger.Info("removing block devices for instance %s" % instance.name)
3471
3472   result = True
3473   for device in instance.disks:
3474     for node, disk in device.ComputeNodeTree(instance.primary_node):
3475       cfg.SetDiskID(disk, node)
3476       if not rpc.call_blockdev_remove(node, disk):
3477         logger.Error("could not remove block device %s on node %s,"
3478                      " continuing anyway" %
3479                      (device.iv_name, node))
3480         result = False
3481   return result
3482
3483
3484 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3485   """Compute disk size requirements in the volume group
3486
3487   This is currently hard-coded for the two-drive layout.
3488
3489   """
3490   # Required free disk space as a function of disk and swap space
3491   req_size_dict = {
3492     constants.DT_DISKLESS: None,
3493     constants.DT_PLAIN: disk_size + swap_size,
3494     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3495     # 256 MB are added for drbd metadata, 128MB for each drbd device
3496     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3497     constants.DT_DRBD8: disk_size + swap_size + 256,
3498   }
3499
3500   if disk_template not in req_size_dict:
3501     raise errors.ProgrammerError("Disk template '%s' size requirement"
3502                                  " is unknown" %  disk_template)
3503
3504   return req_size_dict[disk_template]
3505
3506
3507 class LUCreateInstance(LogicalUnit):
3508   """Create an instance.
3509
3510   """
3511   HPATH = "instance-add"
3512   HTYPE = constants.HTYPE_INSTANCE
3513   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3514               "disk_template", "swap_size", "mode", "start", "vcpus",
3515               "wait_for_sync", "ip_check", "mac", "auto_balance"]
3516
3517   def _RunAllocator(self):
3518     """Run the allocator based on input opcode.
3519
3520     """
3521     disks = [{"size": self.op.disk_size, "mode": "w"},
3522              {"size": self.op.swap_size, "mode": "w"}]
3523     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3524              "bridge": self.op.bridge}]
3525     ial = IAllocator(self.cfg, self.sstore,
3526                      mode=constants.IALLOCATOR_MODE_ALLOC,
3527                      name=self.op.instance_name,
3528                      disk_template=self.op.disk_template,
3529                      tags=[],
3530                      os=self.op.os_type,
3531                      vcpus=self.op.vcpus,
3532                      mem_size=self.op.mem_size,
3533                      disks=disks,
3534                      nics=nics,
3535                      )
3536
3537     ial.Run(self.op.iallocator)
3538
3539     if not ial.success:
3540       raise errors.OpPrereqError("Can't compute nodes using"
3541                                  " iallocator '%s': %s" % (self.op.iallocator,
3542                                                            ial.info))
3543     if len(ial.nodes) != ial.required_nodes:
3544       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3545                                  " of nodes (%s), required %s" %
3546                                  (self.op.iallocator, len(ial.nodes),
3547                                   ial.required_nodes))
3548     self.op.pnode = ial.nodes[0]
3549     logger.ToStdout("Selected nodes for the instance: %s" %
3550                     (", ".join(ial.nodes),))
3551     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3552                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3553     if ial.required_nodes == 2:
3554       self.op.snode = ial.nodes[1]
3555
3556   def BuildHooksEnv(self):
3557     """Build hooks env.
3558
3559     This runs on master, primary and secondary nodes of the instance.
3560
3561     """
3562     env = {
3563       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3564       "INSTANCE_DISK_SIZE": self.op.disk_size,
3565       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3566       "INSTANCE_ADD_MODE": self.op.mode,
3567       }
3568     if self.op.mode == constants.INSTANCE_IMPORT:
3569       env["INSTANCE_SRC_NODE"] = self.op.src_node
3570       env["INSTANCE_SRC_PATH"] = self.op.src_path
3571       env["INSTANCE_SRC_IMAGE"] = self.src_image
3572
3573     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3574       primary_node=self.op.pnode,
3575       secondary_nodes=self.secondaries,
3576       status=self.instance_status,
3577       os_type=self.op.os_type,
3578       memory=self.op.mem_size,
3579       vcpus=self.op.vcpus,
3580       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3581     ))
3582
3583     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3584           self.secondaries)
3585     return env, nl, nl
3586
3587
3588   def CheckPrereq(self):
3589     """Check prerequisites.
3590
3591     """
3592     # set optional parameters to none if they don't exist
3593     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3594                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3595                  "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3596       if not hasattr(self.op, attr):
3597         setattr(self.op, attr, None)
3598
3599     if self.op.mode not in (constants.INSTANCE_CREATE,
3600                             constants.INSTANCE_IMPORT):
3601       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3602                                  self.op.mode)
3603
3604     if self.op.mode == constants.INSTANCE_IMPORT:
3605       src_node = getattr(self.op, "src_node", None)
3606       src_path = getattr(self.op, "src_path", None)
3607       if src_node is None or src_path is None:
3608         raise errors.OpPrereqError("Importing an instance requires source"
3609                                    " node and path options")
3610       src_node_full = self.cfg.ExpandNodeName(src_node)
3611       if src_node_full is None:
3612         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3613       self.op.src_node = src_node = src_node_full
3614
3615       if not os.path.isabs(src_path):
3616         raise errors.OpPrereqError("The source path must be absolute")
3617
3618       export_info = rpc.call_export_info(src_node, src_path)
3619
3620       if not export_info:
3621         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3622
3623       if not export_info.has_section(constants.INISECT_EXP):
3624         raise errors.ProgrammerError("Corrupted export config")
3625
3626       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3627       if (int(ei_version) != constants.EXPORT_VERSION):
3628         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3629                                    (ei_version, constants.EXPORT_VERSION))
3630
3631       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3632         raise errors.OpPrereqError("Can't import instance with more than"
3633                                    " one data disk")
3634
3635       # FIXME: are the old os-es, disk sizes, etc. useful?
3636       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3637       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3638                                                          'disk0_dump'))
3639       self.src_image = diskimage
3640     else: # INSTANCE_CREATE
3641       if getattr(self.op, "os_type", None) is None:
3642         raise errors.OpPrereqError("No guest OS specified")
3643
3644     #### instance parameters check
3645
3646     # disk template and mirror node verification
3647     if self.op.disk_template not in constants.DISK_TEMPLATES:
3648       raise errors.OpPrereqError("Invalid disk template name")
3649
3650     # instance name verification
3651     hostname1 = utils.HostInfo(self.op.instance_name)
3652
3653     self.op.instance_name = instance_name = hostname1.name
3654     instance_list = self.cfg.GetInstanceList()
3655     if instance_name in instance_list:
3656       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3657                                  instance_name)
3658
3659     # ip validity checks
3660     ip = getattr(self.op, "ip", None)
3661     if ip is None or ip.lower() == "none":
3662       inst_ip = None
3663     elif ip.lower() == "auto":
3664       inst_ip = hostname1.ip
3665     else:
3666       if not utils.IsValidIP(ip):
3667         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3668                                    " like a valid IP" % ip)
3669       inst_ip = ip
3670     self.inst_ip = self.op.ip = inst_ip
3671
3672     if self.op.start and not self.op.ip_check:
3673       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3674                                  " adding an instance in start mode")
3675
3676     if self.op.ip_check:
3677       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3678         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3679                                    (hostname1.ip, instance_name))
3680
3681     # MAC address verification
3682     if self.op.mac != "auto":
3683       if not utils.IsValidMac(self.op.mac.lower()):
3684         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3685                                    self.op.mac)
3686
3687     # bridge verification
3688     bridge = getattr(self.op, "bridge", None)
3689     if bridge is None:
3690       self.op.bridge = self.cfg.GetDefBridge()
3691     else:
3692       self.op.bridge = bridge
3693
3694     # boot order verification
3695     if self.op.hvm_boot_order is not None:
3696       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3697         raise errors.OpPrereqError("invalid boot order specified,"
3698                                    " must be one or more of [acdn]")
3699     #### allocator run
3700
3701     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3702       raise errors.OpPrereqError("One and only one of iallocator and primary"
3703                                  " node must be given")
3704
3705     if self.op.iallocator is not None:
3706       self._RunAllocator()
3707
3708     #### node related checks
3709
3710     # check primary node
3711     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3712     if pnode is None:
3713       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3714                                  self.op.pnode)
3715     self.op.pnode = pnode.name
3716     self.pnode = pnode
3717     self.secondaries = []
3718
3719     # mirror node verification
3720     if self.op.disk_template in constants.DTS_NET_MIRROR:
3721       if getattr(self.op, "snode", None) is None:
3722         raise errors.OpPrereqError("The networked disk templates need"
3723                                    " a mirror node")
3724
3725       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3726       if snode_name is None:
3727         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3728                                    self.op.snode)
3729       elif snode_name == pnode.name:
3730         raise errors.OpPrereqError("The secondary node cannot be"
3731                                    " the primary node.")
3732       self.secondaries.append(snode_name)
3733
3734     req_size = _ComputeDiskSize(self.op.disk_template,
3735                                 self.op.disk_size, self.op.swap_size)
3736
3737     # Check lv size requirements
3738     if req_size is not None:
3739       nodenames = [pnode.name] + self.secondaries
3740       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3741       for node in nodenames:
3742         info = nodeinfo.get(node, None)
3743         if not info:
3744           raise errors.OpPrereqError("Cannot get current information"
3745                                      " from node '%s'" % node)
3746         vg_free = info.get('vg_free', None)
3747         if not isinstance(vg_free, int):
3748           raise errors.OpPrereqError("Can't compute free disk space on"
3749                                      " node %s" % node)
3750         if req_size > info['vg_free']:
3751           raise errors.OpPrereqError("Not enough disk space on target node %s."
3752                                      " %d MB available, %d MB required" %
3753                                      (node, info['vg_free'], req_size))
3754
3755     # os verification
3756     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3757     if not os_obj:
3758       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3759                                  " primary node"  % self.op.os_type)
3760
3761     if self.op.kernel_path == constants.VALUE_NONE:
3762       raise errors.OpPrereqError("Can't set instance kernel to none")
3763
3764
3765     # bridge check on primary node
3766     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3767       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3768                                  " destination node '%s'" %
3769                                  (self.op.bridge, pnode.name))
3770
3771     # memory check on primary node
3772     if self.op.start:
3773       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3774                            "creating instance %s" % self.op.instance_name,
3775                            self.op.mem_size)
3776
3777     # hvm_cdrom_image_path verification
3778     if self.op.hvm_cdrom_image_path is not None:
3779       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3780         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3781                                    " be an absolute path or None, not %s" %
3782                                    self.op.hvm_cdrom_image_path)
3783       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3784         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3785                                    " regular file or a symlink pointing to"
3786                                    " an existing regular file, not %s" %
3787                                    self.op.hvm_cdrom_image_path)
3788
3789     # vnc_bind_address verification
3790     if self.op.vnc_bind_address is not None:
3791       if not utils.IsValidIP(self.op.vnc_bind_address):
3792         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3793                                    " like a valid IP address" %
3794                                    self.op.vnc_bind_address)
3795
3796     # Xen HVM device type checks
3797     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3798       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3799         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3800                                    " hypervisor" % self.op.hvm_nic_type)
3801       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3802         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3803                                    " hypervisor" % self.op.hvm_disk_type)
3804
3805     if self.op.start:
3806       self.instance_status = 'up'
3807     else:
3808       self.instance_status = 'down'
3809
3810   def Exec(self, feedback_fn):
3811     """Create and add the instance to the cluster.
3812
3813     """
3814     instance = self.op.instance_name
3815     pnode_name = self.pnode.name
3816
3817     if self.op.mac == "auto":
3818       mac_address = self.cfg.GenerateMAC()
3819     else:
3820       mac_address = self.op.mac
3821
3822     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3823     if self.inst_ip is not None:
3824       nic.ip = self.inst_ip
3825
3826     ht_kind = self.sstore.GetHypervisorType()
3827     if ht_kind in constants.HTS_REQ_PORT:
3828       network_port = self.cfg.AllocatePort()
3829     else:
3830       network_port = None
3831
3832     if self.op.vnc_bind_address is None:
3833       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3834
3835     disks = _GenerateDiskTemplate(self.cfg,
3836                                   self.op.disk_template,
3837                                   instance, pnode_name,
3838                                   self.secondaries, self.op.disk_size,
3839                                   self.op.swap_size)
3840
3841     iobj = objects.Instance(name=instance, os=self.op.os_type,
3842                             primary_node=pnode_name,
3843                             memory=self.op.mem_size,
3844                             vcpus=self.op.vcpus,
3845                             nics=[nic], disks=disks,
3846                             disk_template=self.op.disk_template,
3847                             status=self.instance_status,
3848                             network_port=network_port,
3849                             kernel_path=self.op.kernel_path,
3850                             initrd_path=self.op.initrd_path,
3851                             hvm_boot_order=self.op.hvm_boot_order,
3852                             hvm_acpi=self.op.hvm_acpi,
3853                             hvm_pae=self.op.hvm_pae,
3854                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3855                             vnc_bind_address=self.op.vnc_bind_address,
3856                             hvm_nic_type=self.op.hvm_nic_type,
3857                             hvm_disk_type=self.op.hvm_disk_type,
3858                             auto_balance=bool(self.op.auto_balance),
3859                             )
3860
3861     feedback_fn("* creating instance disks...")
3862     if not _CreateDisks(self.cfg, iobj):
3863       _RemoveDisks(iobj, self.cfg)
3864       raise errors.OpExecError("Device creation failed, reverting...")
3865
3866     feedback_fn("adding instance %s to cluster config" % instance)
3867
3868     self.cfg.AddInstance(iobj)
3869
3870     if self.op.wait_for_sync:
3871       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3872     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3873       # make sure the disks are not degraded (still sync-ing is ok)
3874       time.sleep(15)
3875       feedback_fn("* checking mirrors status")
3876       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3877     else:
3878       disk_abort = False
3879
3880     if disk_abort:
3881       _RemoveDisks(iobj, self.cfg)
3882       self.cfg.RemoveInstance(iobj.name)
3883       raise errors.OpExecError("There are some degraded disks for"
3884                                " this instance")
3885
3886     feedback_fn("creating os for instance %s on node %s" %
3887                 (instance, pnode_name))
3888
3889     if iobj.disk_template != constants.DT_DISKLESS:
3890       if self.op.mode == constants.INSTANCE_CREATE:
3891         feedback_fn("* running the instance OS create scripts...")
3892         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3893           raise errors.OpExecError("could not add os for instance %s"
3894                                    " on node %s" %
3895                                    (instance, pnode_name))
3896
3897       elif self.op.mode == constants.INSTANCE_IMPORT:
3898         feedback_fn("* running the instance OS import scripts...")
3899         src_node = self.op.src_node
3900         src_image = self.src_image
3901         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3902                                                 src_node, src_image):
3903           raise errors.OpExecError("Could not import os for instance"
3904                                    " %s on node %s" %
3905                                    (instance, pnode_name))
3906       else:
3907         # also checked in the prereq part
3908         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3909                                      % self.op.mode)
3910
3911     if self.op.start:
3912       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3913       feedback_fn("* starting instance...")
3914       if not rpc.call_instance_start(pnode_name, iobj, None):
3915         raise errors.OpExecError("Could not start instance")
3916
3917
3918 class LUConnectConsole(NoHooksLU):
3919   """Connect to an instance's console.
3920
3921   This is somewhat special in that it returns the command line that
3922   you need to run on the master node in order to connect to the
3923   console.
3924
3925   """
3926   _OP_REQP = ["instance_name"]
3927
3928   def CheckPrereq(self):
3929     """Check prerequisites.
3930
3931     This checks that the instance is in the cluster.
3932
3933     """
3934     instance = self.cfg.GetInstanceInfo(
3935       self.cfg.ExpandInstanceName(self.op.instance_name))
3936     if instance is None:
3937       raise errors.OpPrereqError("Instance '%s' not known" %
3938                                  self.op.instance_name)
3939     self.instance = instance
3940
3941   def Exec(self, feedback_fn):
3942     """Connect to the console of an instance
3943
3944     """
3945     instance = self.instance
3946     node = instance.primary_node
3947
3948     node_insts = rpc.call_instance_list([node])[node]
3949     if node_insts is False:
3950       raise errors.OpExecError("Can't connect to node %s." % node)
3951
3952     if instance.name not in node_insts:
3953       raise errors.OpExecError("Instance %s is not running." % instance.name)
3954
3955     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3956
3957     hyper = hypervisor.GetHypervisor()
3958     console_cmd = hyper.GetShellCommandForConsole(instance)
3959     # build ssh cmdline
3960     argv = ["ssh", "-q", "-t"]
3961     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3962     argv.extend(ssh.BATCH_MODE_OPTS)
3963     argv.append(node)
3964     argv.append(console_cmd)
3965     return "ssh", argv
3966
3967
3968 class LUAddMDDRBDComponent(LogicalUnit):
3969   """Adda new mirror member to an instance's disk.
3970
3971   """
3972   HPATH = "mirror-add"
3973   HTYPE = constants.HTYPE_INSTANCE
3974   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3975
3976   def BuildHooksEnv(self):
3977     """Build hooks env.
3978
3979     This runs on the master, the primary and all the secondaries.
3980
3981     """
3982     env = {
3983       "NEW_SECONDARY": self.op.remote_node,
3984       "DISK_NAME": self.op.disk_name,
3985       }
3986     env.update(_BuildInstanceHookEnvByObject(self.instance))
3987     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3988           self.op.remote_node,] + list(self.instance.secondary_nodes)
3989     return env, nl, nl
3990
3991   def CheckPrereq(self):
3992     """Check prerequisites.
3993
3994     This checks that the instance is in the cluster.
3995
3996     """
3997     instance = self.cfg.GetInstanceInfo(
3998       self.cfg.ExpandInstanceName(self.op.instance_name))
3999     if instance is None:
4000       raise errors.OpPrereqError("Instance '%s' not known" %
4001                                  self.op.instance_name)
4002     self.instance = instance
4003
4004     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4005     if remote_node is None:
4006       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
4007     self.remote_node = remote_node
4008
4009     if remote_node == instance.primary_node:
4010       raise errors.OpPrereqError("The specified node is the primary node of"
4011                                  " the instance.")
4012
4013     if instance.disk_template != constants.DT_REMOTE_RAID1:
4014       raise errors.OpPrereqError("Instance's disk layout is not"
4015                                  " remote_raid1.")
4016     for disk in instance.disks:
4017       if disk.iv_name == self.op.disk_name:
4018         break
4019     else:
4020       raise errors.OpPrereqError("Can't find this device ('%s') in the"
4021                                  " instance." % self.op.disk_name)
4022     if len(disk.children) > 1:
4023       raise errors.OpPrereqError("The device already has two slave devices."
4024                                  " This would create a 3-disk raid1 which we"
4025                                  " don't allow.")
4026     self.disk = disk
4027
4028   def Exec(self, feedback_fn):
4029     """Add the mirror component
4030
4031     """
4032     disk = self.disk
4033     instance = self.instance
4034
4035     remote_node = self.remote_node
4036     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
4037     names = _GenerateUniqueNames(self.cfg, lv_names)
4038     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
4039                                      remote_node, disk.size, names)
4040
4041     logger.Info("adding new mirror component on secondary")
4042     #HARDCODE
4043     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
4044                                       new_drbd, False,
4045                                       _GetInstanceInfoText(instance)):
4046       raise errors.OpExecError("Failed to create new component on secondary"
4047                                " node %s" % remote_node)
4048
4049     logger.Info("adding new mirror component on primary")
4050     #HARDCODE
4051     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
4052                                     instance, new_drbd,
4053                                     _GetInstanceInfoText(instance)):
4054       # remove secondary dev
4055       self.cfg.SetDiskID(new_drbd, remote_node)
4056       rpc.call_blockdev_remove(remote_node, new_drbd)
4057       raise errors.OpExecError("Failed to create volume on primary")
4058
4059     # the device exists now
4060     # call the primary node to add the mirror to md
4061     logger.Info("adding new mirror component to md")
4062     if not rpc.call_blockdev_addchildren(instance.primary_node,
4063                                          disk, [new_drbd]):
4064       logger.Error("Can't add mirror compoment to md!")
4065       self.cfg.SetDiskID(new_drbd, remote_node)
4066       if not rpc.call_blockdev_remove(remote_node, new_drbd):
4067         logger.Error("Can't rollback on secondary")
4068       self.cfg.SetDiskID(new_drbd, instance.primary_node)
4069       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4070         logger.Error("Can't rollback on primary")
4071       raise errors.OpExecError("Can't add mirror component to md array")
4072
4073     disk.children.append(new_drbd)
4074
4075     self.cfg.AddInstance(instance)
4076
4077     _WaitForSync(self.cfg, instance, self.proc)
4078
4079     return 0
4080
4081
4082 class LURemoveMDDRBDComponent(LogicalUnit):
4083   """Remove a component from a remote_raid1 disk.
4084
4085   """
4086   HPATH = "mirror-remove"
4087   HTYPE = constants.HTYPE_INSTANCE
4088   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
4089
4090   def BuildHooksEnv(self):
4091     """Build hooks env.
4092
4093     This runs on the master, the primary and all the secondaries.
4094
4095     """
4096     env = {
4097       "DISK_NAME": self.op.disk_name,
4098       "DISK_ID": self.op.disk_id,
4099       "OLD_SECONDARY": self.old_secondary,
4100       }
4101     env.update(_BuildInstanceHookEnvByObject(self.instance))
4102     nl = [self.sstore.GetMasterNode(),
4103           self.instance.primary_node] + list(self.instance.secondary_nodes)
4104     return env, nl, nl
4105
4106   def CheckPrereq(self):
4107     """Check prerequisites.
4108
4109     This checks that the instance is in the cluster.
4110
4111     """
4112     instance = self.cfg.GetInstanceInfo(
4113       self.cfg.ExpandInstanceName(self.op.instance_name))
4114     if instance is None:
4115       raise errors.OpPrereqError("Instance '%s' not known" %
4116                                  self.op.instance_name)
4117     self.instance = instance
4118
4119     if instance.disk_template != constants.DT_REMOTE_RAID1:
4120       raise errors.OpPrereqError("Instance's disk layout is not"
4121                                  " remote_raid1.")
4122     for disk in instance.disks:
4123       if disk.iv_name == self.op.disk_name:
4124         break
4125     else:
4126       raise errors.OpPrereqError("Can't find this device ('%s') in the"
4127                                  " instance." % self.op.disk_name)
4128     for child in disk.children:
4129       if (child.dev_type == constants.LD_DRBD7 and
4130           child.logical_id[2] == self.op.disk_id):
4131         break
4132     else:
4133       raise errors.OpPrereqError("Can't find the device with this port.")
4134
4135     if len(disk.children) < 2:
4136       raise errors.OpPrereqError("Cannot remove the last component from"
4137                                  " a mirror.")
4138     self.disk = disk
4139     self.child = child
4140     if self.child.logical_id[0] == instance.primary_node:
4141       oid = 1
4142     else:
4143       oid = 0
4144     self.old_secondary = self.child.logical_id[oid]
4145
4146   def Exec(self, feedback_fn):
4147     """Remove the mirror component
4148
4149     """
4150     instance = self.instance
4151     disk = self.disk
4152     child = self.child
4153     logger.Info("remove mirror component")
4154     self.cfg.SetDiskID(disk, instance.primary_node)
4155     if not rpc.call_blockdev_removechildren(instance.primary_node,
4156                                             disk, [child]):
4157       raise errors.OpExecError("Can't remove child from mirror.")
4158
4159     for node in child.logical_id[:2]:
4160       self.cfg.SetDiskID(child, node)
4161       if not rpc.call_blockdev_remove(node, child):
4162         logger.Error("Warning: failed to remove device from node %s,"
4163                      " continuing operation." % node)
4164
4165     disk.children.remove(child)
4166     self.cfg.AddInstance(instance)
4167
4168
4169 class LUReplaceDisks(LogicalUnit):
4170   """Replace the disks of an instance.
4171
4172   """
4173   HPATH = "mirrors-replace"
4174   HTYPE = constants.HTYPE_INSTANCE
4175   _OP_REQP = ["instance_name", "mode", "disks"]
4176
4177   def _RunAllocator(self):
4178     """Compute a new secondary node using an IAllocator.
4179
4180     """
4181     ial = IAllocator(self.cfg, self.sstore,
4182                      mode=constants.IALLOCATOR_MODE_RELOC,
4183                      name=self.op.instance_name,
4184                      relocate_from=[self.sec_node])
4185
4186     ial.Run(self.op.iallocator)
4187
4188     if not ial.success:
4189       raise errors.OpPrereqError("Can't compute nodes using"
4190                                  " iallocator '%s': %s" % (self.op.iallocator,
4191                                                            ial.info))
4192     if len(ial.nodes) != ial.required_nodes:
4193       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4194                                  " of nodes (%s), required %s" %
4195                                  (len(ial.nodes), ial.required_nodes))
4196     self.op.remote_node = ial.nodes[0]
4197     logger.ToStdout("Selected new secondary for the instance: %s" %
4198                     self.op.remote_node)
4199
4200   def BuildHooksEnv(self):
4201     """Build hooks env.
4202
4203     This runs on the master, the primary and all the secondaries.
4204
4205     """
4206     env = {
4207       "MODE": self.op.mode,
4208       "NEW_SECONDARY": self.op.remote_node,
4209       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4210       }
4211     env.update(_BuildInstanceHookEnvByObject(self.instance))
4212     nl = [
4213       self.sstore.GetMasterNode(),
4214       self.instance.primary_node,
4215       ]
4216     if self.op.remote_node is not None:
4217       nl.append(self.op.remote_node)
4218     return env, nl, nl
4219
4220   def CheckPrereq(self):
4221     """Check prerequisites.
4222
4223     This checks that the instance is in the cluster.
4224
4225     """
4226     if not hasattr(self.op, "remote_node"):
4227       self.op.remote_node = None
4228
4229     instance = self.cfg.GetInstanceInfo(
4230       self.cfg.ExpandInstanceName(self.op.instance_name))
4231     if instance is None:
4232       raise errors.OpPrereqError("Instance '%s' not known" %
4233                                  self.op.instance_name)
4234     self.instance = instance
4235     self.op.instance_name = instance.name
4236
4237     if instance.disk_template not in constants.DTS_NET_MIRROR:
4238       raise errors.OpPrereqError("Instance's disk layout is not"
4239                                  " network mirrored.")
4240
4241     if len(instance.secondary_nodes) != 1:
4242       raise errors.OpPrereqError("The instance has a strange layout,"
4243                                  " expected one secondary but found %d" %
4244                                  len(instance.secondary_nodes))
4245
4246     self.sec_node = instance.secondary_nodes[0]
4247
4248     ia_name = getattr(self.op, "iallocator", None)
4249     if ia_name is not None:
4250       if self.op.remote_node is not None:
4251         raise errors.OpPrereqError("Give either the iallocator or the new"
4252                                    " secondary, not both")
4253       self._RunAllocator()
4254
4255     remote_node = self.op.remote_node
4256     if remote_node is not None:
4257       remote_node = self.cfg.ExpandNodeName(remote_node)
4258       if remote_node is None:
4259         raise errors.OpPrereqError("Node '%s' not known" %
4260                                    self.op.remote_node)
4261       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4262     else:
4263       self.remote_node_info = None
4264     if remote_node == instance.primary_node:
4265       raise errors.OpPrereqError("The specified node is the primary node of"
4266                                  " the instance.")
4267     elif remote_node == self.sec_node:
4268       if self.op.mode == constants.REPLACE_DISK_SEC:
4269         # this is for DRBD8, where we can't execute the same mode of
4270         # replacement as for drbd7 (no different port allocated)
4271         raise errors.OpPrereqError("Same secondary given, cannot execute"
4272                                    " replacement")
4273       # the user gave the current secondary, switch to
4274       # 'no-replace-secondary' mode for drbd7
4275       remote_node = None
4276     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4277         self.op.mode != constants.REPLACE_DISK_ALL):
4278       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4279                                  " disks replacement, not individual ones")
4280     if instance.disk_template == constants.DT_DRBD8:
4281       if (self.op.mode == constants.REPLACE_DISK_ALL and
4282           remote_node is not None):
4283         # switch to replace secondary mode
4284         self.op.mode = constants.REPLACE_DISK_SEC
4285
4286       if self.op.mode == constants.REPLACE_DISK_ALL:
4287         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4288                                    " secondary disk replacement, not"
4289                                    " both at once")
4290       elif self.op.mode == constants.REPLACE_DISK_PRI:
4291         if remote_node is not None:
4292           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4293                                      " the secondary while doing a primary"
4294                                      " node disk replacement")
4295         self.tgt_node = instance.primary_node
4296         self.oth_node = instance.secondary_nodes[0]
4297       elif self.op.mode == constants.REPLACE_DISK_SEC:
4298         self.new_node = remote_node # this can be None, in which case
4299                                     # we don't change the secondary
4300         self.tgt_node = instance.secondary_nodes[0]
4301         self.oth_node = instance.primary_node
4302       else:
4303         raise errors.ProgrammerError("Unhandled disk replace mode")
4304
4305     for name in self.op.disks:
4306       if instance.FindDisk(name) is None:
4307         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4308                                    (name, instance.name))
4309     self.op.remote_node = remote_node
4310
4311   def _ExecRR1(self, feedback_fn):
4312     """Replace the disks of an instance.
4313
4314     """
4315     instance = self.instance
4316     iv_names = {}
4317     # start of work
4318     if self.op.remote_node is None:
4319       remote_node = self.sec_node
4320     else:
4321       remote_node = self.op.remote_node
4322     cfg = self.cfg
4323     for dev in instance.disks:
4324       size = dev.size
4325       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4326       names = _GenerateUniqueNames(cfg, lv_names)
4327       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4328                                        remote_node, size, names)
4329       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4330       logger.Info("adding new mirror component on secondary for %s" %
4331                   dev.iv_name)
4332       #HARDCODE
4333       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4334                                         new_drbd, False,
4335                                         _GetInstanceInfoText(instance)):
4336         raise errors.OpExecError("Failed to create new component on secondary"
4337                                  " node %s. Full abort, cleanup manually!" %
4338                                  remote_node)
4339
4340       logger.Info("adding new mirror component on primary")
4341       #HARDCODE
4342       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4343                                       instance, new_drbd,
4344                                       _GetInstanceInfoText(instance)):
4345         # remove secondary dev
4346         cfg.SetDiskID(new_drbd, remote_node)
4347         rpc.call_blockdev_remove(remote_node, new_drbd)
4348         raise errors.OpExecError("Failed to create volume on primary!"
4349                                  " Full abort, cleanup manually!!")
4350
4351       # the device exists now
4352       # call the primary node to add the mirror to md
4353       logger.Info("adding new mirror component to md")
4354       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4355                                            [new_drbd]):
4356         logger.Error("Can't add mirror compoment to md!")
4357         cfg.SetDiskID(new_drbd, remote_node)
4358         if not rpc.call_blockdev_remove(remote_node, new_drbd):
4359           logger.Error("Can't rollback on secondary")
4360         cfg.SetDiskID(new_drbd, instance.primary_node)
4361         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4362           logger.Error("Can't rollback on primary")
4363         raise errors.OpExecError("Full abort, cleanup manually!!")
4364
4365       dev.children.append(new_drbd)
4366       cfg.AddInstance(instance)
4367
4368     # this can fail as the old devices are degraded and _WaitForSync
4369     # does a combined result over all disks, so we don't check its
4370     # return value
4371     _WaitForSync(cfg, instance, self.proc, unlock=True)
4372
4373     # so check manually all the devices
4374     for name in iv_names:
4375       dev, child, new_drbd = iv_names[name]
4376       cfg.SetDiskID(dev, instance.primary_node)
4377       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4378       if is_degr:
4379         raise errors.OpExecError("MD device %s is degraded!" % name)
4380       cfg.SetDiskID(new_drbd, instance.primary_node)
4381       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4382       if is_degr:
4383         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4384
4385     for name in iv_names:
4386       dev, child, new_drbd = iv_names[name]
4387       logger.Info("remove mirror %s component" % name)
4388       cfg.SetDiskID(dev, instance.primary_node)
4389       if not rpc.call_blockdev_removechildren(instance.primary_node,
4390                                               dev, [child]):
4391         logger.Error("Can't remove child from mirror, aborting"
4392                      " *this device cleanup*.\nYou need to cleanup manually!!")
4393         continue
4394
4395       for node in child.logical_id[:2]:
4396         logger.Info("remove child device on %s" % node)
4397         cfg.SetDiskID(child, node)
4398         if not rpc.call_blockdev_remove(node, child):
4399           logger.Error("Warning: failed to remove device from node %s,"
4400                        " continuing operation." % node)
4401
4402       dev.children.remove(child)
4403
4404       cfg.AddInstance(instance)
4405
4406   def _ExecD8DiskOnly(self, feedback_fn):
4407     """Replace a disk on the primary or secondary for dbrd8.
4408
4409     The algorithm for replace is quite complicated:
4410       - for each disk to be replaced:
4411         - create new LVs on the target node with unique names
4412         - detach old LVs from the drbd device
4413         - rename old LVs to name_replaced.<time_t>
4414         - rename new LVs to old LVs
4415         - attach the new LVs (with the old names now) to the drbd device
4416       - wait for sync across all devices
4417       - for each modified disk:
4418         - remove old LVs (which have the name name_replaces.<time_t>)
4419
4420     Failures are not very well handled.
4421
4422     """
4423     steps_total = 6
4424     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4425     instance = self.instance
4426     iv_names = {}
4427     vgname = self.cfg.GetVGName()
4428     # start of work
4429     cfg = self.cfg
4430     tgt_node = self.tgt_node
4431     oth_node = self.oth_node
4432
4433     # Step: check device activation
4434     self.proc.LogStep(1, steps_total, "check device existence")
4435     info("checking volume groups")
4436     my_vg = cfg.GetVGName()
4437     results = rpc.call_vg_list([oth_node, tgt_node])
4438     if not results:
4439       raise errors.OpExecError("Can't list volume groups on the nodes")
4440     for node in oth_node, tgt_node:
4441       res = results.get(node, False)
4442       if not res or my_vg not in res:
4443         raise errors.OpExecError("Volume group '%s' not found on %s" %
4444                                  (my_vg, node))
4445     for dev in instance.disks:
4446       if not dev.iv_name in self.op.disks:
4447         continue
4448       for node in tgt_node, oth_node:
4449         info("checking %s on %s" % (dev.iv_name, node))
4450         cfg.SetDiskID(dev, node)
4451         if not rpc.call_blockdev_find(node, dev):
4452           raise errors.OpExecError("Can't find device %s on node %s" %
4453                                    (dev.iv_name, node))
4454
4455     # Step: check other node consistency
4456     self.proc.LogStep(2, steps_total, "check peer consistency")
4457     for dev in instance.disks:
4458       if not dev.iv_name in self.op.disks:
4459         continue
4460       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4461       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4462                                    oth_node==instance.primary_node):
4463         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4464                                  " to replace disks on this node (%s)" %
4465                                  (oth_node, tgt_node))
4466
4467     # Step: create new storage
4468     self.proc.LogStep(3, steps_total, "allocate new storage")
4469     for dev in instance.disks:
4470       if not dev.iv_name in self.op.disks:
4471         continue
4472       size = dev.size
4473       cfg.SetDiskID(dev, tgt_node)
4474       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4475       names = _GenerateUniqueNames(cfg, lv_names)
4476       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4477                              logical_id=(vgname, names[0]))
4478       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4479                              logical_id=(vgname, names[1]))
4480       new_lvs = [lv_data, lv_meta]
4481       old_lvs = dev.children
4482       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4483       info("creating new local storage on %s for %s" %
4484            (tgt_node, dev.iv_name))
4485       # since we *always* want to create this LV, we use the
4486       # _Create...OnPrimary (which forces the creation), even if we
4487       # are talking about the secondary node
4488       for new_lv in new_lvs:
4489         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4490                                         _GetInstanceInfoText(instance)):
4491           raise errors.OpExecError("Failed to create new LV named '%s' on"
4492                                    " node '%s'" %
4493                                    (new_lv.logical_id[1], tgt_node))
4494
4495     # Step: for each lv, detach+rename*2+attach
4496     self.proc.LogStep(4, steps_total, "change drbd configuration")
4497     for dev, old_lvs, new_lvs in iv_names.itervalues():
4498       info("detaching %s drbd from local storage" % dev.iv_name)
4499       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4500         raise errors.OpExecError("Can't detach drbd from local storage on node"
4501                                  " %s for device %s" % (tgt_node, dev.iv_name))
4502       #dev.children = []
4503       #cfg.Update(instance)
4504
4505       # ok, we created the new LVs, so now we know we have the needed
4506       # storage; as such, we proceed on the target node to rename
4507       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4508       # using the assumption that logical_id == physical_id (which in
4509       # turn is the unique_id on that node)
4510
4511       # FIXME(iustin): use a better name for the replaced LVs
4512       temp_suffix = int(time.time())
4513       ren_fn = lambda d, suff: (d.physical_id[0],
4514                                 d.physical_id[1] + "_replaced-%s" % suff)
4515       # build the rename list based on what LVs exist on the node
4516       rlist = []
4517       for to_ren in old_lvs:
4518         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4519         if find_res is not None: # device exists
4520           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4521
4522       info("renaming the old LVs on the target node")
4523       if not rpc.call_blockdev_rename(tgt_node, rlist):
4524         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4525       # now we rename the new LVs to the old LVs
4526       info("renaming the new LVs on the target node")
4527       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4528       if not rpc.call_blockdev_rename(tgt_node, rlist):
4529         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4530
4531       for old, new in zip(old_lvs, new_lvs):
4532         new.logical_id = old.logical_id
4533         cfg.SetDiskID(new, tgt_node)
4534
4535       for disk in old_lvs:
4536         disk.logical_id = ren_fn(disk, temp_suffix)
4537         cfg.SetDiskID(disk, tgt_node)
4538
4539       # now that the new lvs have the old name, we can add them to the device
4540       info("adding new mirror component on %s" % tgt_node)
4541       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4542         for new_lv in new_lvs:
4543           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4544             warning("Can't rollback device %s", hint="manually cleanup unused"
4545                     " logical volumes")
4546         raise errors.OpExecError("Can't add local storage to drbd")
4547
4548       dev.children = new_lvs
4549       cfg.Update(instance)
4550
4551     # Step: wait for sync
4552
4553     # this can fail as the old devices are degraded and _WaitForSync
4554     # does a combined result over all disks, so we don't check its
4555     # return value
4556     self.proc.LogStep(5, steps_total, "sync devices")
4557     _WaitForSync(cfg, instance, self.proc, unlock=True)
4558
4559     # so check manually all the devices
4560     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4561       cfg.SetDiskID(dev, instance.primary_node)
4562       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4563       if is_degr:
4564         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4565
4566     # Step: remove old storage
4567     self.proc.LogStep(6, steps_total, "removing old storage")
4568     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4569       info("remove logical volumes for %s" % name)
4570       for lv in old_lvs:
4571         cfg.SetDiskID(lv, tgt_node)
4572         if not rpc.call_blockdev_remove(tgt_node, lv):
4573           warning("Can't remove old LV", hint="manually remove unused LVs")
4574           continue
4575
4576   def _ExecD8Secondary(self, feedback_fn):
4577     """Replace the secondary node for drbd8.
4578
4579     The algorithm for replace is quite complicated:
4580       - for all disks of the instance:
4581         - create new LVs on the new node with same names
4582         - shutdown the drbd device on the old secondary
4583         - disconnect the drbd network on the primary
4584         - create the drbd device on the new secondary
4585         - network attach the drbd on the primary, using an artifice:
4586           the drbd code for Attach() will connect to the network if it
4587           finds a device which is connected to the good local disks but
4588           not network enabled
4589       - wait for sync across all devices
4590       - remove all disks from the old secondary
4591
4592     Failures are not very well handled.
4593
4594     """
4595     steps_total = 6
4596     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4597     instance = self.instance
4598     iv_names = {}
4599     vgname = self.cfg.GetVGName()
4600     # start of work
4601     cfg = self.cfg
4602     old_node = self.tgt_node
4603     new_node = self.new_node
4604     pri_node = instance.primary_node
4605
4606     # Step: check device activation
4607     self.proc.LogStep(1, steps_total, "check device existence")
4608     info("checking volume groups")
4609     my_vg = cfg.GetVGName()
4610     results = rpc.call_vg_list([pri_node, new_node])
4611     if not results:
4612       raise errors.OpExecError("Can't list volume groups on the nodes")
4613     for node in pri_node, new_node:
4614       res = results.get(node, False)
4615       if not res or my_vg not in res:
4616         raise errors.OpExecError("Volume group '%s' not found on %s" %
4617                                  (my_vg, node))
4618     for dev in instance.disks:
4619       if not dev.iv_name in self.op.disks:
4620         continue
4621       info("checking %s on %s" % (dev.iv_name, pri_node))
4622       cfg.SetDiskID(dev, pri_node)
4623       if not rpc.call_blockdev_find(pri_node, dev):
4624         raise errors.OpExecError("Can't find device %s on node %s" %
4625                                  (dev.iv_name, pri_node))
4626
4627     # Step: check other node consistency
4628     self.proc.LogStep(2, steps_total, "check peer consistency")
4629     for dev in instance.disks:
4630       if not dev.iv_name in self.op.disks:
4631         continue
4632       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4633       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4634         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4635                                  " unsafe to replace the secondary" %
4636                                  pri_node)
4637
4638     # Step: create new storage
4639     self.proc.LogStep(3, steps_total, "allocate new storage")
4640     for dev in instance.disks:
4641       size = dev.size
4642       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4643       # since we *always* want to create this LV, we use the
4644       # _Create...OnPrimary (which forces the creation), even if we
4645       # are talking about the secondary node
4646       for new_lv in dev.children:
4647         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4648                                         _GetInstanceInfoText(instance)):
4649           raise errors.OpExecError("Failed to create new LV named '%s' on"
4650                                    " node '%s'" %
4651                                    (new_lv.logical_id[1], new_node))
4652
4653       iv_names[dev.iv_name] = (dev, dev.children)
4654
4655     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4656     for dev in instance.disks:
4657       size = dev.size
4658       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4659       # create new devices on new_node
4660       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4661                               logical_id=(pri_node, new_node,
4662                                           dev.logical_id[2]),
4663                               children=dev.children)
4664       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4665                                         new_drbd, False,
4666                                       _GetInstanceInfoText(instance)):
4667         raise errors.OpExecError("Failed to create new DRBD on"
4668                                  " node '%s'" % new_node)
4669
4670     for dev in instance.disks:
4671       # we have new devices, shutdown the drbd on the old secondary
4672       info("shutting down drbd for %s on old node" % dev.iv_name)
4673       cfg.SetDiskID(dev, old_node)
4674       if not rpc.call_blockdev_shutdown(old_node, dev):
4675         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4676                 hint="Please cleanup this device manually as soon as possible")
4677
4678     info("detaching primary drbds from the network (=> standalone)")
4679     done = 0
4680     for dev in instance.disks:
4681       cfg.SetDiskID(dev, pri_node)
4682       # set the physical (unique in bdev terms) id to None, meaning
4683       # detach from network
4684       dev.physical_id = (None,) * len(dev.physical_id)
4685       # and 'find' the device, which will 'fix' it to match the
4686       # standalone state
4687       if rpc.call_blockdev_find(pri_node, dev):
4688         done += 1
4689       else:
4690         warning("Failed to detach drbd %s from network, unusual case" %
4691                 dev.iv_name)
4692
4693     if not done:
4694       # no detaches succeeded (very unlikely)
4695       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4696
4697     # if we managed to detach at least one, we update all the disks of
4698     # the instance to point to the new secondary
4699     info("updating instance configuration")
4700     for dev in instance.disks:
4701       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4702       cfg.SetDiskID(dev, pri_node)
4703     cfg.Update(instance)
4704
4705     # and now perform the drbd attach
4706     info("attaching primary drbds to new secondary (standalone => connected)")
4707     failures = []
4708     for dev in instance.disks:
4709       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4710       # since the attach is smart, it's enough to 'find' the device,
4711       # it will automatically activate the network, if the physical_id
4712       # is correct
4713       cfg.SetDiskID(dev, pri_node)
4714       if not rpc.call_blockdev_find(pri_node, dev):
4715         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4716                 "please do a gnt-instance info to see the status of disks")
4717
4718     # this can fail as the old devices are degraded and _WaitForSync
4719     # does a combined result over all disks, so we don't check its
4720     # return value
4721     self.proc.LogStep(5, steps_total, "sync devices")
4722     _WaitForSync(cfg, instance, self.proc, unlock=True)
4723
4724     # so check manually all the devices
4725     for name, (dev, old_lvs) in iv_names.iteritems():
4726       cfg.SetDiskID(dev, pri_node)
4727       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4728       if is_degr:
4729         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4730
4731     self.proc.LogStep(6, steps_total, "removing old storage")
4732     for name, (dev, old_lvs) in iv_names.iteritems():
4733       info("remove logical volumes for %s" % name)
4734       for lv in old_lvs:
4735         cfg.SetDiskID(lv, old_node)
4736         if not rpc.call_blockdev_remove(old_node, lv):
4737           warning("Can't remove LV on old secondary",
4738                   hint="Cleanup stale volumes by hand")
4739
4740   def Exec(self, feedback_fn):
4741     """Execute disk replacement.
4742
4743     This dispatches the disk replacement to the appropriate handler.
4744
4745     """
4746     instance = self.instance
4747
4748     # Activate the instance disks if we're replacing them on a down instance
4749     if instance.status == "down":
4750       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4751       self.proc.ChainOpCode(op)
4752
4753     if instance.disk_template == constants.DT_REMOTE_RAID1:
4754       fn = self._ExecRR1
4755     elif instance.disk_template == constants.DT_DRBD8:
4756       if self.op.remote_node is None:
4757         fn = self._ExecD8DiskOnly
4758       else:
4759         fn = self._ExecD8Secondary
4760     else:
4761       raise errors.ProgrammerError("Unhandled disk replacement case")
4762
4763     ret = fn(feedback_fn)
4764
4765     # Deactivate the instance disks if we're replacing them on a down instance
4766     if instance.status == "down":
4767       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4768       self.proc.ChainOpCode(op)
4769
4770     return ret
4771
4772
4773 class LUGrowDisk(LogicalUnit):
4774   """Grow a disk of an instance.
4775
4776   """
4777   HPATH = "disk-grow"
4778   HTYPE = constants.HTYPE_INSTANCE
4779   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4780
4781   def BuildHooksEnv(self):
4782     """Build hooks env.
4783
4784     This runs on the master, the primary and all the secondaries.
4785
4786     """
4787     env = {
4788       "DISK": self.op.disk,
4789       "AMOUNT": self.op.amount,
4790       }
4791     env.update(_BuildInstanceHookEnvByObject(self.instance))
4792     nl = [
4793       self.sstore.GetMasterNode(),
4794       self.instance.primary_node,
4795       ]
4796     return env, nl, nl
4797
4798   def CheckPrereq(self):
4799     """Check prerequisites.
4800
4801     This checks that the instance is in the cluster.
4802
4803     """
4804     instance = self.cfg.GetInstanceInfo(
4805       self.cfg.ExpandInstanceName(self.op.instance_name))
4806     if instance is None:
4807       raise errors.OpPrereqError("Instance '%s' not known" %
4808                                  self.op.instance_name)
4809
4810     if self.op.amount <= 0:
4811       raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4812
4813     self.instance = instance
4814     self.op.instance_name = instance.name
4815
4816     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4817       raise errors.OpPrereqError("Instance's disk layout does not support"
4818                                  " growing.")
4819
4820     self.disk = instance.FindDisk(self.op.disk)
4821     if self.disk is None:
4822       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4823                                  (self.op.disk, instance.name))
4824
4825     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4826     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4827     for node in nodenames:
4828       info = nodeinfo.get(node, None)
4829       if not info:
4830         raise errors.OpPrereqError("Cannot get current information"
4831                                    " from node '%s'" % node)
4832       vg_free = info.get('vg_free', None)
4833       if not isinstance(vg_free, int):
4834         raise errors.OpPrereqError("Can't compute free disk space on"
4835                                    " node %s" % node)
4836       if self.op.amount > info['vg_free']:
4837         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4838                                    " %d MiB available, %d MiB required" %
4839                                    (node, info['vg_free'], self.op.amount))
4840       is_primary = (node == instance.primary_node)
4841       if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4842         raise errors.OpPrereqError("Disk %s is degraded or not fully"
4843                                  " synchronized on node %s,"
4844                                  " aborting grow." % (self.op.disk, node))
4845
4846   def Exec(self, feedback_fn):
4847     """Execute disk grow.
4848
4849     """
4850     instance = self.instance
4851     disk = self.disk
4852     for node in (instance.secondary_nodes + (instance.primary_node,)):
4853       self.cfg.SetDiskID(disk, node)
4854       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4855       if not result or not isinstance(result, tuple) or len(result) != 2:
4856         raise errors.OpExecError("grow request failed to node %s" % node)
4857       elif not result[0]:
4858         raise errors.OpExecError("grow request failed to node %s: %s" %
4859                                  (node, result[1]))
4860     disk.RecordGrow(self.op.amount)
4861     self.cfg.Update(instance)
4862     if self.op.wait_for_sync:
4863       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4864       if disk_abort:
4865         logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4866                      " Please check the instance.")
4867
4868
4869 class LUQueryInstanceData(NoHooksLU):
4870   """Query runtime instance data.
4871
4872   """
4873   _OP_REQP = ["instances", "static"]
4874
4875   def CheckPrereq(self):
4876     """Check prerequisites.
4877
4878     This only checks the optional instance list against the existing names.
4879
4880     """
4881     if not isinstance(self.op.instances, list):
4882       raise errors.OpPrereqError("Invalid argument type 'instances'")
4883     if self.op.instances:
4884       self.wanted_instances = []
4885       names = self.op.instances
4886       for name in names:
4887         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4888         if instance is None:
4889           raise errors.OpPrereqError("No such instance name '%s'" % name)
4890         self.wanted_instances.append(instance)
4891     else:
4892       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4893                                in self.cfg.GetInstanceList()]
4894     return
4895
4896
4897   def _ComputeDiskStatus(self, instance, snode, dev):
4898     """Compute block device status.
4899
4900     """
4901     static = self.op.static
4902     if not static:
4903       self.cfg.SetDiskID(dev, instance.primary_node)
4904       dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4905     else:
4906       dev_pstatus = None
4907
4908     if dev.dev_type in constants.LDS_DRBD:
4909       # we change the snode then (otherwise we use the one passed in)
4910       if dev.logical_id[0] == instance.primary_node:
4911         snode = dev.logical_id[1]
4912       else:
4913         snode = dev.logical_id[0]
4914
4915     if snode and not static:
4916       self.cfg.SetDiskID(dev, snode)
4917       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4918     else:
4919       dev_sstatus = None
4920
4921     if dev.children:
4922       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4923                       for child in dev.children]
4924     else:
4925       dev_children = []
4926
4927     data = {
4928       "iv_name": dev.iv_name,
4929       "dev_type": dev.dev_type,
4930       "logical_id": dev.logical_id,
4931       "physical_id": dev.physical_id,
4932       "pstatus": dev_pstatus,
4933       "sstatus": dev_sstatus,
4934       "children": dev_children,
4935       }
4936
4937     return data
4938
4939   def Exec(self, feedback_fn):
4940     """Gather and return data"""
4941     result = {}
4942     for instance in self.wanted_instances:
4943       if not self.op.static:
4944         remote_info = rpc.call_instance_info(instance.primary_node,
4945                                                   instance.name)
4946         if remote_info and "state" in remote_info:
4947           remote_state = "up"
4948         else:
4949           remote_state = "down"
4950       else:
4951         remote_state = None
4952       if instance.status == "down":
4953         config_state = "down"
4954       else:
4955         config_state = "up"
4956
4957       disks = [self._ComputeDiskStatus(instance, None, device)
4958                for device in instance.disks]
4959
4960       idict = {
4961         "name": instance.name,
4962         "config_state": config_state,
4963         "run_state": remote_state,
4964         "pnode": instance.primary_node,
4965         "snodes": instance.secondary_nodes,
4966         "os": instance.os,
4967         "memory": instance.memory,
4968         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4969         "disks": disks,
4970         "vcpus": instance.vcpus,
4971         "auto_balance": instance.auto_balance,
4972         }
4973
4974       htkind = self.sstore.GetHypervisorType()
4975       if htkind == constants.HT_XEN_PVM30:
4976         idict["kernel_path"] = instance.kernel_path
4977         idict["initrd_path"] = instance.initrd_path
4978
4979       if htkind == constants.HT_XEN_HVM31:
4980         idict["hvm_boot_order"] = instance.hvm_boot_order
4981         idict["hvm_acpi"] = instance.hvm_acpi
4982         idict["hvm_pae"] = instance.hvm_pae
4983         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4984         idict["hvm_nic_type"] = instance.hvm_nic_type
4985         idict["hvm_disk_type"] = instance.hvm_disk_type
4986
4987       if htkind in constants.HTS_REQ_PORT:
4988         if instance.vnc_bind_address is None:
4989           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4990         else:
4991           vnc_bind_address = instance.vnc_bind_address
4992         if instance.network_port is None:
4993           vnc_console_port = None
4994         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4995           vnc_console_port = "%s:%s" % (instance.primary_node,
4996                                        instance.network_port)
4997         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4998           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4999                                                    instance.network_port,
5000                                                    instance.primary_node)
5001         else:
5002           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
5003                                         instance.network_port)
5004         idict["vnc_console_port"] = vnc_console_port
5005         idict["vnc_bind_address"] = vnc_bind_address
5006         idict["network_port"] = instance.network_port
5007
5008       result[instance.name] = idict
5009
5010     return result
5011
5012
5013 class LUSetInstanceParms(LogicalUnit):
5014   """Modifies an instances's parameters.
5015
5016   """
5017   HPATH = "instance-modify"
5018   HTYPE = constants.HTYPE_INSTANCE
5019   _OP_REQP = ["instance_name"]
5020
5021   def BuildHooksEnv(self):
5022     """Build hooks env.
5023
5024     This runs on the master, primary and secondaries.
5025
5026     """
5027     args = dict()
5028     if self.mem:
5029       args['memory'] = self.mem
5030     if self.vcpus:
5031       args['vcpus'] = self.vcpus
5032     if self.do_ip or self.do_bridge or self.mac:
5033       if self.do_ip:
5034         ip = self.ip
5035       else:
5036         ip = self.instance.nics[0].ip
5037       if self.bridge:
5038         bridge = self.bridge
5039       else:
5040         bridge = self.instance.nics[0].bridge
5041       if self.mac:
5042         mac = self.mac
5043       else:
5044         mac = self.instance.nics[0].mac
5045       args['nics'] = [(ip, bridge, mac)]
5046     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
5047     nl = [self.sstore.GetMasterNode(),
5048           self.instance.primary_node] + list(self.instance.secondary_nodes)
5049     return env, nl, nl
5050
5051   def CheckPrereq(self):
5052     """Check prerequisites.
5053
5054     This only checks the instance list against the existing names.
5055
5056     """
5057     self.mem = getattr(self.op, "mem", None)
5058     self.vcpus = getattr(self.op, "vcpus", None)
5059     self.ip = getattr(self.op, "ip", None)
5060     self.mac = getattr(self.op, "mac", None)
5061     self.bridge = getattr(self.op, "bridge", None)
5062     self.kernel_path = getattr(self.op, "kernel_path", None)
5063     self.initrd_path = getattr(self.op, "initrd_path", None)
5064     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
5065     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
5066     self.hvm_pae = getattr(self.op, "hvm_pae", None)
5067     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
5068     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
5069     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
5070     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
5071     self.force = getattr(self.op, "force", None)
5072     self.auto_balance = getattr(self.op, "auto_balance", None)
5073     all_parms = [
5074       self.mem, self.vcpus, self.ip, self.bridge, self.mac,
5075       self.kernel_path, self.initrd_path, self.hvm_boot_order,
5076       self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
5077       self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type,
5078       self.auto_balance,
5079       ]
5080     if all_parms.count(None) == len(all_parms):
5081       raise errors.OpPrereqError("No changes submitted")
5082     if self.mem is not None:
5083       try:
5084         self.mem = int(self.mem)
5085       except ValueError, err:
5086         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
5087     if self.vcpus is not None:
5088       try:
5089         self.vcpus = int(self.vcpus)
5090       except ValueError, err:
5091         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
5092     if self.ip is not None:
5093       self.do_ip = True
5094       if self.ip.lower() == "none":
5095         self.ip = None
5096       else:
5097         if not utils.IsValidIP(self.ip):
5098           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
5099     else:
5100       self.do_ip = False
5101     self.do_bridge = (self.bridge is not None)
5102     if self.mac is not None:
5103       if self.cfg.IsMacInUse(self.mac):
5104         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
5105                                    self.mac)
5106       if not utils.IsValidMac(self.mac):
5107         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
5108
5109     if self.kernel_path is not None:
5110       self.do_kernel_path = True
5111       if self.kernel_path == constants.VALUE_NONE:
5112         raise errors.OpPrereqError("Can't set instance to no kernel")
5113
5114       if self.kernel_path != constants.VALUE_DEFAULT:
5115         if not os.path.isabs(self.kernel_path):
5116           raise errors.OpPrereqError("The kernel path must be an absolute"
5117                                     " filename")
5118     else:
5119       self.do_kernel_path = False
5120
5121     if self.initrd_path is not None:
5122       self.do_initrd_path = True
5123       if self.initrd_path not in (constants.VALUE_NONE,
5124                                   constants.VALUE_DEFAULT):
5125         if not os.path.isabs(self.initrd_path):
5126           raise errors.OpPrereqError("The initrd path must be an absolute"
5127                                     " filename")
5128     else:
5129       self.do_initrd_path = False
5130
5131     # boot order verification
5132     if self.hvm_boot_order is not None:
5133       if self.hvm_boot_order != constants.VALUE_DEFAULT:
5134         if len(self.hvm_boot_order.strip("acdn")) != 0:
5135           raise errors.OpPrereqError("invalid boot order specified,"
5136                                      " must be one or more of [acdn]"
5137                                      " or 'default'")
5138
5139     # hvm_cdrom_image_path verification
5140     if self.op.hvm_cdrom_image_path is not None:
5141       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
5142               self.op.hvm_cdrom_image_path.lower() == "none"):
5143         raise errors.OpPrereqError("The path to the HVM CDROM image must"
5144                                    " be an absolute path or None, not %s" %
5145                                    self.op.hvm_cdrom_image_path)
5146       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
5147               self.op.hvm_cdrom_image_path.lower() == "none"):
5148         raise errors.OpPrereqError("The HVM CDROM image must either be a"
5149                                    " regular file or a symlink pointing to"
5150                                    " an existing regular file, not %s" %
5151                                    self.op.hvm_cdrom_image_path)
5152
5153     # vnc_bind_address verification
5154     if self.op.vnc_bind_address is not None:
5155       if not utils.IsValidIP(self.op.vnc_bind_address):
5156         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
5157                                    " like a valid IP address" %
5158                                    self.op.vnc_bind_address)
5159
5160     # Xen HVM device type checks
5161     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
5162       if self.op.hvm_nic_type is not None:
5163         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
5164           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
5165                                      " HVM  hypervisor" % self.op.hvm_nic_type)
5166       if self.op.hvm_disk_type is not None:
5167         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
5168           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
5169                                      " HVM hypervisor" % self.op.hvm_disk_type)
5170
5171     # auto balance setting
5172     if self.auto_balance is not None:
5173       # convert the value to a proper bool value, if it's not
5174       self.auto_balance = bool(self.auto_balance)
5175
5176     instance = self.cfg.GetInstanceInfo(
5177       self.cfg.ExpandInstanceName(self.op.instance_name))
5178     if instance is None:
5179       raise errors.OpPrereqError("No such instance name '%s'" %
5180                                  self.op.instance_name)
5181     self.op.instance_name = instance.name
5182     self.instance = instance
5183     self.warn = []
5184     if self.mem is not None and not self.force:
5185       pnode = self.instance.primary_node
5186       nodelist = [pnode]
5187       if instance.auto_balance:
5188         nodelist.extend(instance.secondary_nodes)
5189       instance_info = rpc.call_instance_info(pnode, instance.name)
5190       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
5191
5192       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
5193         # Assume the primary node is unreachable and go ahead
5194         self.warn.append("Can't get info from primary node %s" % pnode)
5195       else:
5196         if instance_info:
5197           current_mem = instance_info['memory']
5198         else:
5199           # Assume instance not running
5200           # (there is a slight race condition here, but it's not very probable,
5201           # and we have no other way to check)
5202           current_mem = 0
5203         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
5204         if miss_mem > 0:
5205           raise errors.OpPrereqError("This change will prevent the instance"
5206                                      " from starting, due to %d MB of memory"
5207                                      " missing on its primary node" % miss_mem)
5208
5209       if instance.auto_balance:
5210         for node in instance.secondary_nodes:
5211           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
5212             self.warn.append("Can't get info from secondary node %s" % node)
5213           elif self.mem > nodeinfo[node]['memory_free']:
5214             self.warn.append("Not enough memory to failover instance to"
5215                              " secondary node %s" % node)
5216     return
5217
5218   def Exec(self, feedback_fn):
5219     """Modifies an instance.
5220
5221     All parameters take effect only at the next restart of the instance.
5222     """
5223     # Process here the warnings from CheckPrereq, as we don't have a
5224     # feedback_fn there.
5225     for warn in self.warn:
5226       feedback_fn("WARNING: %s" % warn)
5227
5228     result = []
5229     instance = self.instance
5230     if self.mem:
5231       instance.memory = self.mem
5232       result.append(("mem", self.mem))
5233     if self.vcpus:
5234       instance.vcpus = self.vcpus
5235       result.append(("vcpus",  self.vcpus))
5236     if self.do_ip:
5237       instance.nics[0].ip = self.ip
5238       result.append(("ip", self.ip))
5239     if self.bridge:
5240       instance.nics[0].bridge = self.bridge
5241       result.append(("bridge", self.bridge))
5242     if self.mac:
5243       instance.nics[0].mac = self.mac
5244       result.append(("mac", self.mac))
5245     if self.do_kernel_path:
5246       instance.kernel_path = self.kernel_path
5247       result.append(("kernel_path", self.kernel_path))
5248     if self.do_initrd_path:
5249       instance.initrd_path = self.initrd_path
5250       result.append(("initrd_path", self.initrd_path))
5251     if self.hvm_boot_order:
5252       if self.hvm_boot_order == constants.VALUE_DEFAULT:
5253         instance.hvm_boot_order = None
5254       else:
5255         instance.hvm_boot_order = self.hvm_boot_order
5256       result.append(("hvm_boot_order", self.hvm_boot_order))
5257     if self.hvm_acpi is not None:
5258       instance.hvm_acpi = self.hvm_acpi
5259       result.append(("hvm_acpi", self.hvm_acpi))
5260     if self.hvm_pae is not None:
5261       instance.hvm_pae = self.hvm_pae
5262       result.append(("hvm_pae", self.hvm_pae))
5263     if self.hvm_nic_type is not None:
5264       instance.hvm_nic_type = self.hvm_nic_type
5265       result.append(("hvm_nic_type", self.hvm_nic_type))
5266     if self.hvm_disk_type is not None:
5267       instance.hvm_disk_type = self.hvm_disk_type
5268       result.append(("hvm_disk_type", self.hvm_disk_type))
5269     if self.hvm_cdrom_image_path:
5270       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
5271         instance.hvm_cdrom_image_path = None
5272       else:
5273         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
5274       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
5275     if self.vnc_bind_address:
5276       instance.vnc_bind_address = self.vnc_bind_address
5277       result.append(("vnc_bind_address", self.vnc_bind_address))
5278     if self.auto_balance is not None:
5279       instance.auto_balance = self.auto_balance
5280       result.append(("auto_balance", self.auto_balance))
5281
5282     self.cfg.AddInstance(instance)
5283
5284     return result
5285
5286
5287 class LUQueryExports(NoHooksLU):
5288   """Query the exports list
5289
5290   """
5291   _OP_REQP = []
5292
5293   def CheckPrereq(self):
5294     """Check that the nodelist contains only existing nodes.
5295
5296     """
5297     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5298
5299   def Exec(self, feedback_fn):
5300     """Compute the list of all the exported system images.
5301
5302     Returns:
5303       a dictionary with the structure node->(export-list)
5304       where export-list is a list of the instances exported on
5305       that node.
5306
5307     """
5308     return rpc.call_export_list(self.nodes)
5309
5310
5311 class LUExportInstance(LogicalUnit):
5312   """Export an instance to an image in the cluster.
5313
5314   """
5315   HPATH = "instance-export"
5316   HTYPE = constants.HTYPE_INSTANCE
5317   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5318
5319   def BuildHooksEnv(self):
5320     """Build hooks env.
5321
5322     This will run on the master, primary node and target node.
5323
5324     """
5325     env = {
5326       "EXPORT_NODE": self.op.target_node,
5327       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5328       }
5329     env.update(_BuildInstanceHookEnvByObject(self.instance))
5330     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5331           self.op.target_node]
5332     return env, nl, nl
5333
5334   def CheckPrereq(self):
5335     """Check prerequisites.
5336
5337     This checks that the instance and node names are valid.
5338
5339     """
5340     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5341     self.instance = self.cfg.GetInstanceInfo(instance_name)
5342     if self.instance is None:
5343       raise errors.OpPrereqError("Instance '%s' not found" %
5344                                  self.op.instance_name)
5345
5346     # node verification
5347     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5348     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5349
5350     if self.dst_node is None:
5351       raise errors.OpPrereqError("Destination node '%s' is unknown." %
5352                                  self.op.target_node)
5353     self.op.target_node = self.dst_node.name
5354
5355   def Exec(self, feedback_fn):
5356     """Export an instance to an image in the cluster.
5357
5358     """
5359     instance = self.instance
5360     dst_node = self.dst_node
5361     src_node = instance.primary_node
5362     if self.op.shutdown:
5363       # shutdown the instance, but not the disks
5364       if not rpc.call_instance_shutdown(src_node, instance):
5365         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5366                                  (instance.name, src_node))
5367
5368     vgname = self.cfg.GetVGName()
5369
5370     snap_disks = []
5371
5372     try:
5373       for disk in instance.disks:
5374         if disk.iv_name == "sda":
5375           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5376           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5377
5378           if not new_dev_name:
5379             logger.Error("could not snapshot block device %s on node %s" %
5380                          (disk.logical_id[1], src_node))
5381           else:
5382             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5383                                       logical_id=(vgname, new_dev_name),
5384                                       physical_id=(vgname, new_dev_name),
5385                                       iv_name=disk.iv_name)
5386             snap_disks.append(new_dev)
5387
5388     finally:
5389       if self.op.shutdown and instance.status == "up":
5390         if not rpc.call_instance_start(src_node, instance, None):
5391           _ShutdownInstanceDisks(instance, self.cfg)
5392           raise errors.OpExecError("Could not start instance")
5393
5394     # TODO: check for size
5395
5396     for dev in snap_disks:
5397       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5398                                            instance):
5399         logger.Error("could not export block device %s from node"
5400                      " %s to node %s" %
5401                      (dev.logical_id[1], src_node, dst_node.name))
5402       if not rpc.call_blockdev_remove(src_node, dev):
5403         logger.Error("could not remove snapshot block device %s from"
5404                      " node %s" % (dev.logical_id[1], src_node))
5405
5406     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5407       logger.Error("could not finalize export for instance %s on node %s" %
5408                    (instance.name, dst_node.name))
5409
5410     nodelist = self.cfg.GetNodeList()
5411     nodelist.remove(dst_node.name)
5412
5413     # on one-node clusters nodelist will be empty after the removal
5414     # if we proceed the backup would be removed because OpQueryExports
5415     # substitutes an empty list with the full cluster node list.
5416     if nodelist:
5417       op = opcodes.OpQueryExports(nodes=nodelist)
5418       exportlist = self.proc.ChainOpCode(op)
5419       for node in exportlist:
5420         if instance.name in exportlist[node]:
5421           if not rpc.call_export_remove(node, instance.name):
5422             logger.Error("could not remove older export for instance %s"
5423                          " on node %s" % (instance.name, node))
5424
5425
5426 class LURemoveExport(NoHooksLU):
5427   """Remove exports related to the named instance.
5428
5429   """
5430   _OP_REQP = ["instance_name"]
5431
5432   def CheckPrereq(self):
5433     """Check prerequisites.
5434     """
5435     pass
5436
5437   def Exec(self, feedback_fn):
5438     """Remove any export.
5439
5440     """
5441     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5442     # If the instance was not found we'll try with the name that was passed in.
5443     # This will only work if it was an FQDN, though.
5444     fqdn_warn = False
5445     if not instance_name:
5446       fqdn_warn = True
5447       instance_name = self.op.instance_name
5448
5449     op = opcodes.OpQueryExports(nodes=[])
5450     exportlist = self.proc.ChainOpCode(op)
5451     found = False
5452     for node in exportlist:
5453       if instance_name in exportlist[node]:
5454         found = True
5455         if not rpc.call_export_remove(node, instance_name):
5456           logger.Error("could not remove export for instance %s"
5457                        " on node %s" % (instance_name, node))
5458
5459     if fqdn_warn and not found:
5460       feedback_fn("Export not found. If trying to remove an export belonging"
5461                   " to a deleted instance please use its Fully Qualified"
5462                   " Domain Name.")
5463
5464
5465 class TagsLU(NoHooksLU):
5466   """Generic tags LU.
5467
5468   This is an abstract class which is the parent of all the other tags LUs.
5469
5470   """
5471   def CheckPrereq(self):
5472     """Check prerequisites.
5473
5474     """
5475     if self.op.kind == constants.TAG_CLUSTER:
5476       self.target = self.cfg.GetClusterInfo()
5477     elif self.op.kind == constants.TAG_NODE:
5478       name = self.cfg.ExpandNodeName(self.op.name)
5479       if name is None:
5480         raise errors.OpPrereqError("Invalid node name (%s)" %
5481                                    (self.op.name,))
5482       self.op.name = name
5483       self.target = self.cfg.GetNodeInfo(name)
5484     elif self.op.kind == constants.TAG_INSTANCE:
5485       name = self.cfg.ExpandInstanceName(self.op.name)
5486       if name is None:
5487         raise errors.OpPrereqError("Invalid instance name (%s)" %
5488                                    (self.op.name,))
5489       self.op.name = name
5490       self.target = self.cfg.GetInstanceInfo(name)
5491     else:
5492       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5493                                  str(self.op.kind))
5494
5495
5496 class LUGetTags(TagsLU):
5497   """Returns the tags of a given object.
5498
5499   """
5500   _OP_REQP = ["kind", "name"]
5501
5502   def Exec(self, feedback_fn):
5503     """Returns the tag list.
5504
5505     """
5506     return self.target.GetTags()
5507
5508
5509 class LUSearchTags(NoHooksLU):
5510   """Searches the tags for a given pattern.
5511
5512   """
5513   _OP_REQP = ["pattern"]
5514
5515   def CheckPrereq(self):
5516     """Check prerequisites.
5517
5518     This checks the pattern passed for validity by compiling it.
5519
5520     """
5521     try:
5522       self.re = re.compile(self.op.pattern)
5523     except re.error, err:
5524       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5525                                  (self.op.pattern, err))
5526
5527   def Exec(self, feedback_fn):
5528     """Returns the tag list.
5529
5530     """
5531     cfg = self.cfg
5532     tgts = [("/cluster", cfg.GetClusterInfo())]
5533     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5534     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5535     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5536     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5537     results = []
5538     for path, target in tgts:
5539       for tag in target.GetTags():
5540         if self.re.search(tag):
5541           results.append((path, tag))
5542     return results
5543
5544
5545 class LUAddTags(TagsLU):
5546   """Sets a tag on a given object.
5547
5548   """
5549   _OP_REQP = ["kind", "name", "tags"]
5550
5551   def CheckPrereq(self):
5552     """Check prerequisites.
5553
5554     This checks the type and length of the tag name and value.
5555
5556     """
5557     TagsLU.CheckPrereq(self)
5558     for tag in self.op.tags:
5559       objects.TaggableObject.ValidateTag(tag)
5560
5561   def Exec(self, feedback_fn):
5562     """Sets the tag.
5563
5564     """
5565     try:
5566       for tag in self.op.tags:
5567         self.target.AddTag(tag)
5568     except errors.TagError, err:
5569       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5570     try:
5571       self.cfg.Update(self.target)
5572     except errors.ConfigurationError:
5573       raise errors.OpRetryError("There has been a modification to the"
5574                                 " config file and the operation has been"
5575                                 " aborted. Please retry.")
5576
5577
5578 class LUDelTags(TagsLU):
5579   """Delete a list of tags from a given object.
5580
5581   """
5582   _OP_REQP = ["kind", "name", "tags"]
5583
5584   def CheckPrereq(self):
5585     """Check prerequisites.
5586
5587     This checks that we have the given tag.
5588
5589     """
5590     TagsLU.CheckPrereq(self)
5591     for tag in self.op.tags:
5592       objects.TaggableObject.ValidateTag(tag, removal=True)
5593     del_tags = frozenset(self.op.tags)
5594     cur_tags = self.target.GetTags()
5595     if not del_tags <= cur_tags:
5596       diff_tags = del_tags - cur_tags
5597       diff_names = ["'%s'" % tag for tag in diff_tags]
5598       diff_names.sort()
5599       raise errors.OpPrereqError("Tag(s) %s not found" %
5600                                  (",".join(diff_names)))
5601
5602   def Exec(self, feedback_fn):
5603     """Remove the tag from the object.
5604
5605     """
5606     for tag in self.op.tags:
5607       self.target.RemoveTag(tag)
5608     try:
5609       self.cfg.Update(self.target)
5610     except errors.ConfigurationError:
5611       raise errors.OpRetryError("There has been a modification to the"
5612                                 " config file and the operation has been"
5613                                 " aborted. Please retry.")
5614
5615 class LUTestDelay(NoHooksLU):
5616   """Sleep for a specified amount of time.
5617
5618   This LU sleeps on the master and/or nodes for a specified amoutn of
5619   time.
5620
5621   """
5622   _OP_REQP = ["duration", "on_master", "on_nodes"]
5623
5624   def CheckPrereq(self):
5625     """Check prerequisites.
5626
5627     This checks that we have a good list of nodes and/or the duration
5628     is valid.
5629
5630     """
5631
5632     if self.op.on_nodes:
5633       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5634
5635   def Exec(self, feedback_fn):
5636     """Do the actual sleep.
5637
5638     """
5639     if self.op.on_master:
5640       if not utils.TestDelay(self.op.duration):
5641         raise errors.OpExecError("Error during master delay test")
5642     if self.op.on_nodes:
5643       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5644       if not result:
5645         raise errors.OpExecError("Complete failure from rpc call")
5646       for node, node_result in result.items():
5647         if not node_result:
5648           raise errors.OpExecError("Failure during rpc call to node %s,"
5649                                    " result: %s" % (node, node_result))
5650
5651
5652 class IAllocator(object):
5653   """IAllocator framework.
5654
5655   An IAllocator instance has three sets of attributes:
5656     - cfg/sstore that are needed to query the cluster
5657     - input data (all members of the _KEYS class attribute are required)
5658     - four buffer attributes (in|out_data|text), that represent the
5659       input (to the external script) in text and data structure format,
5660       and the output from it, again in two formats
5661     - the result variables from the script (success, info, nodes) for
5662       easy usage
5663
5664   """
5665   _ALLO_KEYS = [
5666     "mem_size", "disks", "disk_template",
5667     "os", "tags", "nics", "vcpus",
5668     ]
5669   _RELO_KEYS = [
5670     "relocate_from",
5671     ]
5672
5673   def __init__(self, cfg, sstore, mode, name, **kwargs):
5674     self.cfg = cfg
5675     self.sstore = sstore
5676     # init buffer variables
5677     self.in_text = self.out_text = self.in_data = self.out_data = None
5678     # init all input fields so that pylint is happy
5679     self.mode = mode
5680     self.name = name
5681     self.mem_size = self.disks = self.disk_template = None
5682     self.os = self.tags = self.nics = self.vcpus = None
5683     self.relocate_from = None
5684     # computed fields
5685     self.required_nodes = None
5686     # init result fields
5687     self.success = self.info = self.nodes = None
5688     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5689       keyset = self._ALLO_KEYS
5690     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5691       keyset = self._RELO_KEYS
5692     else:
5693       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5694                                    " IAllocator" % self.mode)
5695     for key in kwargs:
5696       if key not in keyset:
5697         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5698                                      " IAllocator" % key)
5699       setattr(self, key, kwargs[key])
5700     for key in keyset:
5701       if key not in kwargs:
5702         raise errors.ProgrammerError("Missing input parameter '%s' to"
5703                                      " IAllocator" % key)
5704     self._BuildInputData()
5705
5706   def _ComputeClusterData(self):
5707     """Compute the generic allocator input data.
5708
5709     This is the data that is independent of the actual operation.
5710
5711     """
5712     cfg = self.cfg
5713     # cluster data
5714     data = {
5715       "version": 1,
5716       "cluster_name": self.sstore.GetClusterName(),
5717       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5718       "hypervisor_type": self.sstore.GetHypervisorType(),
5719       # we don't have job IDs
5720       }
5721
5722     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5723
5724     # node data
5725     node_results = {}
5726     node_list = cfg.GetNodeList()
5727     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5728     for nname in node_list:
5729       ninfo = cfg.GetNodeInfo(nname)
5730       if nname not in node_data or not isinstance(node_data[nname], dict):
5731         raise errors.OpExecError("Can't get data for node %s" % nname)
5732       remote_info = node_data[nname]
5733       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5734                    'vg_size', 'vg_free', 'cpu_total']:
5735         if attr not in remote_info:
5736           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5737                                    (nname, attr))
5738         try:
5739           remote_info[attr] = int(remote_info[attr])
5740         except ValueError, err:
5741           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5742                                    " %s" % (nname, attr, str(err)))
5743       # compute memory used by primary instances
5744       i_p_mem = i_p_up_mem = 0
5745       for iinfo in i_list:
5746         if iinfo.primary_node == nname:
5747           i_p_mem += iinfo.memory
5748           if iinfo.status == "up":
5749             i_p_up_mem += iinfo.memory
5750
5751       # compute memory used by instances
5752       pnr = {
5753         "tags": list(ninfo.GetTags()),
5754         "total_memory": remote_info['memory_total'],
5755         "reserved_memory": remote_info['memory_dom0'],
5756         "free_memory": remote_info['memory_free'],
5757         "i_pri_memory": i_p_mem,
5758         "i_pri_up_memory": i_p_up_mem,
5759         "total_disk": remote_info['vg_size'],
5760         "free_disk": remote_info['vg_free'],
5761         "primary_ip": ninfo.primary_ip,
5762         "secondary_ip": ninfo.secondary_ip,
5763         "total_cpus": remote_info['cpu_total'],
5764         }
5765       node_results[nname] = pnr
5766     data["nodes"] = node_results
5767
5768     # instance data
5769     instance_data = {}
5770     for iinfo in i_list:
5771       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5772                   for n in iinfo.nics]
5773       pir = {
5774         "tags": list(iinfo.GetTags()),
5775         "should_run": iinfo.status == "up",
5776         "vcpus": iinfo.vcpus,
5777         "memory": iinfo.memory,
5778         "os": iinfo.os,
5779         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5780         "nics": nic_data,
5781         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5782         "disk_template": iinfo.disk_template,
5783         }
5784       instance_data[iinfo.name] = pir
5785
5786     data["instances"] = instance_data
5787
5788     self.in_data = data
5789
5790   def _AddNewInstance(self):
5791     """Add new instance data to allocator structure.
5792
5793     This in combination with _AllocatorGetClusterData will create the
5794     correct structure needed as input for the allocator.
5795
5796     The checks for the completeness of the opcode must have already been
5797     done.
5798
5799     """
5800     data = self.in_data
5801     if len(self.disks) != 2:
5802       raise errors.OpExecError("Only two-disk configurations supported")
5803
5804     disk_space = _ComputeDiskSize(self.disk_template,
5805                                   self.disks[0]["size"], self.disks[1]["size"])
5806
5807     if self.disk_template in constants.DTS_NET_MIRROR:
5808       self.required_nodes = 2
5809     else:
5810       self.required_nodes = 1
5811     request = {
5812       "type": "allocate",
5813       "name": self.name,
5814       "disk_template": self.disk_template,
5815       "tags": self.tags,
5816       "os": self.os,
5817       "vcpus": self.vcpus,
5818       "memory": self.mem_size,
5819       "disks": self.disks,
5820       "disk_space_total": disk_space,
5821       "nics": self.nics,
5822       "required_nodes": self.required_nodes,
5823       }
5824     data["request"] = request
5825
5826   def _AddRelocateInstance(self):
5827     """Add relocate instance data to allocator structure.
5828
5829     This in combination with _IAllocatorGetClusterData will create the
5830     correct structure needed as input for the allocator.
5831
5832     The checks for the completeness of the opcode must have already been
5833     done.
5834
5835     """
5836     instance = self.cfg.GetInstanceInfo(self.name)
5837     if instance is None:
5838       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5839                                    " IAllocator" % self.name)
5840
5841     if instance.disk_template not in constants.DTS_NET_MIRROR:
5842       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5843
5844     if len(instance.secondary_nodes) != 1:
5845       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5846
5847     self.required_nodes = 1
5848
5849     disk_space = _ComputeDiskSize(instance.disk_template,
5850                                   instance.disks[0].size,
5851                                   instance.disks[1].size)
5852
5853     request = {
5854       "type": "relocate",
5855       "name": self.name,
5856       "disk_space_total": disk_space,
5857       "required_nodes": self.required_nodes,
5858       "relocate_from": self.relocate_from,
5859       }
5860     self.in_data["request"] = request
5861
5862   def _BuildInputData(self):
5863     """Build input data structures.
5864
5865     """
5866     self._ComputeClusterData()
5867
5868     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5869       self._AddNewInstance()
5870     else:
5871       self._AddRelocateInstance()
5872
5873     self.in_text = serializer.Dump(self.in_data)
5874
5875   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5876     """Run an instance allocator and return the results.
5877
5878     """
5879     data = self.in_text
5880
5881     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5882
5883     if not isinstance(result, tuple) or len(result) != 4:
5884       raise errors.OpExecError("Invalid result from master iallocator runner")
5885
5886     rcode, stdout, stderr, fail = result
5887
5888     if rcode == constants.IARUN_NOTFOUND:
5889       raise errors.OpExecError("Can't find allocator '%s'" % name)
5890     elif rcode == constants.IARUN_FAILURE:
5891         raise errors.OpExecError("Instance allocator call failed: %s,"
5892                                  " output: %s" %
5893                                  (fail, stdout+stderr))
5894     self.out_text = stdout
5895     if validate:
5896       self._ValidateResult()
5897
5898   def _ValidateResult(self):
5899     """Process the allocator results.
5900
5901     This will process and if successful save the result in
5902     self.out_data and the other parameters.
5903
5904     """
5905     try:
5906       rdict = serializer.Load(self.out_text)
5907     except Exception, err:
5908       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5909
5910     if not isinstance(rdict, dict):
5911       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5912
5913     for key in "success", "info", "nodes":
5914       if key not in rdict:
5915         raise errors.OpExecError("Can't parse iallocator results:"
5916                                  " missing key '%s'" % key)
5917       setattr(self, key, rdict[key])
5918
5919     if not isinstance(rdict["nodes"], list):
5920       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5921                                " is not a list")
5922     self.out_data = rdict
5923
5924
5925 class LUTestAllocator(NoHooksLU):
5926   """Run allocator tests.
5927
5928   This LU runs the allocator tests
5929
5930   """
5931   _OP_REQP = ["direction", "mode", "name"]
5932
5933   def CheckPrereq(self):
5934     """Check prerequisites.
5935
5936     This checks the opcode parameters depending on the director and mode test.
5937
5938     """
5939     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5940       for attr in ["name", "mem_size", "disks", "disk_template",
5941                    "os", "tags", "nics", "vcpus"]:
5942         if not hasattr(self.op, attr):
5943           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5944                                      attr)
5945       iname = self.cfg.ExpandInstanceName(self.op.name)
5946       if iname is not None:
5947         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5948                                    iname)
5949       if not isinstance(self.op.nics, list):
5950         raise errors.OpPrereqError("Invalid parameter 'nics'")
5951       for row in self.op.nics:
5952         if (not isinstance(row, dict) or
5953             "mac" not in row or
5954             "ip" not in row or
5955             "bridge" not in row):
5956           raise errors.OpPrereqError("Invalid contents of the"
5957                                      " 'nics' parameter")
5958       if not isinstance(self.op.disks, list):
5959         raise errors.OpPrereqError("Invalid parameter 'disks'")
5960       if len(self.op.disks) != 2:
5961         raise errors.OpPrereqError("Only two-disk configurations supported")
5962       for row in self.op.disks:
5963         if (not isinstance(row, dict) or
5964             "size" not in row or
5965             not isinstance(row["size"], int) or
5966             "mode" not in row or
5967             row["mode"] not in ['r', 'w']):
5968           raise errors.OpPrereqError("Invalid contents of the"
5969                                      " 'disks' parameter")
5970     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5971       if not hasattr(self.op, "name"):
5972         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5973       fname = self.cfg.ExpandInstanceName(self.op.name)
5974       if fname is None:
5975         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5976                                    self.op.name)
5977       self.op.name = fname
5978       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5979     else:
5980       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5981                                  self.op.mode)
5982
5983     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5984       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5985         raise errors.OpPrereqError("Missing allocator name")
5986     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5987       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5988                                  self.op.direction)
5989
5990   def Exec(self, feedback_fn):
5991     """Run the allocator test.
5992
5993     """
5994     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5995       ial = IAllocator(self.cfg, self.sstore,
5996                        mode=self.op.mode,
5997                        name=self.op.name,
5998                        mem_size=self.op.mem_size,
5999                        disks=self.op.disks,
6000                        disk_template=self.op.disk_template,
6001                        os=self.op.os,
6002                        tags=self.op.tags,
6003                        nics=self.op.nics,
6004                        vcpus=self.op.vcpus,
6005                        )
6006     else:
6007       ial = IAllocator(self.cfg, self.sstore,
6008                        mode=self.op.mode,
6009                        name=self.op.name,
6010                        relocate_from=list(self.relocate_from),
6011                        )
6012
6013     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6014       result = ial.in_text
6015     else:
6016       ial.Run(self.op.allocator, validate=False)
6017       result = ial.out_text
6018     return result