Check memory size before setting it
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145     """Notify the LU about the results of its hooks.
146
147     This method is called every time a hooks phase is executed, and notifies
148     the Logical Unit about the hooks' result. The LU can then use it to alter
149     its result based on the hooks.  By default the method does nothing and the
150     previous result is passed back unchanged but any LU can define it if it
151     wants to use the local cluster hook-scripts somehow.
152
153     Args:
154       phase: the hooks phase that has just been run
155       hooks_results: the results of the multi-node hooks rpc call
156       feedback_fn: function to send feedback back to the caller
157       lu_result: the previous result this LU had, or None in the PRE phase.
158
159     """
160     return lu_result
161
162
163 class NoHooksLU(LogicalUnit):
164   """Simple LU which runs no hooks.
165
166   This LU is intended as a parent for other LogicalUnits which will
167   run no hooks, in order to reduce duplicate code.
168
169   """
170   HPATH = None
171   HTYPE = None
172
173
174 def _AddHostToEtcHosts(hostname):
175   """Wrapper around utils.SetEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
180
181
182 def _RemoveHostFromEtcHosts(hostname):
183   """Wrapper around utils.RemoveEtcHostsEntry.
184
185   """
186   hi = utils.HostInfo(name=hostname)
187   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315   """Ensure a node has a correct known_hosts entry.
316
317   Args:
318     fullnode - Fully qualified domain name of host. (str)
319     ip       - IPv4 address of host (str)
320     pubkey   - the public key of the cluster
321
322   """
323   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
325   else:
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
327
328   inthere = False
329
330   save_lines = []
331   add_lines = []
332   removed = False
333
334   for rawline in f:
335     logger.Debug('read %s' % (repr(rawline),))
336
337     parts = rawline.rstrip('\r\n').split()
338
339     # Ignore unwanted lines
340     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341       fields = parts[0].split(',')
342       key = parts[2]
343
344       haveall = True
345       havesome = False
346       for spec in [ ip, fullnode ]:
347         if spec not in fields:
348           haveall = False
349         if spec in fields:
350           havesome = True
351
352       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353       if haveall and key == pubkey:
354         inthere = True
355         save_lines.append(rawline)
356         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
357         continue
358
359       if havesome and (not haveall or key != pubkey):
360         removed = True
361         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
362         continue
363
364     save_lines.append(rawline)
365
366   if not inthere:
367     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
369
370   if removed:
371     save_lines = save_lines + add_lines
372
373     # Write a new file and replace old.
374     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
375                                    constants.DATA_DIR)
376     newfile = os.fdopen(fd, 'w')
377     try:
378       newfile.write(''.join(save_lines))
379     finally:
380       newfile.close()
381     logger.Debug("Wrote new known_hosts.")
382     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
383
384   elif add_lines:
385     # Simply appending a new line will do the trick.
386     f.seek(0, 2)
387     for add in add_lines:
388       f.write(add)
389
390   f.close()
391
392
393 def _HasValidVG(vglist, vgname):
394   """Checks if the volume group list is valid.
395
396   A non-None return value means there's an error, and the return value
397   is the error message.
398
399   """
400   vgsize = vglist.get(vgname, None)
401   if vgsize is None:
402     return "volume group '%s' missing" % vgname
403   elif vgsize < 20480:
404     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
405             (vgname, vgsize))
406   return None
407
408
409 def _InitSSHSetup(node):
410   """Setup the SSH configuration for the cluster.
411
412
413   This generates a dsa keypair for root, adds the pub key to the
414   permitted hosts and adds the hostkey to its own known hosts.
415
416   Args:
417     node: the name of this host as a fqdn
418
419   """
420   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
421
422   for name in priv_key, pub_key:
423     if os.path.exists(name):
424       utils.CreateBackup(name)
425     utils.RemoveFile(name)
426
427   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
428                          "-f", priv_key,
429                          "-q", "-N", ""])
430   if result.failed:
431     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
432                              result.output)
433
434   f = open(pub_key, 'r')
435   try:
436     utils.AddAuthorizedKey(auth_keys, f.read(8192))
437   finally:
438     f.close()
439
440
441 def _InitGanetiServerSetup(ss):
442   """Setup the necessary configuration for the initial node daemon.
443
444   This creates the nodepass file containing the shared password for
445   the cluster and also generates the SSL certificate.
446
447   """
448   # Create pseudo random password
449   randpass = sha.new(os.urandom(64)).hexdigest()
450   # and write it into sstore
451   ss.SetKey(ss.SS_NODED_PASS, randpass)
452
453   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454                          "-days", str(365*5), "-nodes", "-x509",
455                          "-keyout", constants.SSL_CERT_FILE,
456                          "-out", constants.SSL_CERT_FILE, "-batch"])
457   if result.failed:
458     raise errors.OpExecError("could not generate server ssl cert, command"
459                              " %s had exitcode %s and error message %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462   os.chmod(constants.SSL_CERT_FILE, 0400)
463
464   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
465
466   if result.failed:
467     raise errors.OpExecError("Could not start the node daemon, command %s"
468                              " had exitcode %s and error %s" %
469                              (result.cmd, result.exit_code, result.output))
470
471
472 def _CheckInstanceBridgesExist(instance):
473   """Check that the brigdes needed by an instance exist.
474
475   """
476   # check bridges existance
477   brlist = [nic.bridge for nic in instance.nics]
478   if not rpc.call_bridges_exist(instance.primary_node, brlist):
479     raise errors.OpPrereqError("one or more target bridges %s does not"
480                                " exist on destination node '%s'" %
481                                (brlist, instance.primary_node))
482
483
484 class LUInitCluster(LogicalUnit):
485   """Initialise the cluster.
486
487   """
488   HPATH = "cluster-init"
489   HTYPE = constants.HTYPE_CLUSTER
490   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491               "def_bridge", "master_netdev"]
492   REQ_CLUSTER = False
493
494   def BuildHooksEnv(self):
495     """Build hooks env.
496
497     Notes: Since we don't require a cluster, we must manually add
498     ourselves in the post-run node list.
499
500     """
501     env = {"OP_TARGET": self.op.cluster_name}
502     return env, [], [self.hostname.name]
503
504   def CheckPrereq(self):
505     """Verify that the passed name is a valid one.
506
507     """
508     if config.ConfigWriter.IsCluster():
509       raise errors.OpPrereqError("Cluster is already initialised")
510
511     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512       if not os.path.exists(constants.VNC_PASSWORD_FILE):
513         raise errors.OpPrereqError("Please prepare the cluster VNC"
514                                    "password file %s" %
515                                    constants.VNC_PASSWORD_FILE)
516
517     self.hostname = hostname = utils.HostInfo()
518
519     if hostname.ip.startswith("127."):
520       raise errors.OpPrereqError("This host's IP resolves to the private"
521                                  " range (%s). Please fix DNS or %s." %
522                                  (hostname.ip, constants.ETC_HOSTS))
523
524     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525                          source=constants.LOCALHOST_IP_ADDRESS):
526       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527                                  " to %s,\nbut this ip address does not"
528                                  " belong to this host."
529                                  " Aborting." % hostname.ip)
530
531     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
532
533     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
534                      timeout=5):
535       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
536
537     secondary_ip = getattr(self.op, "secondary_ip", None)
538     if secondary_ip and not utils.IsValidIP(secondary_ip):
539       raise errors.OpPrereqError("Invalid secondary ip given")
540     if (secondary_ip and
541         secondary_ip != hostname.ip and
542         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543                            source=constants.LOCALHOST_IP_ADDRESS))):
544       raise errors.OpPrereqError("You gave %s as secondary IP,"
545                                  " but it does not belong to this host." %
546                                  secondary_ip)
547     self.secondary_ip = secondary_ip
548
549     # checks presence of the volume group given
550     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
551
552     if vgstatus:
553       raise errors.OpPrereqError("Error: %s" % vgstatus)
554
555     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
556                     self.op.mac_prefix):
557       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
558                                  self.op.mac_prefix)
559
560     if self.op.hypervisor_type not in constants.HYPER_TYPES:
561       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562                                  self.op.hypervisor_type)
563
564     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
565     if result.failed:
566       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567                                  (self.op.master_netdev,
568                                   result.output.strip()))
569
570     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572       raise errors.OpPrereqError("Init.d script '%s' missing or not"
573                                  " executable." % constants.NODE_INITD_SCRIPT)
574
575   def Exec(self, feedback_fn):
576     """Initialize the cluster.
577
578     """
579     clustername = self.clustername
580     hostname = self.hostname
581
582     # set up the simple store
583     self.sstore = ss = ssconf.SimpleStore()
584     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
589
590     # set up the inter-node password and certificate
591     _InitGanetiServerSetup(ss)
592
593     # start the master ip
594     rpc.call_node_start_master(hostname.name)
595
596     # set up ssh config and /etc/hosts
597     f = open(constants.SSH_HOST_RSA_PUB, 'r')
598     try:
599       sshline = f.read()
600     finally:
601       f.close()
602     sshkey = sshline.split(" ")[1]
603
604     _AddHostToEtcHosts(hostname.name)
605
606     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
607
608     _InitSSHSetup(hostname.name)
609
610     # init of cluster config file
611     self.cfg = cfgw = config.ConfigWriter()
612     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613                     sshkey, self.op.mac_prefix,
614                     self.op.vg_name, self.op.def_bridge)
615
616
617 class LUDestroyCluster(NoHooksLU):
618   """Logical unit for destroying the cluster.
619
620   """
621   _OP_REQP = []
622
623   def CheckPrereq(self):
624     """Check prerequisites.
625
626     This checks whether the cluster is empty.
627
628     Any errors are signalled by raising errors.OpPrereqError.
629
630     """
631     master = self.sstore.GetMasterNode()
632
633     nodelist = self.cfg.GetNodeList()
634     if len(nodelist) != 1 or nodelist[0] != master:
635       raise errors.OpPrereqError("There are still %d node(s) in"
636                                  " this cluster." % (len(nodelist) - 1))
637     instancelist = self.cfg.GetInstanceList()
638     if instancelist:
639       raise errors.OpPrereqError("There are still %d instance(s) in"
640                                  " this cluster." % len(instancelist))
641
642   def Exec(self, feedback_fn):
643     """Destroys the cluster.
644
645     """
646     master = self.sstore.GetMasterNode()
647     if not rpc.call_node_stop_master(master):
648       raise errors.OpExecError("Could not disable the master role")
649     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650     utils.CreateBackup(priv_key)
651     utils.CreateBackup(pub_key)
652     rpc.call_node_leave_cluster(master)
653
654
655 class LUVerifyCluster(LogicalUnit):
656   """Verifies the cluster status.
657
658   """
659   HPATH = "cluster-verify"
660   HTYPE = constants.HTYPE_CLUSTER
661   _OP_REQP = ["skip_checks"]
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     if 'node-net-test' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result['node-net-test']:
733         bad = True
734         nlist = utils.NiceSort(node_result['node-net-test'].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result['node-net-test'][node]))
738
739     hyp_result = node_result.get('hypervisor', None)
740     if hyp_result is not None:
741       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742     return bad
743
744   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745                       node_instance, feedback_fn):
746     """Verify an instance.
747
748     This function checks to see if the required block devices are
749     available on the instance's node.
750
751     """
752     bad = False
753
754     node_current = instanceconfig.primary_node
755
756     node_vol_should = {}
757     instanceconfig.MapLVsByNode(node_vol_should)
758
759     for node in node_vol_should:
760       for volume in node_vol_should[node]:
761         if node not in node_vol_is or volume not in node_vol_is[node]:
762           feedback_fn("  - ERROR: volume %s missing on node %s" %
763                           (volume, node))
764           bad = True
765
766     if not instanceconfig.status == 'down':
767       if (node_current not in node_instance or
768           not instance in node_instance[node_current]):
769         feedback_fn("  - ERROR: instance %s not running on node %s" %
770                         (instance, node_current))
771         bad = True
772
773     for node in node_instance:
774       if (not node == node_current):
775         if instance in node_instance[node]:
776           feedback_fn("  - ERROR: instance %s should not run on node %s" %
777                           (instance, node))
778           bad = True
779
780     return bad
781
782   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783     """Verify if there are any unknown volumes in the cluster.
784
785     The .os, .swap and backup volumes are ignored. All other volumes are
786     reported as unknown.
787
788     """
789     bad = False
790
791     for node in node_vol_is:
792       for volume in node_vol_is[node]:
793         if node not in node_vol_should or volume not in node_vol_should[node]:
794           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
795                       (volume, node))
796           bad = True
797     return bad
798
799   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800     """Verify the list of running instances.
801
802     This checks what instances are running but unknown to the cluster.
803
804     """
805     bad = False
806     for node in node_instance:
807       for runninginstance in node_instance[node]:
808         if runninginstance not in instancelist:
809           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
810                           (runninginstance, node))
811           bad = True
812     return bad
813
814   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815     """Verify N+1 Memory Resilience.
816
817     Check that if one single node dies we can still start all the instances it
818     was primary for.
819
820     """
821     bad = False
822
823     for node, nodeinfo in node_info.iteritems():
824       # This code checks that every node which is now listed as secondary has
825       # enough memory to host all instances it is supposed to should a single
826       # other node in the cluster fail.
827       # FIXME: not ready for failover to an arbitrary node
828       # FIXME: does not support file-backed instances
829       # WARNING: we currently take into account down instances as well as up
830       # ones, considering that even if they're down someone might want to start
831       # them even in the event of a node failure.
832       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
833         needed_mem = 0
834         for instance in instances:
835           needed_mem += instance_cfg[instance].memory
836         if nodeinfo['mfree'] < needed_mem:
837           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
838                       " failovers should node %s fail" % (node, prinode))
839           bad = True
840     return bad
841
842   def CheckPrereq(self):
843     """Check prerequisites.
844
845     Transform the list of checks we're going to skip into a set and check that
846     all its members are valid.
847
848     """
849     self.skip_set = frozenset(self.op.skip_checks)
850     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851       raise errors.OpPrereqError("Invalid checks to be skipped specified")
852
853   def BuildHooksEnv(self):
854     """Build hooks env.
855
856     Cluster-Verify hooks just rone in the post phase and their failure makes
857     the output be logged in the verify output and the verification to fail.
858
859     """
860     all_nodes = self.cfg.GetNodeList()
861     tags = self.cfg.GetClusterInfo().GetTags()
862     # TODO: populate the environment with useful information for verify hooks
863     env = {
864       "CLUSTER_TAGS": " ".join(tags),
865       }
866     return env, [], all_nodes
867
868   def Exec(self, feedback_fn):
869     """Verify integrity of cluster, performing various test on nodes.
870
871     """
872     bad = False
873     feedback_fn("* Verifying global settings")
874     for msg in self.cfg.VerifyConfig():
875       feedback_fn("  - ERROR: %s" % msg)
876
877     vg_name = self.cfg.GetVGName()
878     nodelist = utils.NiceSort(self.cfg.GetNodeList())
879     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881     i_non_redundant = [] # Non redundant instances
882     node_volume = {}
883     node_instance = {}
884     node_info = {}
885     instance_cfg = {}
886
887     # FIXME: verify OS list
888     # do local checksums
889     file_names = list(self.sstore.GetFileList())
890     file_names.append(constants.SSL_CERT_FILE)
891     file_names.append(constants.CLUSTER_CONF_FILE)
892     local_checksums = utils.FingerprintFiles(file_names)
893
894     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896     all_instanceinfo = rpc.call_instance_list(nodelist)
897     all_vglist = rpc.call_vg_list(nodelist)
898     node_verify_param = {
899       'filelist': file_names,
900       'nodelist': nodelist,
901       'hypervisor': None,
902       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903                         for node in nodeinfo]
904       }
905     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906     all_rversion = rpc.call_version(nodelist)
907     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
908
909     incomplete_nodeinfo = False
910
911     for node in nodelist:
912       feedback_fn("* Verifying node %s" % node)
913       result = self._VerifyNode(node, file_names, local_checksums,
914                                 all_vglist[node], all_nvinfo[node],
915                                 all_rversion[node], feedback_fn)
916       bad = bad or result
917
918       # node_volume
919       volumeinfo = all_volumeinfo[node]
920
921       if isinstance(volumeinfo, basestring):
922         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
923                     (node, volumeinfo[-400:].encode('string_escape')))
924         bad = True
925         node_volume[node] = {}
926       elif not isinstance(volumeinfo, dict):
927         feedback_fn("  - ERROR: connection to %s failed" % (node,))
928         bad = True
929         incomplete_nodeinfo = True
930         continue
931       else:
932         node_volume[node] = volumeinfo
933
934       # node_instance
935       nodeinstance = all_instanceinfo[node]
936       if type(nodeinstance) != list:
937         feedback_fn("  - ERROR: connection to %s failed" % (node,))
938         bad = True
939         incomplete_nodeinfo = True
940         continue
941
942       node_instance[node] = nodeinstance
943
944       # node_info
945       nodeinfo = all_ninfo[node]
946       if not isinstance(nodeinfo, dict):
947         feedback_fn("  - ERROR: connection to %s failed" % (node,))
948         bad = True
949         incomplete_nodeinfo = True
950         continue
951
952       try:
953         node_info[node] = {
954           "mfree": int(nodeinfo['memory_free']),
955           "dfree": int(nodeinfo['vg_free']),
956           "pinst": [],
957           "sinst": [],
958           # dictionary holding all instances this node is secondary for,
959           # grouped by their primary node. Each key is a cluster node, and each
960           # value is a list of instances which have the key as primary and the
961           # current node as secondary.  this is handy to calculate N+1 memory
962           # availability if you can only failover from a primary to its
963           # secondary.
964           "sinst-by-pnode": {},
965         }
966       except (ValueError, TypeError):
967         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
968         bad = True
969         incomplete_nodeinfo = True
970         continue
971
972     node_vol_should = {}
973
974     for instance in instancelist:
975       feedback_fn("* Verifying instance %s" % instance)
976       inst_config = self.cfg.GetInstanceInfo(instance)
977       result =  self._VerifyInstance(instance, inst_config, node_volume,
978                                      node_instance, feedback_fn)
979       bad = bad or result
980
981       inst_config.MapLVsByNode(node_vol_should)
982
983       instance_cfg[instance] = inst_config
984
985       pnode = inst_config.primary_node
986       if pnode in node_info:
987         node_info[pnode]['pinst'].append(instance)
988       else:
989         feedback_fn("  - ERROR: instance %s, connection to primary node"
990                     " %s failed" % (instance, pnode))
991         bad = True
992
993       # If the instance is non-redundant we cannot survive losing its primary
994       # node, so we are not N+1 compliant. On the other hand we have no disk
995       # templates with more than one secondary so that situation is not well
996       # supported either.
997       # FIXME: does not support file-backed instances
998       if len(inst_config.secondary_nodes) == 0:
999         i_non_redundant.append(instance)
1000       elif len(inst_config.secondary_nodes) > 1:
1001         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1002                     % instance)
1003
1004       for snode in inst_config.secondary_nodes:
1005         if snode in node_info:
1006           node_info[snode]['sinst'].append(instance)
1007           if pnode not in node_info[snode]['sinst-by-pnode']:
1008             node_info[snode]['sinst-by-pnode'][pnode] = []
1009           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1010         else:
1011           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1012                       " %s failed" % (instance, snode))
1013
1014     feedback_fn("* Verifying orphan volumes")
1015     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1016                                        feedback_fn)
1017     bad = bad or result
1018
1019     feedback_fn("* Verifying remaining instances")
1020     result = self._VerifyOrphanInstances(instancelist, node_instance,
1021                                          feedback_fn)
1022     bad = bad or result
1023
1024     if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1025         not incomplete_nodeinfo):
1026       feedback_fn("* Verifying N+1 Memory redundancy")
1027       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1028       bad = bad or result
1029
1030     feedback_fn("* Other Notes")
1031     if i_non_redundant:
1032       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1033                   % len(i_non_redundant))
1034
1035     return int(bad)
1036
1037   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1038     """Analize the post-hooks' result, handle it, and send some
1039     nicely-formatted feedback back to the user.
1040
1041     Args:
1042       phase: the hooks phase that has just been run
1043       hooks_results: the results of the multi-node hooks rpc call
1044       feedback_fn: function to send feedback back to the caller
1045       lu_result: previous Exec result
1046
1047     """
1048     # We only really run POST phase hooks, and are only interested in their results
1049     if phase == constants.HOOKS_PHASE_POST:
1050       # Used to change hooks' output to proper indentation
1051       indent_re = re.compile('^', re.M)
1052       feedback_fn("* Hooks Results")
1053       if not hooks_results:
1054         feedback_fn("  - ERROR: general communication failure")
1055         lu_result = 1
1056       else:
1057         for node_name in hooks_results:
1058           show_node_header = True
1059           res = hooks_results[node_name]
1060           if res is False or not isinstance(res, list):
1061             feedback_fn("    Communication failure")
1062             lu_result = 1
1063             continue
1064           for script, hkr, output in res:
1065             if hkr == constants.HKR_FAIL:
1066               # The node header is only shown once, if there are
1067               # failing hooks on that node
1068               if show_node_header:
1069                 feedback_fn("  Node %s:" % node_name)
1070                 show_node_header = False
1071               feedback_fn("    ERROR: Script %s failed, output:" % script)
1072               output = indent_re.sub('      ', output)
1073               feedback_fn("%s" % output)
1074               lu_result = 1
1075
1076       return lu_result
1077
1078
1079 class LUVerifyDisks(NoHooksLU):
1080   """Verifies the cluster disks status.
1081
1082   """
1083   _OP_REQP = []
1084
1085   def CheckPrereq(self):
1086     """Check prerequisites.
1087
1088     This has no prerequisites.
1089
1090     """
1091     pass
1092
1093   def Exec(self, feedback_fn):
1094     """Verify integrity of cluster disks.
1095
1096     """
1097     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1098
1099     vg_name = self.cfg.GetVGName()
1100     nodes = utils.NiceSort(self.cfg.GetNodeList())
1101     instances = [self.cfg.GetInstanceInfo(name)
1102                  for name in self.cfg.GetInstanceList()]
1103
1104     nv_dict = {}
1105     for inst in instances:
1106       inst_lvs = {}
1107       if (inst.status != "up" or
1108           inst.disk_template not in constants.DTS_NET_MIRROR):
1109         continue
1110       inst.MapLVsByNode(inst_lvs)
1111       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1112       for node, vol_list in inst_lvs.iteritems():
1113         for vol in vol_list:
1114           nv_dict[(node, vol)] = inst
1115
1116     if not nv_dict:
1117       return result
1118
1119     node_lvs = rpc.call_volume_list(nodes, vg_name)
1120
1121     to_act = set()
1122     for node in nodes:
1123       # node_volume
1124       lvs = node_lvs[node]
1125
1126       if isinstance(lvs, basestring):
1127         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1128         res_nlvm[node] = lvs
1129       elif not isinstance(lvs, dict):
1130         logger.Info("connection to node %s failed or invalid data returned" %
1131                     (node,))
1132         res_nodes.append(node)
1133         continue
1134
1135       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1136         inst = nv_dict.pop((node, lv_name), None)
1137         if (not lv_online and inst is not None
1138             and inst.name not in res_instances):
1139           res_instances.append(inst.name)
1140
1141     # any leftover items in nv_dict are missing LVs, let's arrange the
1142     # data better
1143     for key, inst in nv_dict.iteritems():
1144       if inst.name not in res_missing:
1145         res_missing[inst.name] = []
1146       res_missing[inst.name].append(key)
1147
1148     return result
1149
1150
1151 class LURenameCluster(LogicalUnit):
1152   """Rename the cluster.
1153
1154   """
1155   HPATH = "cluster-rename"
1156   HTYPE = constants.HTYPE_CLUSTER
1157   _OP_REQP = ["name"]
1158
1159   def BuildHooksEnv(self):
1160     """Build hooks env.
1161
1162     """
1163     env = {
1164       "OP_TARGET": self.sstore.GetClusterName(),
1165       "NEW_NAME": self.op.name,
1166       }
1167     mn = self.sstore.GetMasterNode()
1168     return env, [mn], [mn]
1169
1170   def CheckPrereq(self):
1171     """Verify that the passed name is a valid one.
1172
1173     """
1174     hostname = utils.HostInfo(self.op.name)
1175
1176     new_name = hostname.name
1177     self.ip = new_ip = hostname.ip
1178     old_name = self.sstore.GetClusterName()
1179     old_ip = self.sstore.GetMasterIP()
1180     if new_name == old_name and new_ip == old_ip:
1181       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1182                                  " cluster has changed")
1183     if new_ip != old_ip:
1184       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1185         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1186                                    " reachable on the network. Aborting." %
1187                                    new_ip)
1188
1189     self.op.name = new_name
1190
1191   def Exec(self, feedback_fn):
1192     """Rename the cluster.
1193
1194     """
1195     clustername = self.op.name
1196     ip = self.ip
1197     ss = self.sstore
1198
1199     # shutdown the master IP
1200     master = ss.GetMasterNode()
1201     if not rpc.call_node_stop_master(master):
1202       raise errors.OpExecError("Could not disable the master role")
1203
1204     try:
1205       # modify the sstore
1206       ss.SetKey(ss.SS_MASTER_IP, ip)
1207       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1208
1209       # Distribute updated ss config to all nodes
1210       myself = self.cfg.GetNodeInfo(master)
1211       dist_nodes = self.cfg.GetNodeList()
1212       if myself.name in dist_nodes:
1213         dist_nodes.remove(myself.name)
1214
1215       logger.Debug("Copying updated ssconf data to all nodes")
1216       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1217         fname = ss.KeyToFilename(keyname)
1218         result = rpc.call_upload_file(dist_nodes, fname)
1219         for to_node in dist_nodes:
1220           if not result[to_node]:
1221             logger.Error("copy of file %s to node %s failed" %
1222                          (fname, to_node))
1223     finally:
1224       if not rpc.call_node_start_master(master):
1225         logger.Error("Could not re-enable the master role on the master,"
1226                      " please restart manually.")
1227
1228
1229 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1230   """Sleep and poll for an instance's disk to sync.
1231
1232   """
1233   if not instance.disks:
1234     return True
1235
1236   if not oneshot:
1237     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1238
1239   node = instance.primary_node
1240
1241   for dev in instance.disks:
1242     cfgw.SetDiskID(dev, node)
1243
1244   retries = 0
1245   while True:
1246     max_time = 0
1247     done = True
1248     cumul_degraded = False
1249     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1250     if not rstats:
1251       proc.LogWarning("Can't get any data from node %s" % node)
1252       retries += 1
1253       if retries >= 10:
1254         raise errors.RemoteError("Can't contact node %s for mirror data,"
1255                                  " aborting." % node)
1256       time.sleep(6)
1257       continue
1258     retries = 0
1259     for i in range(len(rstats)):
1260       mstat = rstats[i]
1261       if mstat is None:
1262         proc.LogWarning("Can't compute data for node %s/%s" %
1263                         (node, instance.disks[i].iv_name))
1264         continue
1265       # we ignore the ldisk parameter
1266       perc_done, est_time, is_degraded, _ = mstat
1267       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1268       if perc_done is not None:
1269         done = False
1270         if est_time is not None:
1271           rem_time = "%d estimated seconds remaining" % est_time
1272           max_time = est_time
1273         else:
1274           rem_time = "no time estimate"
1275         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1276                      (instance.disks[i].iv_name, perc_done, rem_time))
1277     if done or oneshot:
1278       break
1279
1280     if unlock:
1281       utils.Unlock('cmd')
1282     try:
1283       time.sleep(min(60, max_time))
1284     finally:
1285       if unlock:
1286         utils.Lock('cmd')
1287
1288   if done:
1289     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1290   return not cumul_degraded
1291
1292
1293 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1294   """Check that mirrors are not degraded.
1295
1296   The ldisk parameter, if True, will change the test from the
1297   is_degraded attribute (which represents overall non-ok status for
1298   the device(s)) to the ldisk (representing the local storage status).
1299
1300   """
1301   cfgw.SetDiskID(dev, node)
1302   if ldisk:
1303     idx = 6
1304   else:
1305     idx = 5
1306
1307   result = True
1308   if on_primary or dev.AssembleOnSecondary():
1309     rstats = rpc.call_blockdev_find(node, dev)
1310     if not rstats:
1311       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1312       result = False
1313     else:
1314       result = result and (not rstats[idx])
1315   if dev.children:
1316     for child in dev.children:
1317       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1318
1319   return result
1320
1321
1322 class LUDiagnoseOS(NoHooksLU):
1323   """Logical unit for OS diagnose/query.
1324
1325   """
1326   _OP_REQP = ["output_fields", "names"]
1327
1328   def CheckPrereq(self):
1329     """Check prerequisites.
1330
1331     This always succeeds, since this is a pure query LU.
1332
1333     """
1334     if self.op.names:
1335       raise errors.OpPrereqError("Selective OS query not supported")
1336
1337     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1338     _CheckOutputFields(static=[],
1339                        dynamic=self.dynamic_fields,
1340                        selected=self.op.output_fields)
1341
1342   @staticmethod
1343   def _DiagnoseByOS(node_list, rlist):
1344     """Remaps a per-node return list into an a per-os per-node dictionary
1345
1346       Args:
1347         node_list: a list with the names of all nodes
1348         rlist: a map with node names as keys and OS objects as values
1349
1350       Returns:
1351         map: a map with osnames as keys and as value another map, with
1352              nodes as
1353              keys and list of OS objects as values
1354              e.g. {"debian-etch": {"node1": [<object>,...],
1355                                    "node2": [<object>,]}
1356                   }
1357
1358     """
1359     all_os = {}
1360     for node_name, nr in rlist.iteritems():
1361       if not nr:
1362         continue
1363       for os in nr:
1364         if os.name not in all_os:
1365           # build a list of nodes for this os containing empty lists
1366           # for each node in node_list
1367           all_os[os.name] = {}
1368           for nname in node_list:
1369             all_os[os.name][nname] = []
1370         all_os[os.name][node_name].append(os)
1371     return all_os
1372
1373   def Exec(self, feedback_fn):
1374     """Compute the list of OSes.
1375
1376     """
1377     node_list = self.cfg.GetNodeList()
1378     node_data = rpc.call_os_diagnose(node_list)
1379     if node_data == False:
1380       raise errors.OpExecError("Can't gather the list of OSes")
1381     pol = self._DiagnoseByOS(node_list, node_data)
1382     output = []
1383     for os_name, os_data in pol.iteritems():
1384       row = []
1385       for field in self.op.output_fields:
1386         if field == "name":
1387           val = os_name
1388         elif field == "valid":
1389           val = utils.all([osl and osl[0] for osl in os_data.values()])
1390         elif field == "node_status":
1391           val = {}
1392           for node_name, nos_list in os_data.iteritems():
1393             val[node_name] = [(v.status, v.path) for v in nos_list]
1394         else:
1395           raise errors.ParameterError(field)
1396         row.append(val)
1397       output.append(row)
1398
1399     return output
1400
1401
1402 class LURemoveNode(LogicalUnit):
1403   """Logical unit for removing a node.
1404
1405   """
1406   HPATH = "node-remove"
1407   HTYPE = constants.HTYPE_NODE
1408   _OP_REQP = ["node_name"]
1409
1410   def BuildHooksEnv(self):
1411     """Build hooks env.
1412
1413     This doesn't run on the target node in the pre phase as a failed
1414     node would not allows itself to run.
1415
1416     """
1417     env = {
1418       "OP_TARGET": self.op.node_name,
1419       "NODE_NAME": self.op.node_name,
1420       }
1421     all_nodes = self.cfg.GetNodeList()
1422     all_nodes.remove(self.op.node_name)
1423     return env, all_nodes, all_nodes
1424
1425   def CheckPrereq(self):
1426     """Check prerequisites.
1427
1428     This checks:
1429      - the node exists in the configuration
1430      - it does not have primary or secondary instances
1431      - it's not the master
1432
1433     Any errors are signalled by raising errors.OpPrereqError.
1434
1435     """
1436     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1437     if node is None:
1438       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1439
1440     instance_list = self.cfg.GetInstanceList()
1441
1442     masternode = self.sstore.GetMasterNode()
1443     if node.name == masternode:
1444       raise errors.OpPrereqError("Node is the master node,"
1445                                  " you need to failover first.")
1446
1447     for instance_name in instance_list:
1448       instance = self.cfg.GetInstanceInfo(instance_name)
1449       if node.name == instance.primary_node:
1450         raise errors.OpPrereqError("Instance %s still running on the node,"
1451                                    " please remove first." % instance_name)
1452       if node.name in instance.secondary_nodes:
1453         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1454                                    " please remove first." % instance_name)
1455     self.op.node_name = node.name
1456     self.node = node
1457
1458   def Exec(self, feedback_fn):
1459     """Removes the node from the cluster.
1460
1461     """
1462     node = self.node
1463     logger.Info("stopping the node daemon and removing configs from node %s" %
1464                 node.name)
1465
1466     rpc.call_node_leave_cluster(node.name)
1467
1468     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1469
1470     logger.Info("Removing node %s from config" % node.name)
1471
1472     self.cfg.RemoveNode(node.name)
1473
1474     _RemoveHostFromEtcHosts(node.name)
1475
1476
1477 class LUQueryNodes(NoHooksLU):
1478   """Logical unit for querying nodes.
1479
1480   """
1481   _OP_REQP = ["output_fields", "names"]
1482
1483   def CheckPrereq(self):
1484     """Check prerequisites.
1485
1486     This checks that the fields required are valid output fields.
1487
1488     """
1489     self.dynamic_fields = frozenset([
1490       "dtotal", "dfree",
1491       "mtotal", "mnode", "mfree",
1492       "bootid",
1493       "ctotal",
1494       ])
1495
1496     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1497                                "pinst_list", "sinst_list",
1498                                "pip", "sip", "tags"],
1499                        dynamic=self.dynamic_fields,
1500                        selected=self.op.output_fields)
1501
1502     self.wanted = _GetWantedNodes(self, self.op.names)
1503
1504   def Exec(self, feedback_fn):
1505     """Computes the list of nodes and their attributes.
1506
1507     """
1508     nodenames = self.wanted
1509     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1510
1511     # begin data gathering
1512
1513     if self.dynamic_fields.intersection(self.op.output_fields):
1514       live_data = {}
1515       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1516       for name in nodenames:
1517         nodeinfo = node_data.get(name, None)
1518         if nodeinfo:
1519           live_data[name] = {
1520             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1521             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1522             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1523             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1524             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1525             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1526             "bootid": nodeinfo['bootid'],
1527             }
1528         else:
1529           live_data[name] = {}
1530     else:
1531       live_data = dict.fromkeys(nodenames, {})
1532
1533     node_to_primary = dict([(name, set()) for name in nodenames])
1534     node_to_secondary = dict([(name, set()) for name in nodenames])
1535
1536     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1537                              "sinst_cnt", "sinst_list"))
1538     if inst_fields & frozenset(self.op.output_fields):
1539       instancelist = self.cfg.GetInstanceList()
1540
1541       for instance_name in instancelist:
1542         inst = self.cfg.GetInstanceInfo(instance_name)
1543         if inst.primary_node in node_to_primary:
1544           node_to_primary[inst.primary_node].add(inst.name)
1545         for secnode in inst.secondary_nodes:
1546           if secnode in node_to_secondary:
1547             node_to_secondary[secnode].add(inst.name)
1548
1549     # end data gathering
1550
1551     output = []
1552     for node in nodelist:
1553       node_output = []
1554       for field in self.op.output_fields:
1555         if field == "name":
1556           val = node.name
1557         elif field == "pinst_list":
1558           val = list(node_to_primary[node.name])
1559         elif field == "sinst_list":
1560           val = list(node_to_secondary[node.name])
1561         elif field == "pinst_cnt":
1562           val = len(node_to_primary[node.name])
1563         elif field == "sinst_cnt":
1564           val = len(node_to_secondary[node.name])
1565         elif field == "pip":
1566           val = node.primary_ip
1567         elif field == "sip":
1568           val = node.secondary_ip
1569         elif field == "tags":
1570           val = list(node.GetTags())
1571         elif field in self.dynamic_fields:
1572           val = live_data[node.name].get(field, None)
1573         else:
1574           raise errors.ParameterError(field)
1575         node_output.append(val)
1576       output.append(node_output)
1577
1578     return output
1579
1580
1581 class LUQueryNodeVolumes(NoHooksLU):
1582   """Logical unit for getting volumes on node(s).
1583
1584   """
1585   _OP_REQP = ["nodes", "output_fields"]
1586
1587   def CheckPrereq(self):
1588     """Check prerequisites.
1589
1590     This checks that the fields required are valid output fields.
1591
1592     """
1593     self.nodes = _GetWantedNodes(self, self.op.nodes)
1594
1595     _CheckOutputFields(static=["node"],
1596                        dynamic=["phys", "vg", "name", "size", "instance"],
1597                        selected=self.op.output_fields)
1598
1599
1600   def Exec(self, feedback_fn):
1601     """Computes the list of nodes and their attributes.
1602
1603     """
1604     nodenames = self.nodes
1605     volumes = rpc.call_node_volumes(nodenames)
1606
1607     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1608              in self.cfg.GetInstanceList()]
1609
1610     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1611
1612     output = []
1613     for node in nodenames:
1614       if node not in volumes or not volumes[node]:
1615         continue
1616
1617       node_vols = volumes[node][:]
1618       node_vols.sort(key=lambda vol: vol['dev'])
1619
1620       for vol in node_vols:
1621         node_output = []
1622         for field in self.op.output_fields:
1623           if field == "node":
1624             val = node
1625           elif field == "phys":
1626             val = vol['dev']
1627           elif field == "vg":
1628             val = vol['vg']
1629           elif field == "name":
1630             val = vol['name']
1631           elif field == "size":
1632             val = int(float(vol['size']))
1633           elif field == "instance":
1634             for inst in ilist:
1635               if node not in lv_by_node[inst]:
1636                 continue
1637               if vol['name'] in lv_by_node[inst][node]:
1638                 val = inst.name
1639                 break
1640             else:
1641               val = '-'
1642           else:
1643             raise errors.ParameterError(field)
1644           node_output.append(str(val))
1645
1646         output.append(node_output)
1647
1648     return output
1649
1650
1651 class LUAddNode(LogicalUnit):
1652   """Logical unit for adding node to the cluster.
1653
1654   """
1655   HPATH = "node-add"
1656   HTYPE = constants.HTYPE_NODE
1657   _OP_REQP = ["node_name"]
1658
1659   def BuildHooksEnv(self):
1660     """Build hooks env.
1661
1662     This will run on all nodes before, and on all nodes + the new node after.
1663
1664     """
1665     env = {
1666       "OP_TARGET": self.op.node_name,
1667       "NODE_NAME": self.op.node_name,
1668       "NODE_PIP": self.op.primary_ip,
1669       "NODE_SIP": self.op.secondary_ip,
1670       }
1671     nodes_0 = self.cfg.GetNodeList()
1672     nodes_1 = nodes_0 + [self.op.node_name, ]
1673     return env, nodes_0, nodes_1
1674
1675   def CheckPrereq(self):
1676     """Check prerequisites.
1677
1678     This checks:
1679      - the new node is not already in the config
1680      - it is resolvable
1681      - its parameters (single/dual homed) matches the cluster
1682
1683     Any errors are signalled by raising errors.OpPrereqError.
1684
1685     """
1686     node_name = self.op.node_name
1687     cfg = self.cfg
1688
1689     dns_data = utils.HostInfo(node_name)
1690
1691     node = dns_data.name
1692     primary_ip = self.op.primary_ip = dns_data.ip
1693     secondary_ip = getattr(self.op, "secondary_ip", None)
1694     if secondary_ip is None:
1695       secondary_ip = primary_ip
1696     if not utils.IsValidIP(secondary_ip):
1697       raise errors.OpPrereqError("Invalid secondary IP given")
1698     self.op.secondary_ip = secondary_ip
1699
1700     node_list = cfg.GetNodeList()
1701     if not self.op.readd and node in node_list:
1702       raise errors.OpPrereqError("Node %s is already in the configuration" %
1703                                  node)
1704     elif self.op.readd and node not in node_list:
1705       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1706
1707     for existing_node_name in node_list:
1708       existing_node = cfg.GetNodeInfo(existing_node_name)
1709
1710       if self.op.readd and node == existing_node_name:
1711         if (existing_node.primary_ip != primary_ip or
1712             existing_node.secondary_ip != secondary_ip):
1713           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1714                                      " address configuration as before")
1715         continue
1716
1717       if (existing_node.primary_ip == primary_ip or
1718           existing_node.secondary_ip == primary_ip or
1719           existing_node.primary_ip == secondary_ip or
1720           existing_node.secondary_ip == secondary_ip):
1721         raise errors.OpPrereqError("New node ip address(es) conflict with"
1722                                    " existing node %s" % existing_node.name)
1723
1724     # check that the type of the node (single versus dual homed) is the
1725     # same as for the master
1726     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1727     master_singlehomed = myself.secondary_ip == myself.primary_ip
1728     newbie_singlehomed = secondary_ip == primary_ip
1729     if master_singlehomed != newbie_singlehomed:
1730       if master_singlehomed:
1731         raise errors.OpPrereqError("The master has no private ip but the"
1732                                    " new node has one")
1733       else:
1734         raise errors.OpPrereqError("The master has a private ip but the"
1735                                    " new node doesn't have one")
1736
1737     # checks reachablity
1738     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1739       raise errors.OpPrereqError("Node not reachable by ping")
1740
1741     if not newbie_singlehomed:
1742       # check reachability from my secondary ip to newbie's secondary ip
1743       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1744                            source=myself.secondary_ip):
1745         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1746                                    " based ping to noded port")
1747
1748     self.new_node = objects.Node(name=node,
1749                                  primary_ip=primary_ip,
1750                                  secondary_ip=secondary_ip)
1751
1752     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1753       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1754         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1755                                    constants.VNC_PASSWORD_FILE)
1756
1757   def Exec(self, feedback_fn):
1758     """Adds the new node to the cluster.
1759
1760     """
1761     new_node = self.new_node
1762     node = new_node.name
1763
1764     # set up inter-node password and certificate and restarts the node daemon
1765     gntpass = self.sstore.GetNodeDaemonPassword()
1766     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1767       raise errors.OpExecError("ganeti password corruption detected")
1768     f = open(constants.SSL_CERT_FILE)
1769     try:
1770       gntpem = f.read(8192)
1771     finally:
1772       f.close()
1773     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1774     # so we use this to detect an invalid certificate; as long as the
1775     # cert doesn't contain this, the here-document will be correctly
1776     # parsed by the shell sequence below
1777     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1778       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1779     if not gntpem.endswith("\n"):
1780       raise errors.OpExecError("PEM must end with newline")
1781     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1782
1783     # and then connect with ssh to set password and start ganeti-noded
1784     # note that all the below variables are sanitized at this point,
1785     # either by being constants or by the checks above
1786     ss = self.sstore
1787     mycommand = ("umask 077 && "
1788                  "echo '%s' > '%s' && "
1789                  "cat > '%s' << '!EOF.' && \n"
1790                  "%s!EOF.\n%s restart" %
1791                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1792                   constants.SSL_CERT_FILE, gntpem,
1793                   constants.NODE_INITD_SCRIPT))
1794
1795     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1796     if result.failed:
1797       raise errors.OpExecError("Remote command on node %s, error: %s,"
1798                                " output: %s" %
1799                                (node, result.fail_reason, result.output))
1800
1801     # check connectivity
1802     time.sleep(4)
1803
1804     result = rpc.call_version([node])[node]
1805     if result:
1806       if constants.PROTOCOL_VERSION == result:
1807         logger.Info("communication to node %s fine, sw version %s match" %
1808                     (node, result))
1809       else:
1810         raise errors.OpExecError("Version mismatch master version %s,"
1811                                  " node version %s" %
1812                                  (constants.PROTOCOL_VERSION, result))
1813     else:
1814       raise errors.OpExecError("Cannot get version from the new node")
1815
1816     # setup ssh on node
1817     logger.Info("copy ssh key to node %s" % node)
1818     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1819     keyarray = []
1820     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1821                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1822                 priv_key, pub_key]
1823
1824     for i in keyfiles:
1825       f = open(i, 'r')
1826       try:
1827         keyarray.append(f.read())
1828       finally:
1829         f.close()
1830
1831     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1832                                keyarray[3], keyarray[4], keyarray[5])
1833
1834     if not result:
1835       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1836
1837     # Add node to our /etc/hosts, and add key to known_hosts
1838     _AddHostToEtcHosts(new_node.name)
1839
1840     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1841                       self.cfg.GetHostKey())
1842
1843     if new_node.secondary_ip != new_node.primary_ip:
1844       if not rpc.call_node_tcp_ping(new_node.name,
1845                                     constants.LOCALHOST_IP_ADDRESS,
1846                                     new_node.secondary_ip,
1847                                     constants.DEFAULT_NODED_PORT,
1848                                     10, False):
1849         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1850                                  " you gave (%s). Please fix and re-run this"
1851                                  " command." % new_node.secondary_ip)
1852
1853     success, msg = ssh.VerifyNodeHostname(node)
1854     if not success:
1855       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1856                                " than the one the resolver gives: %s."
1857                                " Please fix and re-run this command." %
1858                                (node, msg))
1859
1860     # Distribute updated /etc/hosts and known_hosts to all nodes,
1861     # including the node just added
1862     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1863     dist_nodes = self.cfg.GetNodeList()
1864     if not self.op.readd:
1865       dist_nodes.append(node)
1866     if myself.name in dist_nodes:
1867       dist_nodes.remove(myself.name)
1868
1869     logger.Debug("Copying hosts and known_hosts to all nodes")
1870     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1871       result = rpc.call_upload_file(dist_nodes, fname)
1872       for to_node in dist_nodes:
1873         if not result[to_node]:
1874           logger.Error("copy of file %s to node %s failed" %
1875                        (fname, to_node))
1876
1877     to_copy = ss.GetFileList()
1878     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1879       to_copy.append(constants.VNC_PASSWORD_FILE)
1880     for fname in to_copy:
1881       if not ssh.CopyFileToNode(node, fname):
1882         logger.Error("could not copy file %s to node %s" % (fname, node))
1883
1884     if not self.op.readd:
1885       logger.Info("adding node %s to cluster.conf" % node)
1886       self.cfg.AddNode(new_node)
1887
1888
1889 class LUMasterFailover(LogicalUnit):
1890   """Failover the master node to the current node.
1891
1892   This is a special LU in that it must run on a non-master node.
1893
1894   """
1895   HPATH = "master-failover"
1896   HTYPE = constants.HTYPE_CLUSTER
1897   REQ_MASTER = False
1898   _OP_REQP = []
1899
1900   def BuildHooksEnv(self):
1901     """Build hooks env.
1902
1903     This will run on the new master only in the pre phase, and on all
1904     the nodes in the post phase.
1905
1906     """
1907     env = {
1908       "OP_TARGET": self.new_master,
1909       "NEW_MASTER": self.new_master,
1910       "OLD_MASTER": self.old_master,
1911       }
1912     return env, [self.new_master], self.cfg.GetNodeList()
1913
1914   def CheckPrereq(self):
1915     """Check prerequisites.
1916
1917     This checks that we are not already the master.
1918
1919     """
1920     self.new_master = utils.HostInfo().name
1921     self.old_master = self.sstore.GetMasterNode()
1922
1923     if self.old_master == self.new_master:
1924       raise errors.OpPrereqError("This commands must be run on the node"
1925                                  " where you want the new master to be."
1926                                  " %s is already the master" %
1927                                  self.old_master)
1928
1929   def Exec(self, feedback_fn):
1930     """Failover the master node.
1931
1932     This command, when run on a non-master node, will cause the current
1933     master to cease being master, and the non-master to become new
1934     master.
1935
1936     """
1937     #TODO: do not rely on gethostname returning the FQDN
1938     logger.Info("setting master to %s, old master: %s" %
1939                 (self.new_master, self.old_master))
1940
1941     if not rpc.call_node_stop_master(self.old_master):
1942       logger.Error("could disable the master role on the old master"
1943                    " %s, please disable manually" % self.old_master)
1944
1945     ss = self.sstore
1946     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1947     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1948                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1949       logger.Error("could not distribute the new simple store master file"
1950                    " to the other nodes, please check.")
1951
1952     if not rpc.call_node_start_master(self.new_master):
1953       logger.Error("could not start the master role on the new master"
1954                    " %s, please check" % self.new_master)
1955       feedback_fn("Error in activating the master IP on the new master,"
1956                   " please fix manually.")
1957
1958
1959
1960 class LUQueryClusterInfo(NoHooksLU):
1961   """Query cluster configuration.
1962
1963   """
1964   _OP_REQP = []
1965   REQ_MASTER = False
1966
1967   def CheckPrereq(self):
1968     """No prerequsites needed for this LU.
1969
1970     """
1971     pass
1972
1973   def Exec(self, feedback_fn):
1974     """Return cluster config.
1975
1976     """
1977     result = {
1978       "name": self.sstore.GetClusterName(),
1979       "software_version": constants.RELEASE_VERSION,
1980       "protocol_version": constants.PROTOCOL_VERSION,
1981       "config_version": constants.CONFIG_VERSION,
1982       "os_api_version": constants.OS_API_VERSION,
1983       "export_version": constants.EXPORT_VERSION,
1984       "master": self.sstore.GetMasterNode(),
1985       "architecture": (platform.architecture()[0], platform.machine()),
1986       "hypervisor_type": self.sstore.GetHypervisorType(),
1987       }
1988
1989     return result
1990
1991
1992 class LUClusterCopyFile(NoHooksLU):
1993   """Copy file to cluster.
1994
1995   """
1996   _OP_REQP = ["nodes", "filename"]
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     It should check that the named file exists and that the given list
2002     of nodes is valid.
2003
2004     """
2005     if not os.path.exists(self.op.filename):
2006       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2007
2008     self.nodes = _GetWantedNodes(self, self.op.nodes)
2009
2010   def Exec(self, feedback_fn):
2011     """Copy a file from master to some nodes.
2012
2013     Args:
2014       opts - class with options as members
2015       args - list containing a single element, the file name
2016     Opts used:
2017       nodes - list containing the name of target nodes; if empty, all nodes
2018
2019     """
2020     filename = self.op.filename
2021
2022     myname = utils.HostInfo().name
2023
2024     for node in self.nodes:
2025       if node == myname:
2026         continue
2027       if not ssh.CopyFileToNode(node, filename):
2028         logger.Error("Copy of file %s to node %s failed" % (filename, node))
2029
2030
2031 class LUDumpClusterConfig(NoHooksLU):
2032   """Return a text-representation of the cluster-config.
2033
2034   """
2035   _OP_REQP = []
2036
2037   def CheckPrereq(self):
2038     """No prerequisites.
2039
2040     """
2041     pass
2042
2043   def Exec(self, feedback_fn):
2044     """Dump a representation of the cluster config to the standard output.
2045
2046     """
2047     return self.cfg.DumpConfig()
2048
2049
2050 class LURunClusterCommand(NoHooksLU):
2051   """Run a command on some nodes.
2052
2053   """
2054   _OP_REQP = ["command", "nodes"]
2055
2056   def CheckPrereq(self):
2057     """Check prerequisites.
2058
2059     It checks that the given list of nodes is valid.
2060
2061     """
2062     self.nodes = _GetWantedNodes(self, self.op.nodes)
2063
2064   def Exec(self, feedback_fn):
2065     """Run a command on some nodes.
2066
2067     """
2068     # put the master at the end of the nodes list
2069     master_node = self.sstore.GetMasterNode()
2070     if master_node in self.nodes:
2071       self.nodes.remove(master_node)
2072       self.nodes.append(master_node)
2073
2074     data = []
2075     for node in self.nodes:
2076       result = ssh.SSHCall(node, "root", self.op.command)
2077       data.append((node, result.output, result.exit_code))
2078
2079     return data
2080
2081
2082 class LUActivateInstanceDisks(NoHooksLU):
2083   """Bring up an instance's disks.
2084
2085   """
2086   _OP_REQP = ["instance_name"]
2087
2088   def CheckPrereq(self):
2089     """Check prerequisites.
2090
2091     This checks that the instance is in the cluster.
2092
2093     """
2094     instance = self.cfg.GetInstanceInfo(
2095       self.cfg.ExpandInstanceName(self.op.instance_name))
2096     if instance is None:
2097       raise errors.OpPrereqError("Instance '%s' not known" %
2098                                  self.op.instance_name)
2099     self.instance = instance
2100
2101
2102   def Exec(self, feedback_fn):
2103     """Activate the disks.
2104
2105     """
2106     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2107     if not disks_ok:
2108       raise errors.OpExecError("Cannot activate block devices")
2109
2110     return disks_info
2111
2112
2113 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2114   """Prepare the block devices for an instance.
2115
2116   This sets up the block devices on all nodes.
2117
2118   Args:
2119     instance: a ganeti.objects.Instance object
2120     ignore_secondaries: if true, errors on secondary nodes won't result
2121                         in an error return from the function
2122
2123   Returns:
2124     false if the operation failed
2125     list of (host, instance_visible_name, node_visible_name) if the operation
2126          suceeded with the mapping from node devices to instance devices
2127   """
2128   device_info = []
2129   disks_ok = True
2130   iname = instance.name
2131   # With the two passes mechanism we try to reduce the window of
2132   # opportunity for the race condition of switching DRBD to primary
2133   # before handshaking occured, but we do not eliminate it
2134
2135   # The proper fix would be to wait (with some limits) until the
2136   # connection has been made and drbd transitions from WFConnection
2137   # into any other network-connected state (Connected, SyncTarget,
2138   # SyncSource, etc.)
2139
2140   # 1st pass, assemble on all nodes in secondary mode
2141   for inst_disk in instance.disks:
2142     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2143       cfg.SetDiskID(node_disk, node)
2144       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2145       if not result:
2146         logger.Error("could not prepare block device %s on node %s"
2147                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2148         if not ignore_secondaries:
2149           disks_ok = False
2150
2151   # FIXME: race condition on drbd migration to primary
2152
2153   # 2nd pass, do only the primary node
2154   for inst_disk in instance.disks:
2155     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2156       if node != instance.primary_node:
2157         continue
2158       cfg.SetDiskID(node_disk, node)
2159       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2160       if not result:
2161         logger.Error("could not prepare block device %s on node %s"
2162                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2163         disks_ok = False
2164     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2165
2166   # leave the disks configured for the primary node
2167   # this is a workaround that would be fixed better by
2168   # improving the logical/physical id handling
2169   for disk in instance.disks:
2170     cfg.SetDiskID(disk, instance.primary_node)
2171
2172   return disks_ok, device_info
2173
2174
2175 def _StartInstanceDisks(cfg, instance, force):
2176   """Start the disks of an instance.
2177
2178   """
2179   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2180                                            ignore_secondaries=force)
2181   if not disks_ok:
2182     _ShutdownInstanceDisks(instance, cfg)
2183     if force is not None and not force:
2184       logger.Error("If the message above refers to a secondary node,"
2185                    " you can retry the operation using '--force'.")
2186     raise errors.OpExecError("Disk consistency error")
2187
2188
2189 class LUDeactivateInstanceDisks(NoHooksLU):
2190   """Shutdown an instance's disks.
2191
2192   """
2193   _OP_REQP = ["instance_name"]
2194
2195   def CheckPrereq(self):
2196     """Check prerequisites.
2197
2198     This checks that the instance is in the cluster.
2199
2200     """
2201     instance = self.cfg.GetInstanceInfo(
2202       self.cfg.ExpandInstanceName(self.op.instance_name))
2203     if instance is None:
2204       raise errors.OpPrereqError("Instance '%s' not known" %
2205                                  self.op.instance_name)
2206     self.instance = instance
2207
2208   def Exec(self, feedback_fn):
2209     """Deactivate the disks
2210
2211     """
2212     instance = self.instance
2213     ins_l = rpc.call_instance_list([instance.primary_node])
2214     ins_l = ins_l[instance.primary_node]
2215     if not type(ins_l) is list:
2216       raise errors.OpExecError("Can't contact node '%s'" %
2217                                instance.primary_node)
2218
2219     if self.instance.name in ins_l:
2220       raise errors.OpExecError("Instance is running, can't shutdown"
2221                                " block devices.")
2222
2223     _ShutdownInstanceDisks(instance, self.cfg)
2224
2225
2226 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2227   """Shutdown block devices of an instance.
2228
2229   This does the shutdown on all nodes of the instance.
2230
2231   If the ignore_primary is false, errors on the primary node are
2232   ignored.
2233
2234   """
2235   result = True
2236   for disk in instance.disks:
2237     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2238       cfg.SetDiskID(top_disk, node)
2239       if not rpc.call_blockdev_shutdown(node, top_disk):
2240         logger.Error("could not shutdown block device %s on node %s" %
2241                      (disk.iv_name, node))
2242         if not ignore_primary or node != instance.primary_node:
2243           result = False
2244   return result
2245
2246
2247 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2248   """Checks if a node has enough free memory.
2249
2250   This function check if a given node has the needed amount of free
2251   memory. In case the node has less memory or we cannot get the
2252   information from the node, this function raise an OpPrereqError
2253   exception.
2254
2255   Args:
2256     - cfg: a ConfigWriter instance
2257     - node: the node name
2258     - reason: string to use in the error message
2259     - requested: the amount of memory in MiB
2260
2261   """
2262   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2263   if not (nodeinfo and isinstance(nodeinfo, dict) and
2264           node in nodeinfo and isinstance(nodeinfo[node], dict)):
2265     raise errors.OpPrereqError("Could not contact node %s for resource"
2266                              " information" % (node,))
2267
2268   free_mem = nodeinfo[node].get('memory_free')
2269   if not isinstance(free_mem, int):
2270     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2271                              " was '%s'" % (node, free_mem))
2272   if requested > free_mem:
2273     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2274                              " needed %s MiB, available %s MiB" %
2275                              (node, reason, requested, free_mem))
2276
2277
2278 class LUStartupInstance(LogicalUnit):
2279   """Starts an instance.
2280
2281   """
2282   HPATH = "instance-start"
2283   HTYPE = constants.HTYPE_INSTANCE
2284   _OP_REQP = ["instance_name", "force"]
2285
2286   def BuildHooksEnv(self):
2287     """Build hooks env.
2288
2289     This runs on master, primary and secondary nodes of the instance.
2290
2291     """
2292     env = {
2293       "FORCE": self.op.force,
2294       }
2295     env.update(_BuildInstanceHookEnvByObject(self.instance))
2296     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2297           list(self.instance.secondary_nodes))
2298     return env, nl, nl
2299
2300   def CheckPrereq(self):
2301     """Check prerequisites.
2302
2303     This checks that the instance is in the cluster.
2304
2305     """
2306     instance = self.cfg.GetInstanceInfo(
2307       self.cfg.ExpandInstanceName(self.op.instance_name))
2308     if instance is None:
2309       raise errors.OpPrereqError("Instance '%s' not known" %
2310                                  self.op.instance_name)
2311
2312     # check bridges existance
2313     _CheckInstanceBridgesExist(instance)
2314
2315     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2316                          "starting instance %s" % instance.name,
2317                          instance.memory)
2318
2319     self.instance = instance
2320     self.op.instance_name = instance.name
2321
2322   def Exec(self, feedback_fn):
2323     """Start the instance.
2324
2325     """
2326     instance = self.instance
2327     force = self.op.force
2328     extra_args = getattr(self.op, "extra_args", "")
2329
2330     self.cfg.MarkInstanceUp(instance.name)
2331
2332     node_current = instance.primary_node
2333
2334     _StartInstanceDisks(self.cfg, instance, force)
2335
2336     if not rpc.call_instance_start(node_current, instance, extra_args):
2337       _ShutdownInstanceDisks(instance, self.cfg)
2338       raise errors.OpExecError("Could not start instance")
2339
2340
2341 class LURebootInstance(LogicalUnit):
2342   """Reboot an instance.
2343
2344   """
2345   HPATH = "instance-reboot"
2346   HTYPE = constants.HTYPE_INSTANCE
2347   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2348
2349   def BuildHooksEnv(self):
2350     """Build hooks env.
2351
2352     This runs on master, primary and secondary nodes of the instance.
2353
2354     """
2355     env = {
2356       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2357       }
2358     env.update(_BuildInstanceHookEnvByObject(self.instance))
2359     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2360           list(self.instance.secondary_nodes))
2361     return env, nl, nl
2362
2363   def CheckPrereq(self):
2364     """Check prerequisites.
2365
2366     This checks that the instance is in the cluster.
2367
2368     """
2369     instance = self.cfg.GetInstanceInfo(
2370       self.cfg.ExpandInstanceName(self.op.instance_name))
2371     if instance is None:
2372       raise errors.OpPrereqError("Instance '%s' not known" %
2373                                  self.op.instance_name)
2374
2375     # check bridges existance
2376     _CheckInstanceBridgesExist(instance)
2377
2378     self.instance = instance
2379     self.op.instance_name = instance.name
2380
2381   def Exec(self, feedback_fn):
2382     """Reboot the instance.
2383
2384     """
2385     instance = self.instance
2386     ignore_secondaries = self.op.ignore_secondaries
2387     reboot_type = self.op.reboot_type
2388     extra_args = getattr(self.op, "extra_args", "")
2389
2390     node_current = instance.primary_node
2391
2392     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2393                            constants.INSTANCE_REBOOT_HARD,
2394                            constants.INSTANCE_REBOOT_FULL]:
2395       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2396                                   (constants.INSTANCE_REBOOT_SOFT,
2397                                    constants.INSTANCE_REBOOT_HARD,
2398                                    constants.INSTANCE_REBOOT_FULL))
2399
2400     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2401                        constants.INSTANCE_REBOOT_HARD]:
2402       if not rpc.call_instance_reboot(node_current, instance,
2403                                       reboot_type, extra_args):
2404         raise errors.OpExecError("Could not reboot instance")
2405     else:
2406       if not rpc.call_instance_shutdown(node_current, instance):
2407         raise errors.OpExecError("could not shutdown instance for full reboot")
2408       _ShutdownInstanceDisks(instance, self.cfg)
2409       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2410       if not rpc.call_instance_start(node_current, instance, extra_args):
2411         _ShutdownInstanceDisks(instance, self.cfg)
2412         raise errors.OpExecError("Could not start instance for full reboot")
2413
2414     self.cfg.MarkInstanceUp(instance.name)
2415
2416
2417 class LUShutdownInstance(LogicalUnit):
2418   """Shutdown an instance.
2419
2420   """
2421   HPATH = "instance-stop"
2422   HTYPE = constants.HTYPE_INSTANCE
2423   _OP_REQP = ["instance_name"]
2424
2425   def BuildHooksEnv(self):
2426     """Build hooks env.
2427
2428     This runs on master, primary and secondary nodes of the instance.
2429
2430     """
2431     env = _BuildInstanceHookEnvByObject(self.instance)
2432     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2433           list(self.instance.secondary_nodes))
2434     return env, nl, nl
2435
2436   def CheckPrereq(self):
2437     """Check prerequisites.
2438
2439     This checks that the instance is in the cluster.
2440
2441     """
2442     instance = self.cfg.GetInstanceInfo(
2443       self.cfg.ExpandInstanceName(self.op.instance_name))
2444     if instance is None:
2445       raise errors.OpPrereqError("Instance '%s' not known" %
2446                                  self.op.instance_name)
2447     self.instance = instance
2448
2449   def Exec(self, feedback_fn):
2450     """Shutdown the instance.
2451
2452     """
2453     instance = self.instance
2454     node_current = instance.primary_node
2455     self.cfg.MarkInstanceDown(instance.name)
2456     if not rpc.call_instance_shutdown(node_current, instance):
2457       logger.Error("could not shutdown instance")
2458
2459     _ShutdownInstanceDisks(instance, self.cfg)
2460
2461
2462 class LUReinstallInstance(LogicalUnit):
2463   """Reinstall an instance.
2464
2465   """
2466   HPATH = "instance-reinstall"
2467   HTYPE = constants.HTYPE_INSTANCE
2468   _OP_REQP = ["instance_name"]
2469
2470   def BuildHooksEnv(self):
2471     """Build hooks env.
2472
2473     This runs on master, primary and secondary nodes of the instance.
2474
2475     """
2476     env = _BuildInstanceHookEnvByObject(self.instance)
2477     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2478           list(self.instance.secondary_nodes))
2479     return env, nl, nl
2480
2481   def CheckPrereq(self):
2482     """Check prerequisites.
2483
2484     This checks that the instance is in the cluster and is not running.
2485
2486     """
2487     instance = self.cfg.GetInstanceInfo(
2488       self.cfg.ExpandInstanceName(self.op.instance_name))
2489     if instance is None:
2490       raise errors.OpPrereqError("Instance '%s' not known" %
2491                                  self.op.instance_name)
2492     if instance.disk_template == constants.DT_DISKLESS:
2493       raise errors.OpPrereqError("Instance '%s' has no disks" %
2494                                  self.op.instance_name)
2495     if instance.status != "down":
2496       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2497                                  self.op.instance_name)
2498     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2499     if remote_info:
2500       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2501                                  (self.op.instance_name,
2502                                   instance.primary_node))
2503
2504     self.op.os_type = getattr(self.op, "os_type", None)
2505     if self.op.os_type is not None:
2506       # OS verification
2507       pnode = self.cfg.GetNodeInfo(
2508         self.cfg.ExpandNodeName(instance.primary_node))
2509       if pnode is None:
2510         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2511                                    self.op.pnode)
2512       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2513       if not os_obj:
2514         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2515                                    " primary node"  % self.op.os_type)
2516
2517     self.instance = instance
2518
2519   def Exec(self, feedback_fn):
2520     """Reinstall the instance.
2521
2522     """
2523     inst = self.instance
2524
2525     if self.op.os_type is not None:
2526       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2527       inst.os = self.op.os_type
2528       self.cfg.AddInstance(inst)
2529
2530     _StartInstanceDisks(self.cfg, inst, None)
2531     try:
2532       feedback_fn("Running the instance OS create scripts...")
2533       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2534         raise errors.OpExecError("Could not install OS for instance %s"
2535                                  " on node %s" %
2536                                  (inst.name, inst.primary_node))
2537     finally:
2538       _ShutdownInstanceDisks(inst, self.cfg)
2539
2540
2541 class LURenameInstance(LogicalUnit):
2542   """Rename an instance.
2543
2544   """
2545   HPATH = "instance-rename"
2546   HTYPE = constants.HTYPE_INSTANCE
2547   _OP_REQP = ["instance_name", "new_name"]
2548
2549   def BuildHooksEnv(self):
2550     """Build hooks env.
2551
2552     This runs on master, primary and secondary nodes of the instance.
2553
2554     """
2555     env = _BuildInstanceHookEnvByObject(self.instance)
2556     env["INSTANCE_NEW_NAME"] = self.op.new_name
2557     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2558           list(self.instance.secondary_nodes))
2559     return env, nl, nl
2560
2561   def CheckPrereq(self):
2562     """Check prerequisites.
2563
2564     This checks that the instance is in the cluster and is not running.
2565
2566     """
2567     instance = self.cfg.GetInstanceInfo(
2568       self.cfg.ExpandInstanceName(self.op.instance_name))
2569     if instance is None:
2570       raise errors.OpPrereqError("Instance '%s' not known" %
2571                                  self.op.instance_name)
2572     if instance.status != "down":
2573       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2574                                  self.op.instance_name)
2575     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2576     if remote_info:
2577       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2578                                  (self.op.instance_name,
2579                                   instance.primary_node))
2580     self.instance = instance
2581
2582     # new name verification
2583     name_info = utils.HostInfo(self.op.new_name)
2584
2585     self.op.new_name = new_name = name_info.name
2586     instance_list = self.cfg.GetInstanceList()
2587     if new_name in instance_list:
2588       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2589                                  new_name)
2590
2591     if not getattr(self.op, "ignore_ip", False):
2592       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2593         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2594                                    (name_info.ip, new_name))
2595
2596
2597   def Exec(self, feedback_fn):
2598     """Reinstall the instance.
2599
2600     """
2601     inst = self.instance
2602     old_name = inst.name
2603
2604     self.cfg.RenameInstance(inst.name, self.op.new_name)
2605
2606     # re-read the instance from the configuration after rename
2607     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2608
2609     _StartInstanceDisks(self.cfg, inst, None)
2610     try:
2611       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2612                                           "sda", "sdb"):
2613         msg = ("Could not run OS rename script for instance %s on node %s"
2614                " (but the instance has been renamed in Ganeti)" %
2615                (inst.name, inst.primary_node))
2616         logger.Error(msg)
2617     finally:
2618       _ShutdownInstanceDisks(inst, self.cfg)
2619
2620
2621 class LURemoveInstance(LogicalUnit):
2622   """Remove an instance.
2623
2624   """
2625   HPATH = "instance-remove"
2626   HTYPE = constants.HTYPE_INSTANCE
2627   _OP_REQP = ["instance_name", "ignore_failures"]
2628
2629   def BuildHooksEnv(self):
2630     """Build hooks env.
2631
2632     This runs on master, primary and secondary nodes of the instance.
2633
2634     """
2635     env = _BuildInstanceHookEnvByObject(self.instance)
2636     nl = [self.sstore.GetMasterNode()]
2637     return env, nl, nl
2638
2639   def CheckPrereq(self):
2640     """Check prerequisites.
2641
2642     This checks that the instance is in the cluster.
2643
2644     """
2645     instance = self.cfg.GetInstanceInfo(
2646       self.cfg.ExpandInstanceName(self.op.instance_name))
2647     if instance is None:
2648       raise errors.OpPrereqError("Instance '%s' not known" %
2649                                  self.op.instance_name)
2650     self.instance = instance
2651
2652   def Exec(self, feedback_fn):
2653     """Remove the instance.
2654
2655     """
2656     instance = self.instance
2657     logger.Info("shutting down instance %s on node %s" %
2658                 (instance.name, instance.primary_node))
2659
2660     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2661       if self.op.ignore_failures:
2662         feedback_fn("Warning: can't shutdown instance")
2663       else:
2664         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2665                                  (instance.name, instance.primary_node))
2666
2667     logger.Info("removing block devices for instance %s" % instance.name)
2668
2669     if not _RemoveDisks(instance, self.cfg):
2670       if self.op.ignore_failures:
2671         feedback_fn("Warning: can't remove instance's disks")
2672       else:
2673         raise errors.OpExecError("Can't remove instance's disks")
2674
2675     logger.Info("removing instance %s out of cluster config" % instance.name)
2676
2677     self.cfg.RemoveInstance(instance.name)
2678
2679
2680 class LUQueryInstances(NoHooksLU):
2681   """Logical unit for querying instances.
2682
2683   """
2684   _OP_REQP = ["output_fields", "names"]
2685
2686   def CheckPrereq(self):
2687     """Check prerequisites.
2688
2689     This checks that the fields required are valid output fields.
2690
2691     """
2692     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2693     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2694                                "admin_state", "admin_ram",
2695                                "disk_template", "ip", "mac", "bridge",
2696                                "sda_size", "sdb_size", "vcpus", "tags"],
2697                        dynamic=self.dynamic_fields,
2698                        selected=self.op.output_fields)
2699
2700     self.wanted = _GetWantedInstances(self, self.op.names)
2701
2702   def Exec(self, feedback_fn):
2703     """Computes the list of nodes and their attributes.
2704
2705     """
2706     instance_names = self.wanted
2707     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2708                      in instance_names]
2709
2710     # begin data gathering
2711
2712     nodes = frozenset([inst.primary_node for inst in instance_list])
2713
2714     bad_nodes = []
2715     if self.dynamic_fields.intersection(self.op.output_fields):
2716       live_data = {}
2717       node_data = rpc.call_all_instances_info(nodes)
2718       for name in nodes:
2719         result = node_data[name]
2720         if result:
2721           live_data.update(result)
2722         elif result == False:
2723           bad_nodes.append(name)
2724         # else no instance is alive
2725     else:
2726       live_data = dict([(name, {}) for name in instance_names])
2727
2728     # end data gathering
2729
2730     output = []
2731     for instance in instance_list:
2732       iout = []
2733       for field in self.op.output_fields:
2734         if field == "name":
2735           val = instance.name
2736         elif field == "os":
2737           val = instance.os
2738         elif field == "pnode":
2739           val = instance.primary_node
2740         elif field == "snodes":
2741           val = list(instance.secondary_nodes)
2742         elif field == "admin_state":
2743           val = (instance.status != "down")
2744         elif field == "oper_state":
2745           if instance.primary_node in bad_nodes:
2746             val = None
2747           else:
2748             val = bool(live_data.get(instance.name))
2749         elif field == "status":
2750           if instance.primary_node in bad_nodes:
2751             val = "ERROR_nodedown"
2752           else:
2753             running = bool(live_data.get(instance.name))
2754             if running:
2755               if instance.status != "down":
2756                 val = "running"
2757               else:
2758                 val = "ERROR_up"
2759             else:
2760               if instance.status != "down":
2761                 val = "ERROR_down"
2762               else:
2763                 val = "ADMIN_down"
2764         elif field == "admin_ram":
2765           val = instance.memory
2766         elif field == "oper_ram":
2767           if instance.primary_node in bad_nodes:
2768             val = None
2769           elif instance.name in live_data:
2770             val = live_data[instance.name].get("memory", "?")
2771           else:
2772             val = "-"
2773         elif field == "disk_template":
2774           val = instance.disk_template
2775         elif field == "ip":
2776           val = instance.nics[0].ip
2777         elif field == "bridge":
2778           val = instance.nics[0].bridge
2779         elif field == "mac":
2780           val = instance.nics[0].mac
2781         elif field == "sda_size" or field == "sdb_size":
2782           disk = instance.FindDisk(field[:3])
2783           if disk is None:
2784             val = None
2785           else:
2786             val = disk.size
2787         elif field == "vcpus":
2788           val = instance.vcpus
2789         elif field == "tags":
2790           val = list(instance.GetTags())
2791         else:
2792           raise errors.ParameterError(field)
2793         iout.append(val)
2794       output.append(iout)
2795
2796     return output
2797
2798
2799 class LUFailoverInstance(LogicalUnit):
2800   """Failover an instance.
2801
2802   """
2803   HPATH = "instance-failover"
2804   HTYPE = constants.HTYPE_INSTANCE
2805   _OP_REQP = ["instance_name", "ignore_consistency"]
2806
2807   def BuildHooksEnv(self):
2808     """Build hooks env.
2809
2810     This runs on master, primary and secondary nodes of the instance.
2811
2812     """
2813     env = {
2814       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2815       }
2816     env.update(_BuildInstanceHookEnvByObject(self.instance))
2817     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2818     return env, nl, nl
2819
2820   def CheckPrereq(self):
2821     """Check prerequisites.
2822
2823     This checks that the instance is in the cluster.
2824
2825     """
2826     instance = self.cfg.GetInstanceInfo(
2827       self.cfg.ExpandInstanceName(self.op.instance_name))
2828     if instance is None:
2829       raise errors.OpPrereqError("Instance '%s' not known" %
2830                                  self.op.instance_name)
2831
2832     if instance.disk_template not in constants.DTS_NET_MIRROR:
2833       raise errors.OpPrereqError("Instance's disk layout is not"
2834                                  " network mirrored, cannot failover.")
2835
2836     secondary_nodes = instance.secondary_nodes
2837     if not secondary_nodes:
2838       raise errors.ProgrammerError("no secondary node but using "
2839                                    "DT_REMOTE_RAID1 template")
2840
2841     target_node = secondary_nodes[0]
2842     # check memory requirements on the secondary node
2843     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2844                          instance.name, instance.memory)
2845
2846     # check bridge existance
2847     brlist = [nic.bridge for nic in instance.nics]
2848     if not rpc.call_bridges_exist(target_node, brlist):
2849       raise errors.OpPrereqError("One or more target bridges %s does not"
2850                                  " exist on destination node '%s'" %
2851                                  (brlist, target_node))
2852
2853     self.instance = instance
2854
2855   def Exec(self, feedback_fn):
2856     """Failover an instance.
2857
2858     The failover is done by shutting it down on its present node and
2859     starting it on the secondary.
2860
2861     """
2862     instance = self.instance
2863
2864     source_node = instance.primary_node
2865     target_node = instance.secondary_nodes[0]
2866
2867     feedback_fn("* checking disk consistency between source and target")
2868     for dev in instance.disks:
2869       # for remote_raid1, these are md over drbd
2870       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2871         if instance.status == "up" and not self.op.ignore_consistency:
2872           raise errors.OpExecError("Disk %s is degraded on target node,"
2873                                    " aborting failover." % dev.iv_name)
2874
2875     feedback_fn("* shutting down instance on source node")
2876     logger.Info("Shutting down instance %s on node %s" %
2877                 (instance.name, source_node))
2878
2879     if not rpc.call_instance_shutdown(source_node, instance):
2880       if self.op.ignore_consistency:
2881         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2882                      " anyway. Please make sure node %s is down"  %
2883                      (instance.name, source_node, source_node))
2884       else:
2885         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2886                                  (instance.name, source_node))
2887
2888     feedback_fn("* deactivating the instance's disks on source node")
2889     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2890       raise errors.OpExecError("Can't shut down the instance's disks.")
2891
2892     instance.primary_node = target_node
2893     # distribute new instance config to the other nodes
2894     self.cfg.Update(instance)
2895
2896     # Only start the instance if it's marked as up
2897     if instance.status == "up":
2898       feedback_fn("* activating the instance's disks on target node")
2899       logger.Info("Starting instance %s on node %s" %
2900                   (instance.name, target_node))
2901
2902       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2903                                                ignore_secondaries=True)
2904       if not disks_ok:
2905         _ShutdownInstanceDisks(instance, self.cfg)
2906         raise errors.OpExecError("Can't activate the instance's disks")
2907
2908       feedback_fn("* starting the instance on the target node")
2909       if not rpc.call_instance_start(target_node, instance, None):
2910         _ShutdownInstanceDisks(instance, self.cfg)
2911         raise errors.OpExecError("Could not start instance %s on node %s." %
2912                                  (instance.name, target_node))
2913
2914
2915 class LUMigrateInstance(LogicalUnit):
2916   """Migrate an instance.
2917
2918   This is migration without shutting down, compared to the failover,
2919   which is done with shutdown.
2920
2921   """
2922   HPATH = "instance-migrate"
2923   HTYPE = constants.HTYPE_INSTANCE
2924   _OP_REQP = ["instance_name", "live", "cleanup"]
2925
2926   def BuildHooksEnv(self):
2927     """Build hooks env.
2928
2929     This runs on master, primary and secondary nodes of the instance.
2930
2931     """
2932     env = _BuildInstanceHookEnvByObject(self.instance)
2933     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2934     return env, nl, nl
2935
2936   def CheckPrereq(self):
2937     """Check prerequisites.
2938
2939     This checks that the instance is in the cluster.
2940
2941     """
2942     instance = self.cfg.GetInstanceInfo(
2943       self.cfg.ExpandInstanceName(self.op.instance_name))
2944     if instance is None:
2945       raise errors.OpPrereqError("Instance '%s' not known" %
2946                                  self.op.instance_name)
2947
2948     if instance.disk_template != constants.DT_DRBD8:
2949       raise errors.OpPrereqError("Instance's disk layout is not"
2950                                  " drbd8, cannot migrate.")
2951
2952     secondary_nodes = instance.secondary_nodes
2953     if not secondary_nodes:
2954       raise errors.ProgrammerError("no secondary node but using "
2955                                    "drbd8 disk template")
2956
2957     target_node = secondary_nodes[0]
2958     # check memory requirements on the secondary node
2959     _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
2960                          instance.name, instance.memory)
2961
2962     # check bridge existance
2963     brlist = [nic.bridge for nic in instance.nics]
2964     if not rpc.call_bridges_exist(target_node, brlist):
2965       raise errors.OpPrereqError("One or more target bridges %s does not"
2966                                  " exist on destination node '%s'" %
2967                                  (brlist, target_node))
2968
2969     if not self.op.cleanup:
2970       migratable = rpc.call_instance_migratable(instance.primary_node,
2971                                                 instance)
2972       if not migratable:
2973         raise errors.OpPrereqError("Can't contact node '%s'" %
2974                                    instance.primary_node)
2975       if not migratable[0]:
2976         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
2977                                    migratable[1])
2978
2979     self.instance = instance
2980
2981   def _WaitUntilSync(self):
2982     """Poll with custom rpc for disk sync.
2983
2984     This uses our own step-based rpc call.
2985
2986     """
2987     self.feedback_fn("* wait until resync is done")
2988     all_done = False
2989     while not all_done:
2990       all_done = True
2991       result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
2992                                           self.instance.disks,
2993                                           self.nodes_ip, False,
2994                                           constants.DRBD_RECONF_RPC_WFSYNC)
2995       min_percent = 100
2996       for node in self.all_nodes:
2997         if not result[node] or not result[node][0]:
2998           raise errors.OpExecError("Cannot resync disks on node %s" % (node,))
2999         node_done, node_percent = result[node][1]
3000         all_done = all_done and node_done
3001         if node_percent is not None:
3002           min_percent = min(min_percent, node_percent)
3003       if not all_done:
3004         if min_percent < 100:
3005           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3006         time.sleep(2)
3007
3008   def _EnsureSecondary(self, node):
3009     """Demote a node to secondary.
3010
3011     """
3012     self.feedback_fn("* switching node %s to secondary mode" % node)
3013     result = rpc.call_drbd_reconfig_net([node], self.instance.name,
3014                                         self.instance.disks,
3015                                         self.nodes_ip, False,
3016                                         constants.DRBD_RECONF_RPC_SECONDARY)
3017     if not result[node] or not result[node][0]:
3018         raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3019                                  " error %s" %
3020                                  (node, result[node][1]))
3021
3022   def _GoStandalone(self):
3023     """Disconnect from the network.
3024
3025     """
3026     self.feedback_fn("* changing into standalone mode")
3027     result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3028                                         self.instance.disks,
3029                                         self.nodes_ip, True,
3030                                         constants.DRBD_RECONF_RPC_DISCONNECT)
3031     for node in self.all_nodes:
3032       if not result[node] or not result[node][0]:
3033         raise errors.OpExecError("Cannot disconnect disks node %s,"
3034                                  " error %s" % (node, result[node][1]))
3035
3036   def _GoReconnect(self, multimaster):
3037     """Reconnect to the network.
3038
3039     """
3040     if multimaster:
3041       msg = "dual-master"
3042     else:
3043       msg = "single-master"
3044     self.feedback_fn("* changing disks into %s mode" % msg)
3045     result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3046                                         self.instance.disks,
3047                                         self.nodes_ip,
3048                                         multimaster,
3049                                         constants.DRBD_RECONF_RPC_RECONNECT)
3050     for node in self.all_nodes:
3051       if not result[node] or not result[node][0]:
3052         raise errors.OpExecError("Cannot change disks config on node %s,"
3053                                  " error %s" % (node, result[node][1]))
3054
3055   def _IdentifyDisks(self):
3056     """Start the migration RPC sequence.
3057
3058     """
3059     self.feedback_fn("* identifying disks")
3060     result = rpc.call_drbd_reconfig_net(self.all_nodes,
3061                                         self.instance.name,
3062                                         self.instance.disks,
3063                                         self.nodes_ip, True,
3064                                         constants.DRBD_RECONF_RPC_INIT)
3065     for node in self.all_nodes:
3066       if not result[node] or not result[node][0]:
3067         raise errors.OpExecError("Cannot identify disks node %s,"
3068                                  " error %s" % (node, result[node][1]))
3069
3070   def _ExecCleanup(self):
3071     """Try to cleanup after a failed migration.
3072
3073     The cleanup is done by:
3074       - check that the instance is running only on one node
3075         (and update the config if needed)
3076       - change disks on its secondary node to secondary
3077       - wait until disks are fully synchronized
3078       - disconnect from the network
3079       - change disks into single-master mode
3080       - wait again until disks are fully synchronized
3081
3082     """
3083     instance = self.instance
3084     target_node = self.target_node
3085     source_node = self.source_node
3086
3087     # check running on only one node
3088     self.feedback_fn("* checking where the instance actually runs"
3089                      " (if this hangs, the hypervisor might be in"
3090                      " a bad state)")
3091     ins_l = rpc.call_instance_list(self.all_nodes)
3092     for node in self.all_nodes:
3093       if not type(ins_l[node]) is list:
3094         raise errors.OpExecError("Can't contact node '%s'" % node)
3095
3096     runningon_source = instance.name in ins_l[source_node]
3097     runningon_target = instance.name in ins_l[target_node]
3098
3099     if runningon_source and runningon_target:
3100       raise errors.OpExecError("Instance seems to be running on two nodes,"
3101                                " or the hypervisor is confused. You will have"
3102                                " to ensure manually that it runs only on one"
3103                                " and restart this operation.")
3104
3105     if not (runningon_source or runningon_target):
3106       raise errors.OpExecError("Instance does not seem to be running at all."
3107                                " In this case, it's safer to repair by"
3108                                " running 'gnt-instance stop' to ensure disk"
3109                                " shutdown, and then restarting it.")
3110
3111     if runningon_target:
3112       # the migration has actually succeeded, we need to update the config
3113       self.feedback_fn("* instance running on secondary node (%s),"
3114                        " updating config" % target_node)
3115       instance.primary_node = target_node
3116       self.cfg.Update(instance)
3117       demoted_node = source_node
3118     else:
3119       self.feedback_fn("* instance confirmed to be running on its"
3120                        " primary node (%s)" % source_node)
3121       demoted_node = target_node
3122
3123     self._IdentifyDisks()
3124
3125     self._EnsureSecondary(demoted_node)
3126     self._WaitUntilSync()
3127     self._GoStandalone()
3128     self._GoReconnect(False)
3129     self._WaitUntilSync()
3130
3131     self.feedback_fn("* done")
3132
3133   def _ExecMigration(self):
3134     """Migrate an instance.
3135
3136     The migrate is done by:
3137       - change the disks into dual-master mode
3138       - wait until disks are fully synchronized again
3139       - migrate the instance
3140       - change disks on the new secondary node (the old primary) to secondary
3141       - wait until disks are fully synchronized
3142       - change disks into single-master mode
3143
3144     """
3145     instance = self.instance
3146     target_node = self.target_node
3147     source_node = self.source_node
3148
3149     self.feedback_fn("* checking disk consistency between source and target")
3150     for dev in instance.disks:
3151       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3152         raise errors.OpExecError("Disk %s is degraded or not fully"
3153                                  " synchronized on target node,"
3154                                  " aborting migrate." % dev.iv_name)
3155
3156     self._IdentifyDisks()
3157
3158     self._EnsureSecondary(target_node)
3159     self._GoStandalone()
3160     self._GoReconnect(True)
3161     self._WaitUntilSync()
3162
3163     self.feedback_fn("* migrating instance to %s" % target_node)
3164     time.sleep(10)
3165     result = rpc.call_instance_migrate(source_node, instance,
3166                                        self.nodes_ip[target_node],
3167                                        self.op.live)
3168     if not result or not result[0]:
3169       logger.Error("Instance migration failed, trying to revert disk status")
3170       try:
3171         self._EnsureSecondary(target_node)
3172         self._GoStandalone()
3173         self._GoReconnect(False)
3174         self._WaitUntilSync()
3175       except errors.OpExecError, err:
3176         logger.Error("Can't reconnect the drives: error '%s'\n"
3177                      "Please look and recover the instance status" % str(err))
3178
3179       raise errors.OpExecError("Could not migrate instance %s: %s" %
3180                                (instance.name, result[1]))
3181     time.sleep(10)
3182
3183     instance.primary_node = target_node
3184     # distribute new instance config to the other nodes
3185     self.cfg.Update(instance)
3186
3187     self._EnsureSecondary(source_node)
3188     self._WaitUntilSync()
3189     self._GoStandalone()
3190     self._GoReconnect(False)
3191     self._WaitUntilSync()
3192
3193     self.feedback_fn("* done")
3194
3195   def Exec(self, feedback_fn):
3196     """Perform the migration.
3197
3198     """
3199     self.feedback_fn = feedback_fn
3200
3201     self.source_node = self.instance.primary_node
3202     self.target_node = self.instance.secondary_nodes[0]
3203     self.all_nodes = [self.source_node, self.target_node]
3204     self.nodes_ip = {
3205       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3206       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3207       }
3208     if self.op.cleanup:
3209       return self._ExecCleanup()
3210     else:
3211       return self._ExecMigration()
3212
3213
3214 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3215   """Create a tree of block devices on the primary node.
3216
3217   This always creates all devices.
3218
3219   """
3220   if device.children:
3221     for child in device.children:
3222       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3223         return False
3224
3225   cfg.SetDiskID(device, node)
3226   new_id = rpc.call_blockdev_create(node, device, device.size,
3227                                     instance.name, True, info)
3228   if not new_id:
3229     return False
3230   if device.physical_id is None:
3231     device.physical_id = new_id
3232   return True
3233
3234
3235 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3236   """Create a tree of block devices on a secondary node.
3237
3238   If this device type has to be created on secondaries, create it and
3239   all its children.
3240
3241   If not, just recurse to children keeping the same 'force' value.
3242
3243   """
3244   if device.CreateOnSecondary():
3245     force = True
3246   if device.children:
3247     for child in device.children:
3248       if not _CreateBlockDevOnSecondary(cfg, node, instance,
3249                                         child, force, info):
3250         return False
3251
3252   if not force:
3253     return True
3254   cfg.SetDiskID(device, node)
3255   new_id = rpc.call_blockdev_create(node, device, device.size,
3256                                     instance.name, False, info)
3257   if not new_id:
3258     return False
3259   if device.physical_id is None:
3260     device.physical_id = new_id
3261   return True
3262
3263
3264 def _GenerateUniqueNames(cfg, exts):
3265   """Generate a suitable LV name.
3266
3267   This will generate a logical volume name for the given instance.
3268
3269   """
3270   results = []
3271   for val in exts:
3272     new_id = cfg.GenerateUniqueID()
3273     results.append("%s%s" % (new_id, val))
3274   return results
3275
3276
3277 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3278   """Generate a drbd device complete with its children.
3279
3280   """
3281   port = cfg.AllocatePort()
3282   vgname = cfg.GetVGName()
3283   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3284                           logical_id=(vgname, names[0]))
3285   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3286                           logical_id=(vgname, names[1]))
3287   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3288                           logical_id = (primary, secondary, port),
3289                           children = [dev_data, dev_meta])
3290   return drbd_dev
3291
3292
3293 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3294   """Generate a drbd8 device complete with its children.
3295
3296   """
3297   port = cfg.AllocatePort()
3298   vgname = cfg.GetVGName()
3299   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3300                           logical_id=(vgname, names[0]))
3301   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3302                           logical_id=(vgname, names[1]))
3303   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3304                           logical_id = (primary, secondary, port),
3305                           children = [dev_data, dev_meta],
3306                           iv_name=iv_name)
3307   return drbd_dev
3308
3309 def _GenerateDiskTemplate(cfg, template_name,
3310                           instance_name, primary_node,
3311                           secondary_nodes, disk_sz, swap_sz):
3312   """Generate the entire disk layout for a given template type.
3313
3314   """
3315   #TODO: compute space requirements
3316
3317   vgname = cfg.GetVGName()
3318   if template_name == constants.DT_DISKLESS:
3319     disks = []
3320   elif template_name == constants.DT_PLAIN:
3321     if len(secondary_nodes) != 0:
3322       raise errors.ProgrammerError("Wrong template configuration")
3323
3324     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3325     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3326                            logical_id=(vgname, names[0]),
3327                            iv_name = "sda")
3328     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3329                            logical_id=(vgname, names[1]),
3330                            iv_name = "sdb")
3331     disks = [sda_dev, sdb_dev]
3332   elif template_name == constants.DT_LOCAL_RAID1:
3333     if len(secondary_nodes) != 0:
3334       raise errors.ProgrammerError("Wrong template configuration")
3335
3336
3337     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3338                                        ".sdb_m1", ".sdb_m2"])
3339     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3340                               logical_id=(vgname, names[0]))
3341     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3342                               logical_id=(vgname, names[1]))
3343     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3344                               size=disk_sz,
3345                               children = [sda_dev_m1, sda_dev_m2])
3346     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3347                               logical_id=(vgname, names[2]))
3348     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3349                               logical_id=(vgname, names[3]))
3350     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3351                               size=swap_sz,
3352                               children = [sdb_dev_m1, sdb_dev_m2])
3353     disks = [md_sda_dev, md_sdb_dev]
3354   elif template_name == constants.DT_REMOTE_RAID1:
3355     if len(secondary_nodes) != 1:
3356       raise errors.ProgrammerError("Wrong template configuration")
3357     remote_node = secondary_nodes[0]
3358     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3359                                        ".sdb_data", ".sdb_meta"])
3360     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3361                                          disk_sz, names[0:2])
3362     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3363                               children = [drbd_sda_dev], size=disk_sz)
3364     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3365                                          swap_sz, names[2:4])
3366     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3367                               children = [drbd_sdb_dev], size=swap_sz)
3368     disks = [md_sda_dev, md_sdb_dev]
3369   elif template_name == constants.DT_DRBD8:
3370     if len(secondary_nodes) != 1:
3371       raise errors.ProgrammerError("Wrong template configuration")
3372     remote_node = secondary_nodes[0]
3373     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3374                                        ".sdb_data", ".sdb_meta"])
3375     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3376                                          disk_sz, names[0:2], "sda")
3377     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3378                                          swap_sz, names[2:4], "sdb")
3379     disks = [drbd_sda_dev, drbd_sdb_dev]
3380   else:
3381     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3382   return disks
3383
3384
3385 def _GetInstanceInfoText(instance):
3386   """Compute that text that should be added to the disk's metadata.
3387
3388   """
3389   return "originstname+%s" % instance.name
3390
3391
3392 def _CreateDisks(cfg, instance):
3393   """Create all disks for an instance.
3394
3395   This abstracts away some work from AddInstance.
3396
3397   Args:
3398     instance: the instance object
3399
3400   Returns:
3401     True or False showing the success of the creation process
3402
3403   """
3404   info = _GetInstanceInfoText(instance)
3405
3406   for device in instance.disks:
3407     logger.Info("creating volume %s for instance %s" %
3408               (device.iv_name, instance.name))
3409     #HARDCODE
3410     for secondary_node in instance.secondary_nodes:
3411       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3412                                         device, False, info):
3413         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3414                      (device.iv_name, device, secondary_node))
3415         return False
3416     #HARDCODE
3417     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3418                                     instance, device, info):
3419       logger.Error("failed to create volume %s on primary!" %
3420                    device.iv_name)
3421       return False
3422   return True
3423
3424
3425 def _RemoveDisks(instance, cfg):
3426   """Remove all disks for an instance.
3427
3428   This abstracts away some work from `AddInstance()` and
3429   `RemoveInstance()`. Note that in case some of the devices couldn't
3430   be removed, the removal will continue with the other ones (compare
3431   with `_CreateDisks()`).
3432
3433   Args:
3434     instance: the instance object
3435
3436   Returns:
3437     True or False showing the success of the removal proces
3438
3439   """
3440   logger.Info("removing block devices for instance %s" % instance.name)
3441
3442   result = True
3443   for device in instance.disks:
3444     for node, disk in device.ComputeNodeTree(instance.primary_node):
3445       cfg.SetDiskID(disk, node)
3446       if not rpc.call_blockdev_remove(node, disk):
3447         logger.Error("could not remove block device %s on node %s,"
3448                      " continuing anyway" %
3449                      (device.iv_name, node))
3450         result = False
3451   return result
3452
3453
3454 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3455   """Compute disk size requirements in the volume group
3456
3457   This is currently hard-coded for the two-drive layout.
3458
3459   """
3460   # Required free disk space as a function of disk and swap space
3461   req_size_dict = {
3462     constants.DT_DISKLESS: None,
3463     constants.DT_PLAIN: disk_size + swap_size,
3464     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3465     # 256 MB are added for drbd metadata, 128MB for each drbd device
3466     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3467     constants.DT_DRBD8: disk_size + swap_size + 256,
3468   }
3469
3470   if disk_template not in req_size_dict:
3471     raise errors.ProgrammerError("Disk template '%s' size requirement"
3472                                  " is unknown" %  disk_template)
3473
3474   return req_size_dict[disk_template]
3475
3476
3477 class LUCreateInstance(LogicalUnit):
3478   """Create an instance.
3479
3480   """
3481   HPATH = "instance-add"
3482   HTYPE = constants.HTYPE_INSTANCE
3483   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3484               "disk_template", "swap_size", "mode", "start", "vcpus",
3485               "wait_for_sync", "ip_check", "mac"]
3486
3487   def _RunAllocator(self):
3488     """Run the allocator based on input opcode.
3489
3490     """
3491     disks = [{"size": self.op.disk_size, "mode": "w"},
3492              {"size": self.op.swap_size, "mode": "w"}]
3493     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3494              "bridge": self.op.bridge}]
3495     ial = IAllocator(self.cfg, self.sstore,
3496                      mode=constants.IALLOCATOR_MODE_ALLOC,
3497                      name=self.op.instance_name,
3498                      disk_template=self.op.disk_template,
3499                      tags=[],
3500                      os=self.op.os_type,
3501                      vcpus=self.op.vcpus,
3502                      mem_size=self.op.mem_size,
3503                      disks=disks,
3504                      nics=nics,
3505                      )
3506
3507     ial.Run(self.op.iallocator)
3508
3509     if not ial.success:
3510       raise errors.OpPrereqError("Can't compute nodes using"
3511                                  " iallocator '%s': %s" % (self.op.iallocator,
3512                                                            ial.info))
3513     if len(ial.nodes) != ial.required_nodes:
3514       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3515                                  " of nodes (%s), required %s" %
3516                                  (len(ial.nodes), ial.required_nodes))
3517     self.op.pnode = ial.nodes[0]
3518     logger.ToStdout("Selected nodes for the instance: %s" %
3519                     (", ".join(ial.nodes),))
3520     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3521                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3522     if ial.required_nodes == 2:
3523       self.op.snode = ial.nodes[1]
3524
3525   def BuildHooksEnv(self):
3526     """Build hooks env.
3527
3528     This runs on master, primary and secondary nodes of the instance.
3529
3530     """
3531     env = {
3532       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3533       "INSTANCE_DISK_SIZE": self.op.disk_size,
3534       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3535       "INSTANCE_ADD_MODE": self.op.mode,
3536       }
3537     if self.op.mode == constants.INSTANCE_IMPORT:
3538       env["INSTANCE_SRC_NODE"] = self.op.src_node
3539       env["INSTANCE_SRC_PATH"] = self.op.src_path
3540       env["INSTANCE_SRC_IMAGE"] = self.src_image
3541
3542     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3543       primary_node=self.op.pnode,
3544       secondary_nodes=self.secondaries,
3545       status=self.instance_status,
3546       os_type=self.op.os_type,
3547       memory=self.op.mem_size,
3548       vcpus=self.op.vcpus,
3549       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3550     ))
3551
3552     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3553           self.secondaries)
3554     return env, nl, nl
3555
3556
3557   def CheckPrereq(self):
3558     """Check prerequisites.
3559
3560     """
3561     # set optional parameters to none if they don't exist
3562     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3563                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3564                  "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3565       if not hasattr(self.op, attr):
3566         setattr(self.op, attr, None)
3567
3568     if self.op.mode not in (constants.INSTANCE_CREATE,
3569                             constants.INSTANCE_IMPORT):
3570       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3571                                  self.op.mode)
3572
3573     if self.op.mode == constants.INSTANCE_IMPORT:
3574       src_node = getattr(self.op, "src_node", None)
3575       src_path = getattr(self.op, "src_path", None)
3576       if src_node is None or src_path is None:
3577         raise errors.OpPrereqError("Importing an instance requires source"
3578                                    " node and path options")
3579       src_node_full = self.cfg.ExpandNodeName(src_node)
3580       if src_node_full is None:
3581         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3582       self.op.src_node = src_node = src_node_full
3583
3584       if not os.path.isabs(src_path):
3585         raise errors.OpPrereqError("The source path must be absolute")
3586
3587       export_info = rpc.call_export_info(src_node, src_path)
3588
3589       if not export_info:
3590         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3591
3592       if not export_info.has_section(constants.INISECT_EXP):
3593         raise errors.ProgrammerError("Corrupted export config")
3594
3595       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3596       if (int(ei_version) != constants.EXPORT_VERSION):
3597         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3598                                    (ei_version, constants.EXPORT_VERSION))
3599
3600       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3601         raise errors.OpPrereqError("Can't import instance with more than"
3602                                    " one data disk")
3603
3604       # FIXME: are the old os-es, disk sizes, etc. useful?
3605       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3606       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3607                                                          'disk0_dump'))
3608       self.src_image = diskimage
3609     else: # INSTANCE_CREATE
3610       if getattr(self.op, "os_type", None) is None:
3611         raise errors.OpPrereqError("No guest OS specified")
3612
3613     #### instance parameters check
3614
3615     # disk template and mirror node verification
3616     if self.op.disk_template not in constants.DISK_TEMPLATES:
3617       raise errors.OpPrereqError("Invalid disk template name")
3618
3619     # instance name verification
3620     hostname1 = utils.HostInfo(self.op.instance_name)
3621
3622     self.op.instance_name = instance_name = hostname1.name
3623     instance_list = self.cfg.GetInstanceList()
3624     if instance_name in instance_list:
3625       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3626                                  instance_name)
3627
3628     # ip validity checks
3629     ip = getattr(self.op, "ip", None)
3630     if ip is None or ip.lower() == "none":
3631       inst_ip = None
3632     elif ip.lower() == "auto":
3633       inst_ip = hostname1.ip
3634     else:
3635       if not utils.IsValidIP(ip):
3636         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3637                                    " like a valid IP" % ip)
3638       inst_ip = ip
3639     self.inst_ip = self.op.ip = inst_ip
3640
3641     if self.op.start and not self.op.ip_check:
3642       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3643                                  " adding an instance in start mode")
3644
3645     if self.op.ip_check:
3646       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3647         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3648                                    (hostname1.ip, instance_name))
3649
3650     # MAC address verification
3651     if self.op.mac != "auto":
3652       if not utils.IsValidMac(self.op.mac.lower()):
3653         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3654                                    self.op.mac)
3655
3656     # bridge verification
3657     bridge = getattr(self.op, "bridge", None)
3658     if bridge is None:
3659       self.op.bridge = self.cfg.GetDefBridge()
3660     else:
3661       self.op.bridge = bridge
3662
3663     # boot order verification
3664     if self.op.hvm_boot_order is not None:
3665       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3666         raise errors.OpPrereqError("invalid boot order specified,"
3667                                    " must be one or more of [acdn]")
3668     #### allocator run
3669
3670     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3671       raise errors.OpPrereqError("One and only one of iallocator and primary"
3672                                  " node must be given")
3673
3674     if self.op.iallocator is not None:
3675       self._RunAllocator()
3676
3677     #### node related checks
3678
3679     # check primary node
3680     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3681     if pnode is None:
3682       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3683                                  self.op.pnode)
3684     self.op.pnode = pnode.name
3685     self.pnode = pnode
3686     self.secondaries = []
3687
3688     # mirror node verification
3689     if self.op.disk_template in constants.DTS_NET_MIRROR:
3690       if getattr(self.op, "snode", None) is None:
3691         raise errors.OpPrereqError("The networked disk templates need"
3692                                    " a mirror node")
3693
3694       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3695       if snode_name is None:
3696         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3697                                    self.op.snode)
3698       elif snode_name == pnode.name:
3699         raise errors.OpPrereqError("The secondary node cannot be"
3700                                    " the primary node.")
3701       self.secondaries.append(snode_name)
3702
3703     req_size = _ComputeDiskSize(self.op.disk_template,
3704                                 self.op.disk_size, self.op.swap_size)
3705
3706     # Check lv size requirements
3707     if req_size is not None:
3708       nodenames = [pnode.name] + self.secondaries
3709       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3710       for node in nodenames:
3711         info = nodeinfo.get(node, None)
3712         if not info:
3713           raise errors.OpPrereqError("Cannot get current information"
3714                                      " from node '%s'" % node)
3715         vg_free = info.get('vg_free', None)
3716         if not isinstance(vg_free, int):
3717           raise errors.OpPrereqError("Can't compute free disk space on"
3718                                      " node %s" % node)
3719         if req_size > info['vg_free']:
3720           raise errors.OpPrereqError("Not enough disk space on target node %s."
3721                                      " %d MB available, %d MB required" %
3722                                      (node, info['vg_free'], req_size))
3723
3724     # os verification
3725     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3726     if not os_obj:
3727       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3728                                  " primary node"  % self.op.os_type)
3729
3730     if self.op.kernel_path == constants.VALUE_NONE:
3731       raise errors.OpPrereqError("Can't set instance kernel to none")
3732
3733
3734     # bridge check on primary node
3735     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3736       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3737                                  " destination node '%s'" %
3738                                  (self.op.bridge, pnode.name))
3739
3740     # memory check on primary node
3741     if self.op.start:
3742       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3743                            "creating instance %s" % self.op.instance_name,
3744                            self.op.mem_size)
3745
3746     # hvm_cdrom_image_path verification
3747     if self.op.hvm_cdrom_image_path is not None:
3748       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3749         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3750                                    " be an absolute path or None, not %s" %
3751                                    self.op.hvm_cdrom_image_path)
3752       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3753         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3754                                    " regular file or a symlink pointing to"
3755                                    " an existing regular file, not %s" %
3756                                    self.op.hvm_cdrom_image_path)
3757
3758     # vnc_bind_address verification
3759     if self.op.vnc_bind_address is not None:
3760       if not utils.IsValidIP(self.op.vnc_bind_address):
3761         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3762                                    " like a valid IP address" %
3763                                    self.op.vnc_bind_address)
3764
3765     # Xen HVM device type checks
3766     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3767       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3768         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3769                                    " hypervisor" % self.op.hvm_nic_type)
3770       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3771         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3772                                    " hypervisor" % self.op.hvm_disk_type)
3773
3774     if self.op.start:
3775       self.instance_status = 'up'
3776     else:
3777       self.instance_status = 'down'
3778
3779   def Exec(self, feedback_fn):
3780     """Create and add the instance to the cluster.
3781
3782     """
3783     instance = self.op.instance_name
3784     pnode_name = self.pnode.name
3785
3786     if self.op.mac == "auto":
3787       mac_address = self.cfg.GenerateMAC()
3788     else:
3789       mac_address = self.op.mac
3790
3791     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3792     if self.inst_ip is not None:
3793       nic.ip = self.inst_ip
3794
3795     ht_kind = self.sstore.GetHypervisorType()
3796     if ht_kind in constants.HTS_REQ_PORT:
3797       network_port = self.cfg.AllocatePort()
3798     else:
3799       network_port = None
3800
3801     if self.op.vnc_bind_address is None:
3802       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3803
3804     disks = _GenerateDiskTemplate(self.cfg,
3805                                   self.op.disk_template,
3806                                   instance, pnode_name,
3807                                   self.secondaries, self.op.disk_size,
3808                                   self.op.swap_size)
3809
3810     iobj = objects.Instance(name=instance, os=self.op.os_type,
3811                             primary_node=pnode_name,
3812                             memory=self.op.mem_size,
3813                             vcpus=self.op.vcpus,
3814                             nics=[nic], disks=disks,
3815                             disk_template=self.op.disk_template,
3816                             status=self.instance_status,
3817                             network_port=network_port,
3818                             kernel_path=self.op.kernel_path,
3819                             initrd_path=self.op.initrd_path,
3820                             hvm_boot_order=self.op.hvm_boot_order,
3821                             hvm_acpi=self.op.hvm_acpi,
3822                             hvm_pae=self.op.hvm_pae,
3823                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3824                             vnc_bind_address=self.op.vnc_bind_address,
3825                             hvm_nic_type=self.op.hvm_nic_type,
3826                             hvm_disk_type=self.op.hvm_disk_type,
3827                             )
3828
3829     feedback_fn("* creating instance disks...")
3830     if not _CreateDisks(self.cfg, iobj):
3831       _RemoveDisks(iobj, self.cfg)
3832       raise errors.OpExecError("Device creation failed, reverting...")
3833
3834     feedback_fn("adding instance %s to cluster config" % instance)
3835
3836     self.cfg.AddInstance(iobj)
3837
3838     if self.op.wait_for_sync:
3839       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3840     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3841       # make sure the disks are not degraded (still sync-ing is ok)
3842       time.sleep(15)
3843       feedback_fn("* checking mirrors status")
3844       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3845     else:
3846       disk_abort = False
3847
3848     if disk_abort:
3849       _RemoveDisks(iobj, self.cfg)
3850       self.cfg.RemoveInstance(iobj.name)
3851       raise errors.OpExecError("There are some degraded disks for"
3852                                " this instance")
3853
3854     feedback_fn("creating os for instance %s on node %s" %
3855                 (instance, pnode_name))
3856
3857     if iobj.disk_template != constants.DT_DISKLESS:
3858       if self.op.mode == constants.INSTANCE_CREATE:
3859         feedback_fn("* running the instance OS create scripts...")
3860         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3861           raise errors.OpExecError("could not add os for instance %s"
3862                                    " on node %s" %
3863                                    (instance, pnode_name))
3864
3865       elif self.op.mode == constants.INSTANCE_IMPORT:
3866         feedback_fn("* running the instance OS import scripts...")
3867         src_node = self.op.src_node
3868         src_image = self.src_image
3869         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3870                                                 src_node, src_image):
3871           raise errors.OpExecError("Could not import os for instance"
3872                                    " %s on node %s" %
3873                                    (instance, pnode_name))
3874       else:
3875         # also checked in the prereq part
3876         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3877                                      % self.op.mode)
3878
3879     if self.op.start:
3880       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3881       feedback_fn("* starting instance...")
3882       if not rpc.call_instance_start(pnode_name, iobj, None):
3883         raise errors.OpExecError("Could not start instance")
3884
3885
3886 class LUConnectConsole(NoHooksLU):
3887   """Connect to an instance's console.
3888
3889   This is somewhat special in that it returns the command line that
3890   you need to run on the master node in order to connect to the
3891   console.
3892
3893   """
3894   _OP_REQP = ["instance_name"]
3895
3896   def CheckPrereq(self):
3897     """Check prerequisites.
3898
3899     This checks that the instance is in the cluster.
3900
3901     """
3902     instance = self.cfg.GetInstanceInfo(
3903       self.cfg.ExpandInstanceName(self.op.instance_name))
3904     if instance is None:
3905       raise errors.OpPrereqError("Instance '%s' not known" %
3906                                  self.op.instance_name)
3907     self.instance = instance
3908
3909   def Exec(self, feedback_fn):
3910     """Connect to the console of an instance
3911
3912     """
3913     instance = self.instance
3914     node = instance.primary_node
3915
3916     node_insts = rpc.call_instance_list([node])[node]
3917     if node_insts is False:
3918       raise errors.OpExecError("Can't connect to node %s." % node)
3919
3920     if instance.name not in node_insts:
3921       raise errors.OpExecError("Instance %s is not running." % instance.name)
3922
3923     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3924
3925     hyper = hypervisor.GetHypervisor()
3926     console_cmd = hyper.GetShellCommandForConsole(instance)
3927     # build ssh cmdline
3928     argv = ["ssh", "-q", "-t"]
3929     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3930     argv.extend(ssh.BATCH_MODE_OPTS)
3931     argv.append(node)
3932     argv.append(console_cmd)
3933     return "ssh", argv
3934
3935
3936 class LUAddMDDRBDComponent(LogicalUnit):
3937   """Adda new mirror member to an instance's disk.
3938
3939   """
3940   HPATH = "mirror-add"
3941   HTYPE = constants.HTYPE_INSTANCE
3942   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3943
3944   def BuildHooksEnv(self):
3945     """Build hooks env.
3946
3947     This runs on the master, the primary and all the secondaries.
3948
3949     """
3950     env = {
3951       "NEW_SECONDARY": self.op.remote_node,
3952       "DISK_NAME": self.op.disk_name,
3953       }
3954     env.update(_BuildInstanceHookEnvByObject(self.instance))
3955     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3956           self.op.remote_node,] + list(self.instance.secondary_nodes)
3957     return env, nl, nl
3958
3959   def CheckPrereq(self):
3960     """Check prerequisites.
3961
3962     This checks that the instance is in the cluster.
3963
3964     """
3965     instance = self.cfg.GetInstanceInfo(
3966       self.cfg.ExpandInstanceName(self.op.instance_name))
3967     if instance is None:
3968       raise errors.OpPrereqError("Instance '%s' not known" %
3969                                  self.op.instance_name)
3970     self.instance = instance
3971
3972     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3973     if remote_node is None:
3974       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3975     self.remote_node = remote_node
3976
3977     if remote_node == instance.primary_node:
3978       raise errors.OpPrereqError("The specified node is the primary node of"
3979                                  " the instance.")
3980
3981     if instance.disk_template != constants.DT_REMOTE_RAID1:
3982       raise errors.OpPrereqError("Instance's disk layout is not"
3983                                  " remote_raid1.")
3984     for disk in instance.disks:
3985       if disk.iv_name == self.op.disk_name:
3986         break
3987     else:
3988       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3989                                  " instance." % self.op.disk_name)
3990     if len(disk.children) > 1:
3991       raise errors.OpPrereqError("The device already has two slave devices."
3992                                  " This would create a 3-disk raid1 which we"
3993                                  " don't allow.")
3994     self.disk = disk
3995
3996   def Exec(self, feedback_fn):
3997     """Add the mirror component
3998
3999     """
4000     disk = self.disk
4001     instance = self.instance
4002
4003     remote_node = self.remote_node
4004     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
4005     names = _GenerateUniqueNames(self.cfg, lv_names)
4006     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
4007                                      remote_node, disk.size, names)
4008
4009     logger.Info("adding new mirror component on secondary")
4010     #HARDCODE
4011     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
4012                                       new_drbd, False,
4013                                       _GetInstanceInfoText(instance)):
4014       raise errors.OpExecError("Failed to create new component on secondary"
4015                                " node %s" % remote_node)
4016
4017     logger.Info("adding new mirror component on primary")
4018     #HARDCODE
4019     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
4020                                     instance, new_drbd,
4021                                     _GetInstanceInfoText(instance)):
4022       # remove secondary dev
4023       self.cfg.SetDiskID(new_drbd, remote_node)
4024       rpc.call_blockdev_remove(remote_node, new_drbd)
4025       raise errors.OpExecError("Failed to create volume on primary")
4026
4027     # the device exists now
4028     # call the primary node to add the mirror to md
4029     logger.Info("adding new mirror component to md")
4030     if not rpc.call_blockdev_addchildren(instance.primary_node,
4031                                          disk, [new_drbd]):
4032       logger.Error("Can't add mirror compoment to md!")
4033       self.cfg.SetDiskID(new_drbd, remote_node)
4034       if not rpc.call_blockdev_remove(remote_node, new_drbd):
4035         logger.Error("Can't rollback on secondary")
4036       self.cfg.SetDiskID(new_drbd, instance.primary_node)
4037       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4038         logger.Error("Can't rollback on primary")
4039       raise errors.OpExecError("Can't add mirror component to md array")
4040
4041     disk.children.append(new_drbd)
4042
4043     self.cfg.AddInstance(instance)
4044
4045     _WaitForSync(self.cfg, instance, self.proc)
4046
4047     return 0
4048
4049
4050 class LURemoveMDDRBDComponent(LogicalUnit):
4051   """Remove a component from a remote_raid1 disk.
4052
4053   """
4054   HPATH = "mirror-remove"
4055   HTYPE = constants.HTYPE_INSTANCE
4056   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
4057
4058   def BuildHooksEnv(self):
4059     """Build hooks env.
4060
4061     This runs on the master, the primary and all the secondaries.
4062
4063     """
4064     env = {
4065       "DISK_NAME": self.op.disk_name,
4066       "DISK_ID": self.op.disk_id,
4067       "OLD_SECONDARY": self.old_secondary,
4068       }
4069     env.update(_BuildInstanceHookEnvByObject(self.instance))
4070     nl = [self.sstore.GetMasterNode(),
4071           self.instance.primary_node] + list(self.instance.secondary_nodes)
4072     return env, nl, nl
4073
4074   def CheckPrereq(self):
4075     """Check prerequisites.
4076
4077     This checks that the instance is in the cluster.
4078
4079     """
4080     instance = self.cfg.GetInstanceInfo(
4081       self.cfg.ExpandInstanceName(self.op.instance_name))
4082     if instance is None:
4083       raise errors.OpPrereqError("Instance '%s' not known" %
4084                                  self.op.instance_name)
4085     self.instance = instance
4086
4087     if instance.disk_template != constants.DT_REMOTE_RAID1:
4088       raise errors.OpPrereqError("Instance's disk layout is not"
4089                                  " remote_raid1.")
4090     for disk in instance.disks:
4091       if disk.iv_name == self.op.disk_name:
4092         break
4093     else:
4094       raise errors.OpPrereqError("Can't find this device ('%s') in the"
4095                                  " instance." % self.op.disk_name)
4096     for child in disk.children:
4097       if (child.dev_type == constants.LD_DRBD7 and
4098           child.logical_id[2] == self.op.disk_id):
4099         break
4100     else:
4101       raise errors.OpPrereqError("Can't find the device with this port.")
4102
4103     if len(disk.children) < 2:
4104       raise errors.OpPrereqError("Cannot remove the last component from"
4105                                  " a mirror.")
4106     self.disk = disk
4107     self.child = child
4108     if self.child.logical_id[0] == instance.primary_node:
4109       oid = 1
4110     else:
4111       oid = 0
4112     self.old_secondary = self.child.logical_id[oid]
4113
4114   def Exec(self, feedback_fn):
4115     """Remove the mirror component
4116
4117     """
4118     instance = self.instance
4119     disk = self.disk
4120     child = self.child
4121     logger.Info("remove mirror component")
4122     self.cfg.SetDiskID(disk, instance.primary_node)
4123     if not rpc.call_blockdev_removechildren(instance.primary_node,
4124                                             disk, [child]):
4125       raise errors.OpExecError("Can't remove child from mirror.")
4126
4127     for node in child.logical_id[:2]:
4128       self.cfg.SetDiskID(child, node)
4129       if not rpc.call_blockdev_remove(node, child):
4130         logger.Error("Warning: failed to remove device from node %s,"
4131                      " continuing operation." % node)
4132
4133     disk.children.remove(child)
4134     self.cfg.AddInstance(instance)
4135
4136
4137 class LUReplaceDisks(LogicalUnit):
4138   """Replace the disks of an instance.
4139
4140   """
4141   HPATH = "mirrors-replace"
4142   HTYPE = constants.HTYPE_INSTANCE
4143   _OP_REQP = ["instance_name", "mode", "disks"]
4144
4145   def _RunAllocator(self):
4146     """Compute a new secondary node using an IAllocator.
4147
4148     """
4149     ial = IAllocator(self.cfg, self.sstore,
4150                      mode=constants.IALLOCATOR_MODE_RELOC,
4151                      name=self.op.instance_name,
4152                      relocate_from=[self.sec_node])
4153
4154     ial.Run(self.op.iallocator)
4155
4156     if not ial.success:
4157       raise errors.OpPrereqError("Can't compute nodes using"
4158                                  " iallocator '%s': %s" % (self.op.iallocator,
4159                                                            ial.info))
4160     if len(ial.nodes) != ial.required_nodes:
4161       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4162                                  " of nodes (%s), required %s" %
4163                                  (len(ial.nodes), ial.required_nodes))
4164     self.op.remote_node = ial.nodes[0]
4165     logger.ToStdout("Selected new secondary for the instance: %s" %
4166                     self.op.remote_node)
4167
4168   def BuildHooksEnv(self):
4169     """Build hooks env.
4170
4171     This runs on the master, the primary and all the secondaries.
4172
4173     """
4174     env = {
4175       "MODE": self.op.mode,
4176       "NEW_SECONDARY": self.op.remote_node,
4177       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4178       }
4179     env.update(_BuildInstanceHookEnvByObject(self.instance))
4180     nl = [
4181       self.sstore.GetMasterNode(),
4182       self.instance.primary_node,
4183       ]
4184     if self.op.remote_node is not None:
4185       nl.append(self.op.remote_node)
4186     return env, nl, nl
4187
4188   def CheckPrereq(self):
4189     """Check prerequisites.
4190
4191     This checks that the instance is in the cluster.
4192
4193     """
4194     if not hasattr(self.op, "remote_node"):
4195       self.op.remote_node = None
4196
4197     instance = self.cfg.GetInstanceInfo(
4198       self.cfg.ExpandInstanceName(self.op.instance_name))
4199     if instance is None:
4200       raise errors.OpPrereqError("Instance '%s' not known" %
4201                                  self.op.instance_name)
4202     self.instance = instance
4203     self.op.instance_name = instance.name
4204
4205     if instance.disk_template not in constants.DTS_NET_MIRROR:
4206       raise errors.OpPrereqError("Instance's disk layout is not"
4207                                  " network mirrored.")
4208
4209     if len(instance.secondary_nodes) != 1:
4210       raise errors.OpPrereqError("The instance has a strange layout,"
4211                                  " expected one secondary but found %d" %
4212                                  len(instance.secondary_nodes))
4213
4214     self.sec_node = instance.secondary_nodes[0]
4215
4216     ia_name = getattr(self.op, "iallocator", None)
4217     if ia_name is not None:
4218       if self.op.remote_node is not None:
4219         raise errors.OpPrereqError("Give either the iallocator or the new"
4220                                    " secondary, not both")
4221       self._RunAllocator()
4222
4223     remote_node = self.op.remote_node
4224     if remote_node is not None:
4225       remote_node = self.cfg.ExpandNodeName(remote_node)
4226       if remote_node is None:
4227         raise errors.OpPrereqError("Node '%s' not known" %
4228                                    self.op.remote_node)
4229       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4230     else:
4231       self.remote_node_info = None
4232     if remote_node == instance.primary_node:
4233       raise errors.OpPrereqError("The specified node is the primary node of"
4234                                  " the instance.")
4235     elif remote_node == self.sec_node:
4236       if self.op.mode == constants.REPLACE_DISK_SEC:
4237         # this is for DRBD8, where we can't execute the same mode of
4238         # replacement as for drbd7 (no different port allocated)
4239         raise errors.OpPrereqError("Same secondary given, cannot execute"
4240                                    " replacement")
4241       # the user gave the current secondary, switch to
4242       # 'no-replace-secondary' mode for drbd7
4243       remote_node = None
4244     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4245         self.op.mode != constants.REPLACE_DISK_ALL):
4246       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4247                                  " disks replacement, not individual ones")
4248     if instance.disk_template == constants.DT_DRBD8:
4249       if (self.op.mode == constants.REPLACE_DISK_ALL and
4250           remote_node is not None):
4251         # switch to replace secondary mode
4252         self.op.mode = constants.REPLACE_DISK_SEC
4253
4254       if self.op.mode == constants.REPLACE_DISK_ALL:
4255         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4256                                    " secondary disk replacement, not"
4257                                    " both at once")
4258       elif self.op.mode == constants.REPLACE_DISK_PRI:
4259         if remote_node is not None:
4260           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4261                                      " the secondary while doing a primary"
4262                                      " node disk replacement")
4263         self.tgt_node = instance.primary_node
4264         self.oth_node = instance.secondary_nodes[0]
4265       elif self.op.mode == constants.REPLACE_DISK_SEC:
4266         self.new_node = remote_node # this can be None, in which case
4267                                     # we don't change the secondary
4268         self.tgt_node = instance.secondary_nodes[0]
4269         self.oth_node = instance.primary_node
4270       else:
4271         raise errors.ProgrammerError("Unhandled disk replace mode")
4272
4273     for name in self.op.disks:
4274       if instance.FindDisk(name) is None:
4275         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4276                                    (name, instance.name))
4277     self.op.remote_node = remote_node
4278
4279   def _ExecRR1(self, feedback_fn):
4280     """Replace the disks of an instance.
4281
4282     """
4283     instance = self.instance
4284     iv_names = {}
4285     # start of work
4286     if self.op.remote_node is None:
4287       remote_node = self.sec_node
4288     else:
4289       remote_node = self.op.remote_node
4290     cfg = self.cfg
4291     for dev in instance.disks:
4292       size = dev.size
4293       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4294       names = _GenerateUniqueNames(cfg, lv_names)
4295       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4296                                        remote_node, size, names)
4297       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4298       logger.Info("adding new mirror component on secondary for %s" %
4299                   dev.iv_name)
4300       #HARDCODE
4301       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4302                                         new_drbd, False,
4303                                         _GetInstanceInfoText(instance)):
4304         raise errors.OpExecError("Failed to create new component on secondary"
4305                                  " node %s. Full abort, cleanup manually!" %
4306                                  remote_node)
4307
4308       logger.Info("adding new mirror component on primary")
4309       #HARDCODE
4310       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4311                                       instance, new_drbd,
4312                                       _GetInstanceInfoText(instance)):
4313         # remove secondary dev
4314         cfg.SetDiskID(new_drbd, remote_node)
4315         rpc.call_blockdev_remove(remote_node, new_drbd)
4316         raise errors.OpExecError("Failed to create volume on primary!"
4317                                  " Full abort, cleanup manually!!")
4318
4319       # the device exists now
4320       # call the primary node to add the mirror to md
4321       logger.Info("adding new mirror component to md")
4322       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4323                                            [new_drbd]):
4324         logger.Error("Can't add mirror compoment to md!")
4325         cfg.SetDiskID(new_drbd, remote_node)
4326         if not rpc.call_blockdev_remove(remote_node, new_drbd):
4327           logger.Error("Can't rollback on secondary")
4328         cfg.SetDiskID(new_drbd, instance.primary_node)
4329         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4330           logger.Error("Can't rollback on primary")
4331         raise errors.OpExecError("Full abort, cleanup manually!!")
4332
4333       dev.children.append(new_drbd)
4334       cfg.AddInstance(instance)
4335
4336     # this can fail as the old devices are degraded and _WaitForSync
4337     # does a combined result over all disks, so we don't check its
4338     # return value
4339     _WaitForSync(cfg, instance, self.proc, unlock=True)
4340
4341     # so check manually all the devices
4342     for name in iv_names:
4343       dev, child, new_drbd = iv_names[name]
4344       cfg.SetDiskID(dev, instance.primary_node)
4345       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4346       if is_degr:
4347         raise errors.OpExecError("MD device %s is degraded!" % name)
4348       cfg.SetDiskID(new_drbd, instance.primary_node)
4349       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4350       if is_degr:
4351         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4352
4353     for name in iv_names:
4354       dev, child, new_drbd = iv_names[name]
4355       logger.Info("remove mirror %s component" % name)
4356       cfg.SetDiskID(dev, instance.primary_node)
4357       if not rpc.call_blockdev_removechildren(instance.primary_node,
4358                                               dev, [child]):
4359         logger.Error("Can't remove child from mirror, aborting"
4360                      " *this device cleanup*.\nYou need to cleanup manually!!")
4361         continue
4362
4363       for node in child.logical_id[:2]:
4364         logger.Info("remove child device on %s" % node)
4365         cfg.SetDiskID(child, node)
4366         if not rpc.call_blockdev_remove(node, child):
4367           logger.Error("Warning: failed to remove device from node %s,"
4368                        " continuing operation." % node)
4369
4370       dev.children.remove(child)
4371
4372       cfg.AddInstance(instance)
4373
4374   def _ExecD8DiskOnly(self, feedback_fn):
4375     """Replace a disk on the primary or secondary for dbrd8.
4376
4377     The algorithm for replace is quite complicated:
4378       - for each disk to be replaced:
4379         - create new LVs on the target node with unique names
4380         - detach old LVs from the drbd device
4381         - rename old LVs to name_replaced.<time_t>
4382         - rename new LVs to old LVs
4383         - attach the new LVs (with the old names now) to the drbd device
4384       - wait for sync across all devices
4385       - for each modified disk:
4386         - remove old LVs (which have the name name_replaces.<time_t>)
4387
4388     Failures are not very well handled.
4389
4390     """
4391     steps_total = 6
4392     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4393     instance = self.instance
4394     iv_names = {}
4395     vgname = self.cfg.GetVGName()
4396     # start of work
4397     cfg = self.cfg
4398     tgt_node = self.tgt_node
4399     oth_node = self.oth_node
4400
4401     # Step: check device activation
4402     self.proc.LogStep(1, steps_total, "check device existence")
4403     info("checking volume groups")
4404     my_vg = cfg.GetVGName()
4405     results = rpc.call_vg_list([oth_node, tgt_node])
4406     if not results:
4407       raise errors.OpExecError("Can't list volume groups on the nodes")
4408     for node in oth_node, tgt_node:
4409       res = results.get(node, False)
4410       if not res or my_vg not in res:
4411         raise errors.OpExecError("Volume group '%s' not found on %s" %
4412                                  (my_vg, node))
4413     for dev in instance.disks:
4414       if not dev.iv_name in self.op.disks:
4415         continue
4416       for node in tgt_node, oth_node:
4417         info("checking %s on %s" % (dev.iv_name, node))
4418         cfg.SetDiskID(dev, node)
4419         if not rpc.call_blockdev_find(node, dev):
4420           raise errors.OpExecError("Can't find device %s on node %s" %
4421                                    (dev.iv_name, node))
4422
4423     # Step: check other node consistency
4424     self.proc.LogStep(2, steps_total, "check peer consistency")
4425     for dev in instance.disks:
4426       if not dev.iv_name in self.op.disks:
4427         continue
4428       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4429       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4430                                    oth_node==instance.primary_node):
4431         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4432                                  " to replace disks on this node (%s)" %
4433                                  (oth_node, tgt_node))
4434
4435     # Step: create new storage
4436     self.proc.LogStep(3, steps_total, "allocate new storage")
4437     for dev in instance.disks:
4438       if not dev.iv_name in self.op.disks:
4439         continue
4440       size = dev.size
4441       cfg.SetDiskID(dev, tgt_node)
4442       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4443       names = _GenerateUniqueNames(cfg, lv_names)
4444       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4445                              logical_id=(vgname, names[0]))
4446       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4447                              logical_id=(vgname, names[1]))
4448       new_lvs = [lv_data, lv_meta]
4449       old_lvs = dev.children
4450       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4451       info("creating new local storage on %s for %s" %
4452            (tgt_node, dev.iv_name))
4453       # since we *always* want to create this LV, we use the
4454       # _Create...OnPrimary (which forces the creation), even if we
4455       # are talking about the secondary node
4456       for new_lv in new_lvs:
4457         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4458                                         _GetInstanceInfoText(instance)):
4459           raise errors.OpExecError("Failed to create new LV named '%s' on"
4460                                    " node '%s'" %
4461                                    (new_lv.logical_id[1], tgt_node))
4462
4463     # Step: for each lv, detach+rename*2+attach
4464     self.proc.LogStep(4, steps_total, "change drbd configuration")
4465     for dev, old_lvs, new_lvs in iv_names.itervalues():
4466       info("detaching %s drbd from local storage" % dev.iv_name)
4467       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4468         raise errors.OpExecError("Can't detach drbd from local storage on node"
4469                                  " %s for device %s" % (tgt_node, dev.iv_name))
4470       #dev.children = []
4471       #cfg.Update(instance)
4472
4473       # ok, we created the new LVs, so now we know we have the needed
4474       # storage; as such, we proceed on the target node to rename
4475       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4476       # using the assumption that logical_id == physical_id (which in
4477       # turn is the unique_id on that node)
4478
4479       # FIXME(iustin): use a better name for the replaced LVs
4480       temp_suffix = int(time.time())
4481       ren_fn = lambda d, suff: (d.physical_id[0],
4482                                 d.physical_id[1] + "_replaced-%s" % suff)
4483       # build the rename list based on what LVs exist on the node
4484       rlist = []
4485       for to_ren in old_lvs:
4486         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4487         if find_res is not None: # device exists
4488           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4489
4490       info("renaming the old LVs on the target node")
4491       if not rpc.call_blockdev_rename(tgt_node, rlist):
4492         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4493       # now we rename the new LVs to the old LVs
4494       info("renaming the new LVs on the target node")
4495       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4496       if not rpc.call_blockdev_rename(tgt_node, rlist):
4497         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4498
4499       for old, new in zip(old_lvs, new_lvs):
4500         new.logical_id = old.logical_id
4501         cfg.SetDiskID(new, tgt_node)
4502
4503       for disk in old_lvs:
4504         disk.logical_id = ren_fn(disk, temp_suffix)
4505         cfg.SetDiskID(disk, tgt_node)
4506
4507       # now that the new lvs have the old name, we can add them to the device
4508       info("adding new mirror component on %s" % tgt_node)
4509       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4510         for new_lv in new_lvs:
4511           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4512             warning("Can't rollback device %s", hint="manually cleanup unused"
4513                     " logical volumes")
4514         raise errors.OpExecError("Can't add local storage to drbd")
4515
4516       dev.children = new_lvs
4517       cfg.Update(instance)
4518
4519     # Step: wait for sync
4520
4521     # this can fail as the old devices are degraded and _WaitForSync
4522     # does a combined result over all disks, so we don't check its
4523     # return value
4524     self.proc.LogStep(5, steps_total, "sync devices")
4525     _WaitForSync(cfg, instance, self.proc, unlock=True)
4526
4527     # so check manually all the devices
4528     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4529       cfg.SetDiskID(dev, instance.primary_node)
4530       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4531       if is_degr:
4532         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4533
4534     # Step: remove old storage
4535     self.proc.LogStep(6, steps_total, "removing old storage")
4536     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4537       info("remove logical volumes for %s" % name)
4538       for lv in old_lvs:
4539         cfg.SetDiskID(lv, tgt_node)
4540         if not rpc.call_blockdev_remove(tgt_node, lv):
4541           warning("Can't remove old LV", hint="manually remove unused LVs")
4542           continue
4543
4544   def _ExecD8Secondary(self, feedback_fn):
4545     """Replace the secondary node for drbd8.
4546
4547     The algorithm for replace is quite complicated:
4548       - for all disks of the instance:
4549         - create new LVs on the new node with same names
4550         - shutdown the drbd device on the old secondary
4551         - disconnect the drbd network on the primary
4552         - create the drbd device on the new secondary
4553         - network attach the drbd on the primary, using an artifice:
4554           the drbd code for Attach() will connect to the network if it
4555           finds a device which is connected to the good local disks but
4556           not network enabled
4557       - wait for sync across all devices
4558       - remove all disks from the old secondary
4559
4560     Failures are not very well handled.
4561
4562     """
4563     steps_total = 6
4564     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4565     instance = self.instance
4566     iv_names = {}
4567     vgname = self.cfg.GetVGName()
4568     # start of work
4569     cfg = self.cfg
4570     old_node = self.tgt_node
4571     new_node = self.new_node
4572     pri_node = instance.primary_node
4573
4574     # Step: check device activation
4575     self.proc.LogStep(1, steps_total, "check device existence")
4576     info("checking volume groups")
4577     my_vg = cfg.GetVGName()
4578     results = rpc.call_vg_list([pri_node, new_node])
4579     if not results:
4580       raise errors.OpExecError("Can't list volume groups on the nodes")
4581     for node in pri_node, new_node:
4582       res = results.get(node, False)
4583       if not res or my_vg not in res:
4584         raise errors.OpExecError("Volume group '%s' not found on %s" %
4585                                  (my_vg, node))
4586     for dev in instance.disks:
4587       if not dev.iv_name in self.op.disks:
4588         continue
4589       info("checking %s on %s" % (dev.iv_name, pri_node))
4590       cfg.SetDiskID(dev, pri_node)
4591       if not rpc.call_blockdev_find(pri_node, dev):
4592         raise errors.OpExecError("Can't find device %s on node %s" %
4593                                  (dev.iv_name, pri_node))
4594
4595     # Step: check other node consistency
4596     self.proc.LogStep(2, steps_total, "check peer consistency")
4597     for dev in instance.disks:
4598       if not dev.iv_name in self.op.disks:
4599         continue
4600       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4601       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4602         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4603                                  " unsafe to replace the secondary" %
4604                                  pri_node)
4605
4606     # Step: create new storage
4607     self.proc.LogStep(3, steps_total, "allocate new storage")
4608     for dev in instance.disks:
4609       size = dev.size
4610       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4611       # since we *always* want to create this LV, we use the
4612       # _Create...OnPrimary (which forces the creation), even if we
4613       # are talking about the secondary node
4614       for new_lv in dev.children:
4615         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4616                                         _GetInstanceInfoText(instance)):
4617           raise errors.OpExecError("Failed to create new LV named '%s' on"
4618                                    " node '%s'" %
4619                                    (new_lv.logical_id[1], new_node))
4620
4621       iv_names[dev.iv_name] = (dev, dev.children)
4622
4623     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4624     for dev in instance.disks:
4625       size = dev.size
4626       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4627       # create new devices on new_node
4628       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4629                               logical_id=(pri_node, new_node,
4630                                           dev.logical_id[2]),
4631                               children=dev.children)
4632       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4633                                         new_drbd, False,
4634                                       _GetInstanceInfoText(instance)):
4635         raise errors.OpExecError("Failed to create new DRBD on"
4636                                  " node '%s'" % new_node)
4637
4638     for dev in instance.disks:
4639       # we have new devices, shutdown the drbd on the old secondary
4640       info("shutting down drbd for %s on old node" % dev.iv_name)
4641       cfg.SetDiskID(dev, old_node)
4642       if not rpc.call_blockdev_shutdown(old_node, dev):
4643         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4644                 hint="Please cleanup this device manually as soon as possible")
4645
4646     info("detaching primary drbds from the network (=> standalone)")
4647     done = 0
4648     for dev in instance.disks:
4649       cfg.SetDiskID(dev, pri_node)
4650       # set the physical (unique in bdev terms) id to None, meaning
4651       # detach from network
4652       dev.physical_id = (None,) * len(dev.physical_id)
4653       # and 'find' the device, which will 'fix' it to match the
4654       # standalone state
4655       if rpc.call_blockdev_find(pri_node, dev):
4656         done += 1
4657       else:
4658         warning("Failed to detach drbd %s from network, unusual case" %
4659                 dev.iv_name)
4660
4661     if not done:
4662       # no detaches succeeded (very unlikely)
4663       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4664
4665     # if we managed to detach at least one, we update all the disks of
4666     # the instance to point to the new secondary
4667     info("updating instance configuration")
4668     for dev in instance.disks:
4669       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4670       cfg.SetDiskID(dev, pri_node)
4671     cfg.Update(instance)
4672
4673     # and now perform the drbd attach
4674     info("attaching primary drbds to new secondary (standalone => connected)")
4675     failures = []
4676     for dev in instance.disks:
4677       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4678       # since the attach is smart, it's enough to 'find' the device,
4679       # it will automatically activate the network, if the physical_id
4680       # is correct
4681       cfg.SetDiskID(dev, pri_node)
4682       if not rpc.call_blockdev_find(pri_node, dev):
4683         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4684                 "please do a gnt-instance info to see the status of disks")
4685
4686     # this can fail as the old devices are degraded and _WaitForSync
4687     # does a combined result over all disks, so we don't check its
4688     # return value
4689     self.proc.LogStep(5, steps_total, "sync devices")
4690     _WaitForSync(cfg, instance, self.proc, unlock=True)
4691
4692     # so check manually all the devices
4693     for name, (dev, old_lvs) in iv_names.iteritems():
4694       cfg.SetDiskID(dev, pri_node)
4695       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4696       if is_degr:
4697         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4698
4699     self.proc.LogStep(6, steps_total, "removing old storage")
4700     for name, (dev, old_lvs) in iv_names.iteritems():
4701       info("remove logical volumes for %s" % name)
4702       for lv in old_lvs:
4703         cfg.SetDiskID(lv, old_node)
4704         if not rpc.call_blockdev_remove(old_node, lv):
4705           warning("Can't remove LV on old secondary",
4706                   hint="Cleanup stale volumes by hand")
4707
4708   def Exec(self, feedback_fn):
4709     """Execute disk replacement.
4710
4711     This dispatches the disk replacement to the appropriate handler.
4712
4713     """
4714     instance = self.instance
4715
4716     # Activate the instance disks if we're replacing them on a down instance
4717     if instance.status == "down":
4718       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4719       self.proc.ChainOpCode(op)
4720
4721     if instance.disk_template == constants.DT_REMOTE_RAID1:
4722       fn = self._ExecRR1
4723     elif instance.disk_template == constants.DT_DRBD8:
4724       if self.op.remote_node is None:
4725         fn = self._ExecD8DiskOnly
4726       else:
4727         fn = self._ExecD8Secondary
4728     else:
4729       raise errors.ProgrammerError("Unhandled disk replacement case")
4730
4731     ret = fn(feedback_fn)
4732
4733     # Deactivate the instance disks if we're replacing them on a down instance
4734     if instance.status == "down":
4735       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4736       self.proc.ChainOpCode(op)
4737
4738     return ret
4739
4740
4741 class LUGrowDisk(LogicalUnit):
4742   """Grow a disk of an instance.
4743
4744   """
4745   HPATH = "disk-grow"
4746   HTYPE = constants.HTYPE_INSTANCE
4747   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4748
4749   def BuildHooksEnv(self):
4750     """Build hooks env.
4751
4752     This runs on the master, the primary and all the secondaries.
4753
4754     """
4755     env = {
4756       "DISK": self.op.disk,
4757       "AMOUNT": self.op.amount,
4758       }
4759     env.update(_BuildInstanceHookEnvByObject(self.instance))
4760     nl = [
4761       self.sstore.GetMasterNode(),
4762       self.instance.primary_node,
4763       ]
4764     return env, nl, nl
4765
4766   def CheckPrereq(self):
4767     """Check prerequisites.
4768
4769     This checks that the instance is in the cluster.
4770
4771     """
4772     instance = self.cfg.GetInstanceInfo(
4773       self.cfg.ExpandInstanceName(self.op.instance_name))
4774     if instance is None:
4775       raise errors.OpPrereqError("Instance '%s' not known" %
4776                                  self.op.instance_name)
4777
4778     if self.op.amount <= 0:
4779       raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4780
4781     self.instance = instance
4782     self.op.instance_name = instance.name
4783
4784     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4785       raise errors.OpPrereqError("Instance's disk layout does not support"
4786                                  " growing.")
4787
4788     self.disk = instance.FindDisk(self.op.disk)
4789     if self.disk is None:
4790       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4791                                  (self.op.disk, instance.name))
4792
4793     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4794     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4795     for node in nodenames:
4796       info = nodeinfo.get(node, None)
4797       if not info:
4798         raise errors.OpPrereqError("Cannot get current information"
4799                                    " from node '%s'" % node)
4800       vg_free = info.get('vg_free', None)
4801       if not isinstance(vg_free, int):
4802         raise errors.OpPrereqError("Can't compute free disk space on"
4803                                    " node %s" % node)
4804       if self.op.amount > info['vg_free']:
4805         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4806                                    " %d MiB available, %d MiB required" %
4807                                    (node, info['vg_free'], self.op.amount))
4808       is_primary = (node == instance.primary_node)
4809       if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4810         raise errors.OpPrereqError("Disk %s is degraded or not fully"
4811                                  " synchronized on node %s,"
4812                                  " aborting grow." % (self.op.disk, node))
4813
4814   def Exec(self, feedback_fn):
4815     """Execute disk grow.
4816
4817     """
4818     instance = self.instance
4819     disk = self.disk
4820     for node in (instance.secondary_nodes + (instance.primary_node,)):
4821       self.cfg.SetDiskID(disk, node)
4822       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4823       if not result or not isinstance(result, tuple) or len(result) != 2:
4824         raise errors.OpExecError("grow request failed to node %s" % node)
4825       elif not result[0]:
4826         raise errors.OpExecError("grow request failed to node %s: %s" %
4827                                  (node, result[1]))
4828     disk.RecordGrow(self.op.amount)
4829     self.cfg.Update(instance)
4830     if self.op.wait_for_sync:
4831       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4832       if disk_abort:
4833         logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4834                      " Please check the instance.")
4835
4836
4837 class LUQueryInstanceData(NoHooksLU):
4838   """Query runtime instance data.
4839
4840   """
4841   _OP_REQP = ["instances"]
4842
4843   def CheckPrereq(self):
4844     """Check prerequisites.
4845
4846     This only checks the optional instance list against the existing names.
4847
4848     """
4849     if not isinstance(self.op.instances, list):
4850       raise errors.OpPrereqError("Invalid argument type 'instances'")
4851     if self.op.instances:
4852       self.wanted_instances = []
4853       names = self.op.instances
4854       for name in names:
4855         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4856         if instance is None:
4857           raise errors.OpPrereqError("No such instance name '%s'" % name)
4858         self.wanted_instances.append(instance)
4859     else:
4860       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4861                                in self.cfg.GetInstanceList()]
4862     return
4863
4864
4865   def _ComputeDiskStatus(self, instance, snode, dev):
4866     """Compute block device status.
4867
4868     """
4869     self.cfg.SetDiskID(dev, instance.primary_node)
4870     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4871     if dev.dev_type in constants.LDS_DRBD:
4872       # we change the snode then (otherwise we use the one passed in)
4873       if dev.logical_id[0] == instance.primary_node:
4874         snode = dev.logical_id[1]
4875       else:
4876         snode = dev.logical_id[0]
4877
4878     if snode:
4879       self.cfg.SetDiskID(dev, snode)
4880       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4881     else:
4882       dev_sstatus = None
4883
4884     if dev.children:
4885       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4886                       for child in dev.children]
4887     else:
4888       dev_children = []
4889
4890     data = {
4891       "iv_name": dev.iv_name,
4892       "dev_type": dev.dev_type,
4893       "logical_id": dev.logical_id,
4894       "physical_id": dev.physical_id,
4895       "pstatus": dev_pstatus,
4896       "sstatus": dev_sstatus,
4897       "children": dev_children,
4898       }
4899
4900     return data
4901
4902   def Exec(self, feedback_fn):
4903     """Gather and return data"""
4904     result = {}
4905     for instance in self.wanted_instances:
4906       remote_info = rpc.call_instance_info(instance.primary_node,
4907                                                 instance.name)
4908       if remote_info and "state" in remote_info:
4909         remote_state = "up"
4910       else:
4911         remote_state = "down"
4912       if instance.status == "down":
4913         config_state = "down"
4914       else:
4915         config_state = "up"
4916
4917       disks = [self._ComputeDiskStatus(instance, None, device)
4918                for device in instance.disks]
4919
4920       idict = {
4921         "name": instance.name,
4922         "config_state": config_state,
4923         "run_state": remote_state,
4924         "pnode": instance.primary_node,
4925         "snodes": instance.secondary_nodes,
4926         "os": instance.os,
4927         "memory": instance.memory,
4928         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4929         "disks": disks,
4930         "vcpus": instance.vcpus,
4931         }
4932
4933       htkind = self.sstore.GetHypervisorType()
4934       if htkind == constants.HT_XEN_PVM30:
4935         idict["kernel_path"] = instance.kernel_path
4936         idict["initrd_path"] = instance.initrd_path
4937
4938       if htkind == constants.HT_XEN_HVM31:
4939         idict["hvm_boot_order"] = instance.hvm_boot_order
4940         idict["hvm_acpi"] = instance.hvm_acpi
4941         idict["hvm_pae"] = instance.hvm_pae
4942         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4943         idict["hvm_nic_type"] = instance.hvm_nic_type
4944         idict["hvm_disk_type"] = instance.hvm_disk_type
4945
4946       if htkind in constants.HTS_REQ_PORT:
4947         if instance.network_port is None:
4948           vnc_console_port = None
4949         elif instance.vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4950           vnc_console_port = "%s:%s" % (instance.primary_node,
4951                                        instance.network_port)
4952         elif instance.vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4953           vnc_console_port = "%s:%s on node %s" % (instance.vnc_bind_address,
4954                                                    instance.network_port,
4955                                                    instance.primary_node)
4956         else:
4957           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4958                                         instance.network_port)
4959         idict["vnc_console_port"] = vnc_console_port
4960         idict["vnc_bind_address"] = instance.vnc_bind_address
4961         idict["network_port"] = instance.network_port
4962
4963       result[instance.name] = idict
4964
4965     return result
4966
4967
4968 class LUSetInstanceParms(LogicalUnit):
4969   """Modifies an instances's parameters.
4970
4971   """
4972   HPATH = "instance-modify"
4973   HTYPE = constants.HTYPE_INSTANCE
4974   _OP_REQP = ["instance_name"]
4975
4976   def BuildHooksEnv(self):
4977     """Build hooks env.
4978
4979     This runs on the master, primary and secondaries.
4980
4981     """
4982     args = dict()
4983     if self.mem:
4984       args['memory'] = self.mem
4985     if self.vcpus:
4986       args['vcpus'] = self.vcpus
4987     if self.do_ip or self.do_bridge or self.mac:
4988       if self.do_ip:
4989         ip = self.ip
4990       else:
4991         ip = self.instance.nics[0].ip
4992       if self.bridge:
4993         bridge = self.bridge
4994       else:
4995         bridge = self.instance.nics[0].bridge
4996       if self.mac:
4997         mac = self.mac
4998       else:
4999         mac = self.instance.nics[0].mac
5000       args['nics'] = [(ip, bridge, mac)]
5001     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
5002     nl = [self.sstore.GetMasterNode(),
5003           self.instance.primary_node] + list(self.instance.secondary_nodes)
5004     return env, nl, nl
5005
5006   def CheckPrereq(self):
5007     """Check prerequisites.
5008
5009     This only checks the instance list against the existing names.
5010
5011     """
5012     self.mem = getattr(self.op, "mem", None)
5013     self.vcpus = getattr(self.op, "vcpus", None)
5014     self.ip = getattr(self.op, "ip", None)
5015     self.mac = getattr(self.op, "mac", None)
5016     self.bridge = getattr(self.op, "bridge", None)
5017     self.kernel_path = getattr(self.op, "kernel_path", None)
5018     self.initrd_path = getattr(self.op, "initrd_path", None)
5019     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
5020     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
5021     self.hvm_pae = getattr(self.op, "hvm_pae", None)
5022     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
5023     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
5024     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
5025     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
5026     self.force = getattr(self.op, "force", None)
5027     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
5028                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
5029                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
5030                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
5031     if all_parms.count(None) == len(all_parms):
5032       raise errors.OpPrereqError("No changes submitted")
5033     if self.mem is not None:
5034       try:
5035         self.mem = int(self.mem)
5036       except ValueError, err:
5037         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
5038     if self.vcpus is not None:
5039       try:
5040         self.vcpus = int(self.vcpus)
5041       except ValueError, err:
5042         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
5043     if self.ip is not None:
5044       self.do_ip = True
5045       if self.ip.lower() == "none":
5046         self.ip = None
5047       else:
5048         if not utils.IsValidIP(self.ip):
5049           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
5050     else:
5051       self.do_ip = False
5052     self.do_bridge = (self.bridge is not None)
5053     if self.mac is not None:
5054       if self.cfg.IsMacInUse(self.mac):
5055         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
5056                                    self.mac)
5057       if not utils.IsValidMac(self.mac):
5058         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
5059
5060     if self.kernel_path is not None:
5061       self.do_kernel_path = True
5062       if self.kernel_path == constants.VALUE_NONE:
5063         raise errors.OpPrereqError("Can't set instance to no kernel")
5064
5065       if self.kernel_path != constants.VALUE_DEFAULT:
5066         if not os.path.isabs(self.kernel_path):
5067           raise errors.OpPrereqError("The kernel path must be an absolute"
5068                                     " filename")
5069     else:
5070       self.do_kernel_path = False
5071
5072     if self.initrd_path is not None:
5073       self.do_initrd_path = True
5074       if self.initrd_path not in (constants.VALUE_NONE,
5075                                   constants.VALUE_DEFAULT):
5076         if not os.path.isabs(self.initrd_path):
5077           raise errors.OpPrereqError("The initrd path must be an absolute"
5078                                     " filename")
5079     else:
5080       self.do_initrd_path = False
5081
5082     # boot order verification
5083     if self.hvm_boot_order is not None:
5084       if self.hvm_boot_order != constants.VALUE_DEFAULT:
5085         if len(self.hvm_boot_order.strip("acdn")) != 0:
5086           raise errors.OpPrereqError("invalid boot order specified,"
5087                                      " must be one or more of [acdn]"
5088                                      " or 'default'")
5089
5090     # hvm_cdrom_image_path verification
5091     if self.op.hvm_cdrom_image_path is not None:
5092       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
5093               self.op.hvm_cdrom_image_path.lower() == "none"):
5094         raise errors.OpPrereqError("The path to the HVM CDROM image must"
5095                                    " be an absolute path or None, not %s" %
5096                                    self.op.hvm_cdrom_image_path)
5097       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
5098               self.op.hvm_cdrom_image_path.lower() == "none"):
5099         raise errors.OpPrereqError("The HVM CDROM image must either be a"
5100                                    " regular file or a symlink pointing to"
5101                                    " an existing regular file, not %s" %
5102                                    self.op.hvm_cdrom_image_path)
5103
5104     # vnc_bind_address verification
5105     if self.op.vnc_bind_address is not None:
5106       if not utils.IsValidIP(self.op.vnc_bind_address):
5107         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
5108                                    " like a valid IP address" %
5109                                    self.op.vnc_bind_address)
5110
5111     # Xen HVM device type checks
5112     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
5113       if self.op.hvm_nic_type is not None:
5114         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
5115           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
5116                                      " HVM  hypervisor" % self.op.hvm_nic_type)
5117       if self.op.hvm_disk_type is not None:
5118         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
5119           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
5120                                      " HVM hypervisor" % self.op.hvm_disk_type)
5121
5122     instance = self.cfg.GetInstanceInfo(
5123       self.cfg.ExpandInstanceName(self.op.instance_name))
5124     if instance is None:
5125       raise errors.OpPrereqError("No such instance name '%s'" %
5126                                  self.op.instance_name)
5127     self.op.instance_name = instance.name
5128     self.instance = instance
5129     self.warn = []
5130     if self.mem is not None and not self.force:
5131       pnode = self.instance.primary_node
5132       nodelist = [pnode]
5133       nodelist.extend(instance.secondary_nodes)
5134       instance_info = rpc.call_instance_info(pnode, instance.name)
5135       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
5136
5137       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
5138         # Assume the primary node is unreachable and go ahead
5139         self.warn.append("Can't get info from primary node %s" % pnode)
5140       else:
5141         if instance_info:
5142           current_mem = instance_info['memory']
5143         else:
5144           # Assume instance not running
5145           # (there is a slight race condition here, but it's not very probable,
5146           # and we have no other way to check)
5147           current_mem = 0
5148         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
5149         if miss_mem > 0:
5150           raise errors.OpPrereqError("This change will prevent the instance"
5151                                      " from starting, due to %d MB of memory"
5152                                      " missing on its primary node" % miss_mem)
5153
5154       for node in instance.secondary_nodes:
5155         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
5156           self.warn.append("Can't get info from secondary node %s" % node)
5157         elif self.mem > nodeinfo[node]['memory_free']:
5158           self.warn.append("Not enough memory to failover instance to secondary"
5159                            " node %s" % node)
5160     return
5161
5162   def Exec(self, feedback_fn):
5163     """Modifies an instance.
5164
5165     All parameters take effect only at the next restart of the instance.
5166     """
5167     # Process here the warnings from CheckPrereq, as we don't have a
5168     # feedback_fn there.
5169     for warn in self.warn:
5170       feedback_fn("WARNING: %s" % warn)
5171
5172     result = []
5173     instance = self.instance
5174     if self.mem:
5175       instance.memory = self.mem
5176       result.append(("mem", self.mem))
5177     if self.vcpus:
5178       instance.vcpus = self.vcpus
5179       result.append(("vcpus",  self.vcpus))
5180     if self.do_ip:
5181       instance.nics[0].ip = self.ip
5182       result.append(("ip", self.ip))
5183     if self.bridge:
5184       instance.nics[0].bridge = self.bridge
5185       result.append(("bridge", self.bridge))
5186     if self.mac:
5187       instance.nics[0].mac = self.mac
5188       result.append(("mac", self.mac))
5189     if self.do_kernel_path:
5190       instance.kernel_path = self.kernel_path
5191       result.append(("kernel_path", self.kernel_path))
5192     if self.do_initrd_path:
5193       instance.initrd_path = self.initrd_path
5194       result.append(("initrd_path", self.initrd_path))
5195     if self.hvm_boot_order:
5196       if self.hvm_boot_order == constants.VALUE_DEFAULT:
5197         instance.hvm_boot_order = None
5198       else:
5199         instance.hvm_boot_order = self.hvm_boot_order
5200       result.append(("hvm_boot_order", self.hvm_boot_order))
5201     if self.hvm_acpi is not None:
5202       instance.hvm_acpi = self.hvm_acpi
5203       result.append(("hvm_acpi", self.hvm_acpi))
5204     if self.hvm_pae is not None:
5205       instance.hvm_pae = self.hvm_pae
5206       result.append(("hvm_pae", self.hvm_pae))
5207     if self.hvm_nic_type is not None:
5208       instance.hvm_nic_type = self.hvm_nic_type
5209       result.append(("hvm_nic_type", self.hvm_nic_type))
5210     if self.hvm_disk_type is not None:
5211       instance.hvm_disk_type = self.hvm_disk_type
5212       result.append(("hvm_disk_type", self.hvm_disk_type))
5213     if self.hvm_cdrom_image_path:
5214       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
5215         instance.hvm_cdrom_image_path = None
5216       else:
5217         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
5218       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
5219     if self.vnc_bind_address:
5220       instance.vnc_bind_address = self.vnc_bind_address
5221       result.append(("vnc_bind_address", self.vnc_bind_address))
5222
5223     self.cfg.AddInstance(instance)
5224
5225     return result
5226
5227
5228 class LUQueryExports(NoHooksLU):
5229   """Query the exports list
5230
5231   """
5232   _OP_REQP = []
5233
5234   def CheckPrereq(self):
5235     """Check that the nodelist contains only existing nodes.
5236
5237     """
5238     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5239
5240   def Exec(self, feedback_fn):
5241     """Compute the list of all the exported system images.
5242
5243     Returns:
5244       a dictionary with the structure node->(export-list)
5245       where export-list is a list of the instances exported on
5246       that node.
5247
5248     """
5249     return rpc.call_export_list(self.nodes)
5250
5251
5252 class LUExportInstance(LogicalUnit):
5253   """Export an instance to an image in the cluster.
5254
5255   """
5256   HPATH = "instance-export"
5257   HTYPE = constants.HTYPE_INSTANCE
5258   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5259
5260   def BuildHooksEnv(self):
5261     """Build hooks env.
5262
5263     This will run on the master, primary node and target node.
5264
5265     """
5266     env = {
5267       "EXPORT_NODE": self.op.target_node,
5268       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5269       }
5270     env.update(_BuildInstanceHookEnvByObject(self.instance))
5271     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5272           self.op.target_node]
5273     return env, nl, nl
5274
5275   def CheckPrereq(self):
5276     """Check prerequisites.
5277
5278     This checks that the instance and node names are valid.
5279
5280     """
5281     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5282     self.instance = self.cfg.GetInstanceInfo(instance_name)
5283     if self.instance is None:
5284       raise errors.OpPrereqError("Instance '%s' not found" %
5285                                  self.op.instance_name)
5286
5287     # node verification
5288     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5289     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5290
5291     if self.dst_node is None:
5292       raise errors.OpPrereqError("Destination node '%s' is unknown." %
5293                                  self.op.target_node)
5294     self.op.target_node = self.dst_node.name
5295
5296   def Exec(self, feedback_fn):
5297     """Export an instance to an image in the cluster.
5298
5299     """
5300     instance = self.instance
5301     dst_node = self.dst_node
5302     src_node = instance.primary_node
5303     if self.op.shutdown:
5304       # shutdown the instance, but not the disks
5305       if not rpc.call_instance_shutdown(src_node, instance):
5306         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5307                                  (instance.name, src_node))
5308
5309     vgname = self.cfg.GetVGName()
5310
5311     snap_disks = []
5312
5313     try:
5314       for disk in instance.disks:
5315         if disk.iv_name == "sda":
5316           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5317           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5318
5319           if not new_dev_name:
5320             logger.Error("could not snapshot block device %s on node %s" %
5321                          (disk.logical_id[1], src_node))
5322           else:
5323             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5324                                       logical_id=(vgname, new_dev_name),
5325                                       physical_id=(vgname, new_dev_name),
5326                                       iv_name=disk.iv_name)
5327             snap_disks.append(new_dev)
5328
5329     finally:
5330       if self.op.shutdown and instance.status == "up":
5331         if not rpc.call_instance_start(src_node, instance, None):
5332           _ShutdownInstanceDisks(instance, self.cfg)
5333           raise errors.OpExecError("Could not start instance")
5334
5335     # TODO: check for size
5336
5337     for dev in snap_disks:
5338       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5339                                            instance):
5340         logger.Error("could not export block device %s from node"
5341                      " %s to node %s" %
5342                      (dev.logical_id[1], src_node, dst_node.name))
5343       if not rpc.call_blockdev_remove(src_node, dev):
5344         logger.Error("could not remove snapshot block device %s from"
5345                      " node %s" % (dev.logical_id[1], src_node))
5346
5347     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5348       logger.Error("could not finalize export for instance %s on node %s" %
5349                    (instance.name, dst_node.name))
5350
5351     nodelist = self.cfg.GetNodeList()
5352     nodelist.remove(dst_node.name)
5353
5354     # on one-node clusters nodelist will be empty after the removal
5355     # if we proceed the backup would be removed because OpQueryExports
5356     # substitutes an empty list with the full cluster node list.
5357     if nodelist:
5358       op = opcodes.OpQueryExports(nodes=nodelist)
5359       exportlist = self.proc.ChainOpCode(op)
5360       for node in exportlist:
5361         if instance.name in exportlist[node]:
5362           if not rpc.call_export_remove(node, instance.name):
5363             logger.Error("could not remove older export for instance %s"
5364                          " on node %s" % (instance.name, node))
5365
5366
5367 class LURemoveExport(NoHooksLU):
5368   """Remove exports related to the named instance.
5369
5370   """
5371   _OP_REQP = ["instance_name"]
5372
5373   def CheckPrereq(self):
5374     """Check prerequisites.
5375     """
5376     pass
5377
5378   def Exec(self, feedback_fn):
5379     """Remove any export.
5380
5381     """
5382     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5383     # If the instance was not found we'll try with the name that was passed in.
5384     # This will only work if it was an FQDN, though.
5385     fqdn_warn = False
5386     if not instance_name:
5387       fqdn_warn = True
5388       instance_name = self.op.instance_name
5389
5390     op = opcodes.OpQueryExports(nodes=[])
5391     exportlist = self.proc.ChainOpCode(op)
5392     found = False
5393     for node in exportlist:
5394       if instance_name in exportlist[node]:
5395         found = True
5396         if not rpc.call_export_remove(node, instance_name):
5397           logger.Error("could not remove export for instance %s"
5398                        " on node %s" % (instance_name, node))
5399
5400     if fqdn_warn and not found:
5401       feedback_fn("Export not found. If trying to remove an export belonging"
5402                   " to a deleted instance please use its Fully Qualified"
5403                   " Domain Name.")
5404
5405
5406 class TagsLU(NoHooksLU):
5407   """Generic tags LU.
5408
5409   This is an abstract class which is the parent of all the other tags LUs.
5410
5411   """
5412   def CheckPrereq(self):
5413     """Check prerequisites.
5414
5415     """
5416     if self.op.kind == constants.TAG_CLUSTER:
5417       self.target = self.cfg.GetClusterInfo()
5418     elif self.op.kind == constants.TAG_NODE:
5419       name = self.cfg.ExpandNodeName(self.op.name)
5420       if name is None:
5421         raise errors.OpPrereqError("Invalid node name (%s)" %
5422                                    (self.op.name,))
5423       self.op.name = name
5424       self.target = self.cfg.GetNodeInfo(name)
5425     elif self.op.kind == constants.TAG_INSTANCE:
5426       name = self.cfg.ExpandInstanceName(self.op.name)
5427       if name is None:
5428         raise errors.OpPrereqError("Invalid instance name (%s)" %
5429                                    (self.op.name,))
5430       self.op.name = name
5431       self.target = self.cfg.GetInstanceInfo(name)
5432     else:
5433       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5434                                  str(self.op.kind))
5435
5436
5437 class LUGetTags(TagsLU):
5438   """Returns the tags of a given object.
5439
5440   """
5441   _OP_REQP = ["kind", "name"]
5442
5443   def Exec(self, feedback_fn):
5444     """Returns the tag list.
5445
5446     """
5447     return self.target.GetTags()
5448
5449
5450 class LUSearchTags(NoHooksLU):
5451   """Searches the tags for a given pattern.
5452
5453   """
5454   _OP_REQP = ["pattern"]
5455
5456   def CheckPrereq(self):
5457     """Check prerequisites.
5458
5459     This checks the pattern passed for validity by compiling it.
5460
5461     """
5462     try:
5463       self.re = re.compile(self.op.pattern)
5464     except re.error, err:
5465       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5466                                  (self.op.pattern, err))
5467
5468   def Exec(self, feedback_fn):
5469     """Returns the tag list.
5470
5471     """
5472     cfg = self.cfg
5473     tgts = [("/cluster", cfg.GetClusterInfo())]
5474     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5475     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5476     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5477     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5478     results = []
5479     for path, target in tgts:
5480       for tag in target.GetTags():
5481         if self.re.search(tag):
5482           results.append((path, tag))
5483     return results
5484
5485
5486 class LUAddTags(TagsLU):
5487   """Sets a tag on a given object.
5488
5489   """
5490   _OP_REQP = ["kind", "name", "tags"]
5491
5492   def CheckPrereq(self):
5493     """Check prerequisites.
5494
5495     This checks the type and length of the tag name and value.
5496
5497     """
5498     TagsLU.CheckPrereq(self)
5499     for tag in self.op.tags:
5500       objects.TaggableObject.ValidateTag(tag)
5501
5502   def Exec(self, feedback_fn):
5503     """Sets the tag.
5504
5505     """
5506     try:
5507       for tag in self.op.tags:
5508         self.target.AddTag(tag)
5509     except errors.TagError, err:
5510       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5511     try:
5512       self.cfg.Update(self.target)
5513     except errors.ConfigurationError:
5514       raise errors.OpRetryError("There has been a modification to the"
5515                                 " config file and the operation has been"
5516                                 " aborted. Please retry.")
5517
5518
5519 class LUDelTags(TagsLU):
5520   """Delete a list of tags from a given object.
5521
5522   """
5523   _OP_REQP = ["kind", "name", "tags"]
5524
5525   def CheckPrereq(self):
5526     """Check prerequisites.
5527
5528     This checks that we have the given tag.
5529
5530     """
5531     TagsLU.CheckPrereq(self)
5532     for tag in self.op.tags:
5533       objects.TaggableObject.ValidateTag(tag, removal=True)
5534     del_tags = frozenset(self.op.tags)
5535     cur_tags = self.target.GetTags()
5536     if not del_tags <= cur_tags:
5537       diff_tags = del_tags - cur_tags
5538       diff_names = ["'%s'" % tag for tag in diff_tags]
5539       diff_names.sort()
5540       raise errors.OpPrereqError("Tag(s) %s not found" %
5541                                  (",".join(diff_names)))
5542
5543   def Exec(self, feedback_fn):
5544     """Remove the tag from the object.
5545
5546     """
5547     for tag in self.op.tags:
5548       self.target.RemoveTag(tag)
5549     try:
5550       self.cfg.Update(self.target)
5551     except errors.ConfigurationError:
5552       raise errors.OpRetryError("There has been a modification to the"
5553                                 " config file and the operation has been"
5554                                 " aborted. Please retry.")
5555
5556 class LUTestDelay(NoHooksLU):
5557   """Sleep for a specified amount of time.
5558
5559   This LU sleeps on the master and/or nodes for a specified amoutn of
5560   time.
5561
5562   """
5563   _OP_REQP = ["duration", "on_master", "on_nodes"]
5564
5565   def CheckPrereq(self):
5566     """Check prerequisites.
5567
5568     This checks that we have a good list of nodes and/or the duration
5569     is valid.
5570
5571     """
5572
5573     if self.op.on_nodes:
5574       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5575
5576   def Exec(self, feedback_fn):
5577     """Do the actual sleep.
5578
5579     """
5580     if self.op.on_master:
5581       if not utils.TestDelay(self.op.duration):
5582         raise errors.OpExecError("Error during master delay test")
5583     if self.op.on_nodes:
5584       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5585       if not result:
5586         raise errors.OpExecError("Complete failure from rpc call")
5587       for node, node_result in result.items():
5588         if not node_result:
5589           raise errors.OpExecError("Failure during rpc call to node %s,"
5590                                    " result: %s" % (node, node_result))
5591
5592
5593 class IAllocator(object):
5594   """IAllocator framework.
5595
5596   An IAllocator instance has three sets of attributes:
5597     - cfg/sstore that are needed to query the cluster
5598     - input data (all members of the _KEYS class attribute are required)
5599     - four buffer attributes (in|out_data|text), that represent the
5600       input (to the external script) in text and data structure format,
5601       and the output from it, again in two formats
5602     - the result variables from the script (success, info, nodes) for
5603       easy usage
5604
5605   """
5606   _ALLO_KEYS = [
5607     "mem_size", "disks", "disk_template",
5608     "os", "tags", "nics", "vcpus",
5609     ]
5610   _RELO_KEYS = [
5611     "relocate_from",
5612     ]
5613
5614   def __init__(self, cfg, sstore, mode, name, **kwargs):
5615     self.cfg = cfg
5616     self.sstore = sstore
5617     # init buffer variables
5618     self.in_text = self.out_text = self.in_data = self.out_data = None
5619     # init all input fields so that pylint is happy
5620     self.mode = mode
5621     self.name = name
5622     self.mem_size = self.disks = self.disk_template = None
5623     self.os = self.tags = self.nics = self.vcpus = None
5624     self.relocate_from = None
5625     # computed fields
5626     self.required_nodes = None
5627     # init result fields
5628     self.success = self.info = self.nodes = None
5629     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5630       keyset = self._ALLO_KEYS
5631     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5632       keyset = self._RELO_KEYS
5633     else:
5634       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5635                                    " IAllocator" % self.mode)
5636     for key in kwargs:
5637       if key not in keyset:
5638         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5639                                      " IAllocator" % key)
5640       setattr(self, key, kwargs[key])
5641     for key in keyset:
5642       if key not in kwargs:
5643         raise errors.ProgrammerError("Missing input parameter '%s' to"
5644                                      " IAllocator" % key)
5645     self._BuildInputData()
5646
5647   def _ComputeClusterData(self):
5648     """Compute the generic allocator input data.
5649
5650     This is the data that is independent of the actual operation.
5651
5652     """
5653     cfg = self.cfg
5654     # cluster data
5655     data = {
5656       "version": 1,
5657       "cluster_name": self.sstore.GetClusterName(),
5658       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5659       "hypervisor_type": self.sstore.GetHypervisorType(),
5660       # we don't have job IDs
5661       }
5662
5663     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5664
5665     # node data
5666     node_results = {}
5667     node_list = cfg.GetNodeList()
5668     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5669     for nname in node_list:
5670       ninfo = cfg.GetNodeInfo(nname)
5671       if nname not in node_data or not isinstance(node_data[nname], dict):
5672         raise errors.OpExecError("Can't get data for node %s" % nname)
5673       remote_info = node_data[nname]
5674       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5675                    'vg_size', 'vg_free', 'cpu_total']:
5676         if attr not in remote_info:
5677           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5678                                    (nname, attr))
5679         try:
5680           remote_info[attr] = int(remote_info[attr])
5681         except ValueError, err:
5682           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5683                                    " %s" % (nname, attr, str(err)))
5684       # compute memory used by primary instances
5685       i_p_mem = i_p_up_mem = 0
5686       for iinfo in i_list:
5687         if iinfo.primary_node == nname:
5688           i_p_mem += iinfo.memory
5689           if iinfo.status == "up":
5690             i_p_up_mem += iinfo.memory
5691
5692       # compute memory used by instances
5693       pnr = {
5694         "tags": list(ninfo.GetTags()),
5695         "total_memory": remote_info['memory_total'],
5696         "reserved_memory": remote_info['memory_dom0'],
5697         "free_memory": remote_info['memory_free'],
5698         "i_pri_memory": i_p_mem,
5699         "i_pri_up_memory": i_p_up_mem,
5700         "total_disk": remote_info['vg_size'],
5701         "free_disk": remote_info['vg_free'],
5702         "primary_ip": ninfo.primary_ip,
5703         "secondary_ip": ninfo.secondary_ip,
5704         "total_cpus": remote_info['cpu_total'],
5705         }
5706       node_results[nname] = pnr
5707     data["nodes"] = node_results
5708
5709     # instance data
5710     instance_data = {}
5711     for iinfo in i_list:
5712       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5713                   for n in iinfo.nics]
5714       pir = {
5715         "tags": list(iinfo.GetTags()),
5716         "should_run": iinfo.status == "up",
5717         "vcpus": iinfo.vcpus,
5718         "memory": iinfo.memory,
5719         "os": iinfo.os,
5720         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5721         "nics": nic_data,
5722         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5723         "disk_template": iinfo.disk_template,
5724         }
5725       instance_data[iinfo.name] = pir
5726
5727     data["instances"] = instance_data
5728
5729     self.in_data = data
5730
5731   def _AddNewInstance(self):
5732     """Add new instance data to allocator structure.
5733
5734     This in combination with _AllocatorGetClusterData will create the
5735     correct structure needed as input for the allocator.
5736
5737     The checks for the completeness of the opcode must have already been
5738     done.
5739
5740     """
5741     data = self.in_data
5742     if len(self.disks) != 2:
5743       raise errors.OpExecError("Only two-disk configurations supported")
5744
5745     disk_space = _ComputeDiskSize(self.disk_template,
5746                                   self.disks[0]["size"], self.disks[1]["size"])
5747
5748     if self.disk_template in constants.DTS_NET_MIRROR:
5749       self.required_nodes = 2
5750     else:
5751       self.required_nodes = 1
5752     request = {
5753       "type": "allocate",
5754       "name": self.name,
5755       "disk_template": self.disk_template,
5756       "tags": self.tags,
5757       "os": self.os,
5758       "vcpus": self.vcpus,
5759       "memory": self.mem_size,
5760       "disks": self.disks,
5761       "disk_space_total": disk_space,
5762       "nics": self.nics,
5763       "required_nodes": self.required_nodes,
5764       }
5765     data["request"] = request
5766
5767   def _AddRelocateInstance(self):
5768     """Add relocate instance data to allocator structure.
5769
5770     This in combination with _IAllocatorGetClusterData will create the
5771     correct structure needed as input for the allocator.
5772
5773     The checks for the completeness of the opcode must have already been
5774     done.
5775
5776     """
5777     instance = self.cfg.GetInstanceInfo(self.name)
5778     if instance is None:
5779       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5780                                    " IAllocator" % self.name)
5781
5782     if instance.disk_template not in constants.DTS_NET_MIRROR:
5783       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5784
5785     if len(instance.secondary_nodes) != 1:
5786       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5787
5788     self.required_nodes = 1
5789
5790     disk_space = _ComputeDiskSize(instance.disk_template,
5791                                   instance.disks[0].size,
5792                                   instance.disks[1].size)
5793
5794     request = {
5795       "type": "relocate",
5796       "name": self.name,
5797       "disk_space_total": disk_space,
5798       "required_nodes": self.required_nodes,
5799       "relocate_from": self.relocate_from,
5800       }
5801     self.in_data["request"] = request
5802
5803   def _BuildInputData(self):
5804     """Build input data structures.
5805
5806     """
5807     self._ComputeClusterData()
5808
5809     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5810       self._AddNewInstance()
5811     else:
5812       self._AddRelocateInstance()
5813
5814     self.in_text = serializer.Dump(self.in_data)
5815
5816   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5817     """Run an instance allocator and return the results.
5818
5819     """
5820     data = self.in_text
5821
5822     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5823
5824     if not isinstance(result, tuple) or len(result) != 4:
5825       raise errors.OpExecError("Invalid result from master iallocator runner")
5826
5827     rcode, stdout, stderr, fail = result
5828
5829     if rcode == constants.IARUN_NOTFOUND:
5830       raise errors.OpExecError("Can't find allocator '%s'" % name)
5831     elif rcode == constants.IARUN_FAILURE:
5832         raise errors.OpExecError("Instance allocator call failed: %s,"
5833                                  " output: %s" %
5834                                  (fail, stdout+stderr))
5835     self.out_text = stdout
5836     if validate:
5837       self._ValidateResult()
5838
5839   def _ValidateResult(self):
5840     """Process the allocator results.
5841
5842     This will process and if successful save the result in
5843     self.out_data and the other parameters.
5844
5845     """
5846     try:
5847       rdict = serializer.Load(self.out_text)
5848     except Exception, err:
5849       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5850
5851     if not isinstance(rdict, dict):
5852       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5853
5854     for key in "success", "info", "nodes":
5855       if key not in rdict:
5856         raise errors.OpExecError("Can't parse iallocator results:"
5857                                  " missing key '%s'" % key)
5858       setattr(self, key, rdict[key])
5859
5860     if not isinstance(rdict["nodes"], list):
5861       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5862                                " is not a list")
5863     self.out_data = rdict
5864
5865
5866 class LUTestAllocator(NoHooksLU):
5867   """Run allocator tests.
5868
5869   This LU runs the allocator tests
5870
5871   """
5872   _OP_REQP = ["direction", "mode", "name"]
5873
5874   def CheckPrereq(self):
5875     """Check prerequisites.
5876
5877     This checks the opcode parameters depending on the director and mode test.
5878
5879     """
5880     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5881       for attr in ["name", "mem_size", "disks", "disk_template",
5882                    "os", "tags", "nics", "vcpus"]:
5883         if not hasattr(self.op, attr):
5884           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5885                                      attr)
5886       iname = self.cfg.ExpandInstanceName(self.op.name)
5887       if iname is not None:
5888         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5889                                    iname)
5890       if not isinstance(self.op.nics, list):
5891         raise errors.OpPrereqError("Invalid parameter 'nics'")
5892       for row in self.op.nics:
5893         if (not isinstance(row, dict) or
5894             "mac" not in row or
5895             "ip" not in row or
5896             "bridge" not in row):
5897           raise errors.OpPrereqError("Invalid contents of the"
5898                                      " 'nics' parameter")
5899       if not isinstance(self.op.disks, list):
5900         raise errors.OpPrereqError("Invalid parameter 'disks'")
5901       if len(self.op.disks) != 2:
5902         raise errors.OpPrereqError("Only two-disk configurations supported")
5903       for row in self.op.disks:
5904         if (not isinstance(row, dict) or
5905             "size" not in row or
5906             not isinstance(row["size"], int) or
5907             "mode" not in row or
5908             row["mode"] not in ['r', 'w']):
5909           raise errors.OpPrereqError("Invalid contents of the"
5910                                      " 'disks' parameter")
5911     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5912       if not hasattr(self.op, "name"):
5913         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5914       fname = self.cfg.ExpandInstanceName(self.op.name)
5915       if fname is None:
5916         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5917                                    self.op.name)
5918       self.op.name = fname
5919       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5920     else:
5921       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5922                                  self.op.mode)
5923
5924     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5925       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5926         raise errors.OpPrereqError("Missing allocator name")
5927     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5928       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5929                                  self.op.direction)
5930
5931   def Exec(self, feedback_fn):
5932     """Run the allocator test.
5933
5934     """
5935     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5936       ial = IAllocator(self.cfg, self.sstore,
5937                        mode=self.op.mode,
5938                        name=self.op.name,
5939                        mem_size=self.op.mem_size,
5940                        disks=self.op.disks,
5941                        disk_template=self.op.disk_template,
5942                        os=self.op.os,
5943                        tags=self.op.tags,
5944                        nics=self.op.nics,
5945                        vcpus=self.op.vcpus,
5946                        )
5947     else:
5948       ial = IAllocator(self.cfg, self.sstore,
5949                        mode=self.op.mode,
5950                        name=self.op.name,
5951                        relocate_from=list(self.relocate_from),
5952                        )
5953
5954     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5955       result = ial.in_text
5956     else:
5957       ial.Run(self.op.allocator, validate=False)
5958       result = ial.out_text
5959     return result