Instance migration: add delays around migration
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145     """Notify the LU about the results of its hooks.
146
147     This method is called every time a hooks phase is executed, and notifies
148     the Logical Unit about the hooks' result. The LU can then use it to alter
149     its result based on the hooks.  By default the method does nothing and the
150     previous result is passed back unchanged but any LU can define it if it
151     wants to use the local cluster hook-scripts somehow.
152
153     Args:
154       phase: the hooks phase that has just been run
155       hooks_results: the results of the multi-node hooks rpc call
156       feedback_fn: function to send feedback back to the caller
157       lu_result: the previous result this LU had, or None in the PRE phase.
158
159     """
160     return lu_result
161
162
163 class NoHooksLU(LogicalUnit):
164   """Simple LU which runs no hooks.
165
166   This LU is intended as a parent for other LogicalUnits which will
167   run no hooks, in order to reduce duplicate code.
168
169   """
170   HPATH = None
171   HTYPE = None
172
173
174 def _AddHostToEtcHosts(hostname):
175   """Wrapper around utils.SetEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
180
181
182 def _RemoveHostFromEtcHosts(hostname):
183   """Wrapper around utils.RemoveEtcHostsEntry.
184
185   """
186   hi = utils.HostInfo(name=hostname)
187   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315   """Ensure a node has a correct known_hosts entry.
316
317   Args:
318     fullnode - Fully qualified domain name of host. (str)
319     ip       - IPv4 address of host (str)
320     pubkey   - the public key of the cluster
321
322   """
323   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
325   else:
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
327
328   inthere = False
329
330   save_lines = []
331   add_lines = []
332   removed = False
333
334   for rawline in f:
335     logger.Debug('read %s' % (repr(rawline),))
336
337     parts = rawline.rstrip('\r\n').split()
338
339     # Ignore unwanted lines
340     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341       fields = parts[0].split(',')
342       key = parts[2]
343
344       haveall = True
345       havesome = False
346       for spec in [ ip, fullnode ]:
347         if spec not in fields:
348           haveall = False
349         if spec in fields:
350           havesome = True
351
352       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353       if haveall and key == pubkey:
354         inthere = True
355         save_lines.append(rawline)
356         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
357         continue
358
359       if havesome and (not haveall or key != pubkey):
360         removed = True
361         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
362         continue
363
364     save_lines.append(rawline)
365
366   if not inthere:
367     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
369
370   if removed:
371     save_lines = save_lines + add_lines
372
373     # Write a new file and replace old.
374     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
375                                    constants.DATA_DIR)
376     newfile = os.fdopen(fd, 'w')
377     try:
378       newfile.write(''.join(save_lines))
379     finally:
380       newfile.close()
381     logger.Debug("Wrote new known_hosts.")
382     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
383
384   elif add_lines:
385     # Simply appending a new line will do the trick.
386     f.seek(0, 2)
387     for add in add_lines:
388       f.write(add)
389
390   f.close()
391
392
393 def _HasValidVG(vglist, vgname):
394   """Checks if the volume group list is valid.
395
396   A non-None return value means there's an error, and the return value
397   is the error message.
398
399   """
400   vgsize = vglist.get(vgname, None)
401   if vgsize is None:
402     return "volume group '%s' missing" % vgname
403   elif vgsize < 20480:
404     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
405             (vgname, vgsize))
406   return None
407
408
409 def _InitSSHSetup(node):
410   """Setup the SSH configuration for the cluster.
411
412
413   This generates a dsa keypair for root, adds the pub key to the
414   permitted hosts and adds the hostkey to its own known hosts.
415
416   Args:
417     node: the name of this host as a fqdn
418
419   """
420   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
421
422   for name in priv_key, pub_key:
423     if os.path.exists(name):
424       utils.CreateBackup(name)
425     utils.RemoveFile(name)
426
427   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
428                          "-f", priv_key,
429                          "-q", "-N", ""])
430   if result.failed:
431     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
432                              result.output)
433
434   f = open(pub_key, 'r')
435   try:
436     utils.AddAuthorizedKey(auth_keys, f.read(8192))
437   finally:
438     f.close()
439
440
441 def _InitGanetiServerSetup(ss):
442   """Setup the necessary configuration for the initial node daemon.
443
444   This creates the nodepass file containing the shared password for
445   the cluster and also generates the SSL certificate.
446
447   """
448   # Create pseudo random password
449   randpass = sha.new(os.urandom(64)).hexdigest()
450   # and write it into sstore
451   ss.SetKey(ss.SS_NODED_PASS, randpass)
452
453   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454                          "-days", str(365*5), "-nodes", "-x509",
455                          "-keyout", constants.SSL_CERT_FILE,
456                          "-out", constants.SSL_CERT_FILE, "-batch"])
457   if result.failed:
458     raise errors.OpExecError("could not generate server ssl cert, command"
459                              " %s had exitcode %s and error message %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462   os.chmod(constants.SSL_CERT_FILE, 0400)
463
464   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
465
466   if result.failed:
467     raise errors.OpExecError("Could not start the node daemon, command %s"
468                              " had exitcode %s and error %s" %
469                              (result.cmd, result.exit_code, result.output))
470
471
472 def _CheckInstanceBridgesExist(instance):
473   """Check that the brigdes needed by an instance exist.
474
475   """
476   # check bridges existance
477   brlist = [nic.bridge for nic in instance.nics]
478   if not rpc.call_bridges_exist(instance.primary_node, brlist):
479     raise errors.OpPrereqError("one or more target bridges %s does not"
480                                " exist on destination node '%s'" %
481                                (brlist, instance.primary_node))
482
483
484 class LUInitCluster(LogicalUnit):
485   """Initialise the cluster.
486
487   """
488   HPATH = "cluster-init"
489   HTYPE = constants.HTYPE_CLUSTER
490   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491               "def_bridge", "master_netdev"]
492   REQ_CLUSTER = False
493
494   def BuildHooksEnv(self):
495     """Build hooks env.
496
497     Notes: Since we don't require a cluster, we must manually add
498     ourselves in the post-run node list.
499
500     """
501     env = {"OP_TARGET": self.op.cluster_name}
502     return env, [], [self.hostname.name]
503
504   def CheckPrereq(self):
505     """Verify that the passed name is a valid one.
506
507     """
508     if config.ConfigWriter.IsCluster():
509       raise errors.OpPrereqError("Cluster is already initialised")
510
511     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512       if not os.path.exists(constants.VNC_PASSWORD_FILE):
513         raise errors.OpPrereqError("Please prepare the cluster VNC"
514                                    "password file %s" %
515                                    constants.VNC_PASSWORD_FILE)
516
517     self.hostname = hostname = utils.HostInfo()
518
519     if hostname.ip.startswith("127."):
520       raise errors.OpPrereqError("This host's IP resolves to the private"
521                                  " range (%s). Please fix DNS or %s." %
522                                  (hostname.ip, constants.ETC_HOSTS))
523
524     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525                          source=constants.LOCALHOST_IP_ADDRESS):
526       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527                                  " to %s,\nbut this ip address does not"
528                                  " belong to this host."
529                                  " Aborting." % hostname.ip)
530
531     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
532
533     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
534                      timeout=5):
535       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
536
537     secondary_ip = getattr(self.op, "secondary_ip", None)
538     if secondary_ip and not utils.IsValidIP(secondary_ip):
539       raise errors.OpPrereqError("Invalid secondary ip given")
540     if (secondary_ip and
541         secondary_ip != hostname.ip and
542         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543                            source=constants.LOCALHOST_IP_ADDRESS))):
544       raise errors.OpPrereqError("You gave %s as secondary IP,"
545                                  " but it does not belong to this host." %
546                                  secondary_ip)
547     self.secondary_ip = secondary_ip
548
549     # checks presence of the volume group given
550     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
551
552     if vgstatus:
553       raise errors.OpPrereqError("Error: %s" % vgstatus)
554
555     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
556                     self.op.mac_prefix):
557       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
558                                  self.op.mac_prefix)
559
560     if self.op.hypervisor_type not in constants.HYPER_TYPES:
561       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562                                  self.op.hypervisor_type)
563
564     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
565     if result.failed:
566       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567                                  (self.op.master_netdev,
568                                   result.output.strip()))
569
570     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572       raise errors.OpPrereqError("Init.d script '%s' missing or not"
573                                  " executable." % constants.NODE_INITD_SCRIPT)
574
575   def Exec(self, feedback_fn):
576     """Initialize the cluster.
577
578     """
579     clustername = self.clustername
580     hostname = self.hostname
581
582     # set up the simple store
583     self.sstore = ss = ssconf.SimpleStore()
584     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
589
590     # set up the inter-node password and certificate
591     _InitGanetiServerSetup(ss)
592
593     # start the master ip
594     rpc.call_node_start_master(hostname.name)
595
596     # set up ssh config and /etc/hosts
597     f = open(constants.SSH_HOST_RSA_PUB, 'r')
598     try:
599       sshline = f.read()
600     finally:
601       f.close()
602     sshkey = sshline.split(" ")[1]
603
604     _AddHostToEtcHosts(hostname.name)
605
606     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
607
608     _InitSSHSetup(hostname.name)
609
610     # init of cluster config file
611     self.cfg = cfgw = config.ConfigWriter()
612     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613                     sshkey, self.op.mac_prefix,
614                     self.op.vg_name, self.op.def_bridge)
615
616
617 class LUDestroyCluster(NoHooksLU):
618   """Logical unit for destroying the cluster.
619
620   """
621   _OP_REQP = []
622
623   def CheckPrereq(self):
624     """Check prerequisites.
625
626     This checks whether the cluster is empty.
627
628     Any errors are signalled by raising errors.OpPrereqError.
629
630     """
631     master = self.sstore.GetMasterNode()
632
633     nodelist = self.cfg.GetNodeList()
634     if len(nodelist) != 1 or nodelist[0] != master:
635       raise errors.OpPrereqError("There are still %d node(s) in"
636                                  " this cluster." % (len(nodelist) - 1))
637     instancelist = self.cfg.GetInstanceList()
638     if instancelist:
639       raise errors.OpPrereqError("There are still %d instance(s) in"
640                                  " this cluster." % len(instancelist))
641
642   def Exec(self, feedback_fn):
643     """Destroys the cluster.
644
645     """
646     master = self.sstore.GetMasterNode()
647     if not rpc.call_node_stop_master(master):
648       raise errors.OpExecError("Could not disable the master role")
649     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650     utils.CreateBackup(priv_key)
651     utils.CreateBackup(pub_key)
652     rpc.call_node_leave_cluster(master)
653
654
655 class LUVerifyCluster(LogicalUnit):
656   """Verifies the cluster status.
657
658   """
659   HPATH = "cluster-verify"
660   HTYPE = constants.HTYPE_CLUSTER
661   _OP_REQP = ["skip_checks"]
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     if 'node-net-test' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result['node-net-test']:
733         bad = True
734         nlist = utils.NiceSort(node_result['node-net-test'].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result['node-net-test'][node]))
738
739     hyp_result = node_result.get('hypervisor', None)
740     if hyp_result is not None:
741       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742     return bad
743
744   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745                       node_instance, feedback_fn):
746     """Verify an instance.
747
748     This function checks to see if the required block devices are
749     available on the instance's node.
750
751     """
752     bad = False
753
754     node_current = instanceconfig.primary_node
755
756     node_vol_should = {}
757     instanceconfig.MapLVsByNode(node_vol_should)
758
759     for node in node_vol_should:
760       for volume in node_vol_should[node]:
761         if node not in node_vol_is or volume not in node_vol_is[node]:
762           feedback_fn("  - ERROR: volume %s missing on node %s" %
763                           (volume, node))
764           bad = True
765
766     if not instanceconfig.status == 'down':
767       if (node_current not in node_instance or
768           not instance in node_instance[node_current]):
769         feedback_fn("  - ERROR: instance %s not running on node %s" %
770                         (instance, node_current))
771         bad = True
772
773     for node in node_instance:
774       if (not node == node_current):
775         if instance in node_instance[node]:
776           feedback_fn("  - ERROR: instance %s should not run on node %s" %
777                           (instance, node))
778           bad = True
779
780     return bad
781
782   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783     """Verify if there are any unknown volumes in the cluster.
784
785     The .os, .swap and backup volumes are ignored. All other volumes are
786     reported as unknown.
787
788     """
789     bad = False
790
791     for node in node_vol_is:
792       for volume in node_vol_is[node]:
793         if node not in node_vol_should or volume not in node_vol_should[node]:
794           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
795                       (volume, node))
796           bad = True
797     return bad
798
799   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800     """Verify the list of running instances.
801
802     This checks what instances are running but unknown to the cluster.
803
804     """
805     bad = False
806     for node in node_instance:
807       for runninginstance in node_instance[node]:
808         if runninginstance not in instancelist:
809           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
810                           (runninginstance, node))
811           bad = True
812     return bad
813
814   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815     """Verify N+1 Memory Resilience.
816
817     Check that if one single node dies we can still start all the instances it
818     was primary for.
819
820     """
821     bad = False
822
823     for node, nodeinfo in node_info.iteritems():
824       # This code checks that every node which is now listed as secondary has
825       # enough memory to host all instances it is supposed to should a single
826       # other node in the cluster fail.
827       # FIXME: not ready for failover to an arbitrary node
828       # FIXME: does not support file-backed instances
829       # WARNING: we currently take into account down instances as well as up
830       # ones, considering that even if they're down someone might want to start
831       # them even in the event of a node failure.
832       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
833         needed_mem = 0
834         for instance in instances:
835           needed_mem += instance_cfg[instance].memory
836         if nodeinfo['mfree'] < needed_mem:
837           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
838                       " failovers should node %s fail" % (node, prinode))
839           bad = True
840     return bad
841
842   def CheckPrereq(self):
843     """Check prerequisites.
844
845     Transform the list of checks we're going to skip into a set and check that
846     all its members are valid.
847
848     """
849     self.skip_set = frozenset(self.op.skip_checks)
850     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851       raise errors.OpPrereqError("Invalid checks to be skipped specified")
852
853   def BuildHooksEnv(self):
854     """Build hooks env.
855
856     Cluster-Verify hooks just rone in the post phase and their failure makes
857     the output be logged in the verify output and the verification to fail.
858
859     """
860     all_nodes = self.cfg.GetNodeList()
861     tags = self.cfg.GetClusterInfo().GetTags()
862     # TODO: populate the environment with useful information for verify hooks
863     env = {
864       "CLUSTER_TAGS": " ".join(tags),
865       }
866     return env, [], all_nodes
867
868   def Exec(self, feedback_fn):
869     """Verify integrity of cluster, performing various test on nodes.
870
871     """
872     bad = False
873     feedback_fn("* Verifying global settings")
874     for msg in self.cfg.VerifyConfig():
875       feedback_fn("  - ERROR: %s" % msg)
876
877     vg_name = self.cfg.GetVGName()
878     nodelist = utils.NiceSort(self.cfg.GetNodeList())
879     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881     i_non_redundant = [] # Non redundant instances
882     node_volume = {}
883     node_instance = {}
884     node_info = {}
885     instance_cfg = {}
886
887     # FIXME: verify OS list
888     # do local checksums
889     file_names = list(self.sstore.GetFileList())
890     file_names.append(constants.SSL_CERT_FILE)
891     file_names.append(constants.CLUSTER_CONF_FILE)
892     local_checksums = utils.FingerprintFiles(file_names)
893
894     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896     all_instanceinfo = rpc.call_instance_list(nodelist)
897     all_vglist = rpc.call_vg_list(nodelist)
898     node_verify_param = {
899       'filelist': file_names,
900       'nodelist': nodelist,
901       'hypervisor': None,
902       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903                         for node in nodeinfo]
904       }
905     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906     all_rversion = rpc.call_version(nodelist)
907     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
908
909     incomplete_nodeinfo = False
910
911     for node in nodelist:
912       feedback_fn("* Verifying node %s" % node)
913       result = self._VerifyNode(node, file_names, local_checksums,
914                                 all_vglist[node], all_nvinfo[node],
915                                 all_rversion[node], feedback_fn)
916       bad = bad or result
917
918       # node_volume
919       volumeinfo = all_volumeinfo[node]
920
921       if isinstance(volumeinfo, basestring):
922         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
923                     (node, volumeinfo[-400:].encode('string_escape')))
924         bad = True
925         node_volume[node] = {}
926       elif not isinstance(volumeinfo, dict):
927         feedback_fn("  - ERROR: connection to %s failed" % (node,))
928         bad = True
929         incomplete_nodeinfo = True
930         continue
931       else:
932         node_volume[node] = volumeinfo
933
934       # node_instance
935       nodeinstance = all_instanceinfo[node]
936       if type(nodeinstance) != list:
937         feedback_fn("  - ERROR: connection to %s failed" % (node,))
938         bad = True
939         incomplete_nodeinfo = True
940         continue
941
942       node_instance[node] = nodeinstance
943
944       # node_info
945       nodeinfo = all_ninfo[node]
946       if not isinstance(nodeinfo, dict):
947         feedback_fn("  - ERROR: connection to %s failed" % (node,))
948         bad = True
949         incomplete_nodeinfo = True
950         continue
951
952       try:
953         node_info[node] = {
954           "mfree": int(nodeinfo['memory_free']),
955           "dfree": int(nodeinfo['vg_free']),
956           "pinst": [],
957           "sinst": [],
958           # dictionary holding all instances this node is secondary for,
959           # grouped by their primary node. Each key is a cluster node, and each
960           # value is a list of instances which have the key as primary and the
961           # current node as secondary.  this is handy to calculate N+1 memory
962           # availability if you can only failover from a primary to its
963           # secondary.
964           "sinst-by-pnode": {},
965         }
966       except (ValueError, TypeError):
967         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
968         bad = True
969         incomplete_nodeinfo = True
970         continue
971
972     node_vol_should = {}
973
974     for instance in instancelist:
975       feedback_fn("* Verifying instance %s" % instance)
976       inst_config = self.cfg.GetInstanceInfo(instance)
977       result =  self._VerifyInstance(instance, inst_config, node_volume,
978                                      node_instance, feedback_fn)
979       bad = bad or result
980
981       inst_config.MapLVsByNode(node_vol_should)
982
983       instance_cfg[instance] = inst_config
984
985       pnode = inst_config.primary_node
986       if pnode in node_info:
987         node_info[pnode]['pinst'].append(instance)
988       else:
989         feedback_fn("  - ERROR: instance %s, connection to primary node"
990                     " %s failed" % (instance, pnode))
991         bad = True
992
993       # If the instance is non-redundant we cannot survive losing its primary
994       # node, so we are not N+1 compliant. On the other hand we have no disk
995       # templates with more than one secondary so that situation is not well
996       # supported either.
997       # FIXME: does not support file-backed instances
998       if len(inst_config.secondary_nodes) == 0:
999         i_non_redundant.append(instance)
1000       elif len(inst_config.secondary_nodes) > 1:
1001         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1002                     % instance)
1003
1004       for snode in inst_config.secondary_nodes:
1005         if snode in node_info:
1006           node_info[snode]['sinst'].append(instance)
1007           if pnode not in node_info[snode]['sinst-by-pnode']:
1008             node_info[snode]['sinst-by-pnode'][pnode] = []
1009           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1010         else:
1011           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1012                       " %s failed" % (instance, snode))
1013
1014     feedback_fn("* Verifying orphan volumes")
1015     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1016                                        feedback_fn)
1017     bad = bad or result
1018
1019     feedback_fn("* Verifying remaining instances")
1020     result = self._VerifyOrphanInstances(instancelist, node_instance,
1021                                          feedback_fn)
1022     bad = bad or result
1023
1024     if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1025         not incomplete_nodeinfo):
1026       feedback_fn("* Verifying N+1 Memory redundancy")
1027       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1028       bad = bad or result
1029
1030     feedback_fn("* Other Notes")
1031     if i_non_redundant:
1032       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1033                   % len(i_non_redundant))
1034
1035     return int(bad)
1036
1037   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1038     """Analize the post-hooks' result, handle it, and send some
1039     nicely-formatted feedback back to the user.
1040
1041     Args:
1042       phase: the hooks phase that has just been run
1043       hooks_results: the results of the multi-node hooks rpc call
1044       feedback_fn: function to send feedback back to the caller
1045       lu_result: previous Exec result
1046
1047     """
1048     # We only really run POST phase hooks, and are only interested in their results
1049     if phase == constants.HOOKS_PHASE_POST:
1050       # Used to change hooks' output to proper indentation
1051       indent_re = re.compile('^', re.M)
1052       feedback_fn("* Hooks Results")
1053       if not hooks_results:
1054         feedback_fn("  - ERROR: general communication failure")
1055         lu_result = 1
1056       else:
1057         for node_name in hooks_results:
1058           show_node_header = True
1059           res = hooks_results[node_name]
1060           if res is False or not isinstance(res, list):
1061             feedback_fn("    Communication failure")
1062             lu_result = 1
1063             continue
1064           for script, hkr, output in res:
1065             if hkr == constants.HKR_FAIL:
1066               # The node header is only shown once, if there are
1067               # failing hooks on that node
1068               if show_node_header:
1069                 feedback_fn("  Node %s:" % node_name)
1070                 show_node_header = False
1071               feedback_fn("    ERROR: Script %s failed, output:" % script)
1072               output = indent_re.sub('      ', output)
1073               feedback_fn("%s" % output)
1074               lu_result = 1
1075
1076       return lu_result
1077
1078
1079 class LUVerifyDisks(NoHooksLU):
1080   """Verifies the cluster disks status.
1081
1082   """
1083   _OP_REQP = []
1084
1085   def CheckPrereq(self):
1086     """Check prerequisites.
1087
1088     This has no prerequisites.
1089
1090     """
1091     pass
1092
1093   def Exec(self, feedback_fn):
1094     """Verify integrity of cluster disks.
1095
1096     """
1097     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1098
1099     vg_name = self.cfg.GetVGName()
1100     nodes = utils.NiceSort(self.cfg.GetNodeList())
1101     instances = [self.cfg.GetInstanceInfo(name)
1102                  for name in self.cfg.GetInstanceList()]
1103
1104     nv_dict = {}
1105     for inst in instances:
1106       inst_lvs = {}
1107       if (inst.status != "up" or
1108           inst.disk_template not in constants.DTS_NET_MIRROR):
1109         continue
1110       inst.MapLVsByNode(inst_lvs)
1111       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1112       for node, vol_list in inst_lvs.iteritems():
1113         for vol in vol_list:
1114           nv_dict[(node, vol)] = inst
1115
1116     if not nv_dict:
1117       return result
1118
1119     node_lvs = rpc.call_volume_list(nodes, vg_name)
1120
1121     to_act = set()
1122     for node in nodes:
1123       # node_volume
1124       lvs = node_lvs[node]
1125
1126       if isinstance(lvs, basestring):
1127         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1128         res_nlvm[node] = lvs
1129       elif not isinstance(lvs, dict):
1130         logger.Info("connection to node %s failed or invalid data returned" %
1131                     (node,))
1132         res_nodes.append(node)
1133         continue
1134
1135       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1136         inst = nv_dict.pop((node, lv_name), None)
1137         if (not lv_online and inst is not None
1138             and inst.name not in res_instances):
1139           res_instances.append(inst.name)
1140
1141     # any leftover items in nv_dict are missing LVs, let's arrange the
1142     # data better
1143     for key, inst in nv_dict.iteritems():
1144       if inst.name not in res_missing:
1145         res_missing[inst.name] = []
1146       res_missing[inst.name].append(key)
1147
1148     return result
1149
1150
1151 class LURenameCluster(LogicalUnit):
1152   """Rename the cluster.
1153
1154   """
1155   HPATH = "cluster-rename"
1156   HTYPE = constants.HTYPE_CLUSTER
1157   _OP_REQP = ["name"]
1158
1159   def BuildHooksEnv(self):
1160     """Build hooks env.
1161
1162     """
1163     env = {
1164       "OP_TARGET": self.sstore.GetClusterName(),
1165       "NEW_NAME": self.op.name,
1166       }
1167     mn = self.sstore.GetMasterNode()
1168     return env, [mn], [mn]
1169
1170   def CheckPrereq(self):
1171     """Verify that the passed name is a valid one.
1172
1173     """
1174     hostname = utils.HostInfo(self.op.name)
1175
1176     new_name = hostname.name
1177     self.ip = new_ip = hostname.ip
1178     old_name = self.sstore.GetClusterName()
1179     old_ip = self.sstore.GetMasterIP()
1180     if new_name == old_name and new_ip == old_ip:
1181       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1182                                  " cluster has changed")
1183     if new_ip != old_ip:
1184       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1185         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1186                                    " reachable on the network. Aborting." %
1187                                    new_ip)
1188
1189     self.op.name = new_name
1190
1191   def Exec(self, feedback_fn):
1192     """Rename the cluster.
1193
1194     """
1195     clustername = self.op.name
1196     ip = self.ip
1197     ss = self.sstore
1198
1199     # shutdown the master IP
1200     master = ss.GetMasterNode()
1201     if not rpc.call_node_stop_master(master):
1202       raise errors.OpExecError("Could not disable the master role")
1203
1204     try:
1205       # modify the sstore
1206       ss.SetKey(ss.SS_MASTER_IP, ip)
1207       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1208
1209       # Distribute updated ss config to all nodes
1210       myself = self.cfg.GetNodeInfo(master)
1211       dist_nodes = self.cfg.GetNodeList()
1212       if myself.name in dist_nodes:
1213         dist_nodes.remove(myself.name)
1214
1215       logger.Debug("Copying updated ssconf data to all nodes")
1216       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1217         fname = ss.KeyToFilename(keyname)
1218         result = rpc.call_upload_file(dist_nodes, fname)
1219         for to_node in dist_nodes:
1220           if not result[to_node]:
1221             logger.Error("copy of file %s to node %s failed" %
1222                          (fname, to_node))
1223     finally:
1224       if not rpc.call_node_start_master(master):
1225         logger.Error("Could not re-enable the master role on the master,"
1226                      " please restart manually.")
1227
1228
1229 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1230   """Sleep and poll for an instance's disk to sync.
1231
1232   """
1233   if not instance.disks:
1234     return True
1235
1236   if not oneshot:
1237     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1238
1239   node = instance.primary_node
1240
1241   for dev in instance.disks:
1242     cfgw.SetDiskID(dev, node)
1243
1244   retries = 0
1245   while True:
1246     max_time = 0
1247     done = True
1248     cumul_degraded = False
1249     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1250     if not rstats:
1251       proc.LogWarning("Can't get any data from node %s" % node)
1252       retries += 1
1253       if retries >= 10:
1254         raise errors.RemoteError("Can't contact node %s for mirror data,"
1255                                  " aborting." % node)
1256       time.sleep(6)
1257       continue
1258     retries = 0
1259     for i in range(len(rstats)):
1260       mstat = rstats[i]
1261       if mstat is None:
1262         proc.LogWarning("Can't compute data for node %s/%s" %
1263                         (node, instance.disks[i].iv_name))
1264         continue
1265       # we ignore the ldisk parameter
1266       perc_done, est_time, is_degraded, _ = mstat
1267       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1268       if perc_done is not None:
1269         done = False
1270         if est_time is not None:
1271           rem_time = "%d estimated seconds remaining" % est_time
1272           max_time = est_time
1273         else:
1274           rem_time = "no time estimate"
1275         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1276                      (instance.disks[i].iv_name, perc_done, rem_time))
1277     if done or oneshot:
1278       break
1279
1280     if unlock:
1281       utils.Unlock('cmd')
1282     try:
1283       time.sleep(min(60, max_time))
1284     finally:
1285       if unlock:
1286         utils.Lock('cmd')
1287
1288   if done:
1289     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1290   return not cumul_degraded
1291
1292
1293 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1294   """Check that mirrors are not degraded.
1295
1296   The ldisk parameter, if True, will change the test from the
1297   is_degraded attribute (which represents overall non-ok status for
1298   the device(s)) to the ldisk (representing the local storage status).
1299
1300   """
1301   cfgw.SetDiskID(dev, node)
1302   if ldisk:
1303     idx = 6
1304   else:
1305     idx = 5
1306
1307   result = True
1308   if on_primary or dev.AssembleOnSecondary():
1309     rstats = rpc.call_blockdev_find(node, dev)
1310     if not rstats:
1311       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1312       result = False
1313     else:
1314       result = result and (not rstats[idx])
1315   if dev.children:
1316     for child in dev.children:
1317       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1318
1319   return result
1320
1321
1322 class LUDiagnoseOS(NoHooksLU):
1323   """Logical unit for OS diagnose/query.
1324
1325   """
1326   _OP_REQP = ["output_fields", "names"]
1327
1328   def CheckPrereq(self):
1329     """Check prerequisites.
1330
1331     This always succeeds, since this is a pure query LU.
1332
1333     """
1334     if self.op.names:
1335       raise errors.OpPrereqError("Selective OS query not supported")
1336
1337     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1338     _CheckOutputFields(static=[],
1339                        dynamic=self.dynamic_fields,
1340                        selected=self.op.output_fields)
1341
1342   @staticmethod
1343   def _DiagnoseByOS(node_list, rlist):
1344     """Remaps a per-node return list into an a per-os per-node dictionary
1345
1346       Args:
1347         node_list: a list with the names of all nodes
1348         rlist: a map with node names as keys and OS objects as values
1349
1350       Returns:
1351         map: a map with osnames as keys and as value another map, with
1352              nodes as
1353              keys and list of OS objects as values
1354              e.g. {"debian-etch": {"node1": [<object>,...],
1355                                    "node2": [<object>,]}
1356                   }
1357
1358     """
1359     all_os = {}
1360     for node_name, nr in rlist.iteritems():
1361       if not nr:
1362         continue
1363       for os in nr:
1364         if os.name not in all_os:
1365           # build a list of nodes for this os containing empty lists
1366           # for each node in node_list
1367           all_os[os.name] = {}
1368           for nname in node_list:
1369             all_os[os.name][nname] = []
1370         all_os[os.name][node_name].append(os)
1371     return all_os
1372
1373   def Exec(self, feedback_fn):
1374     """Compute the list of OSes.
1375
1376     """
1377     node_list = self.cfg.GetNodeList()
1378     node_data = rpc.call_os_diagnose(node_list)
1379     if node_data == False:
1380       raise errors.OpExecError("Can't gather the list of OSes")
1381     pol = self._DiagnoseByOS(node_list, node_data)
1382     output = []
1383     for os_name, os_data in pol.iteritems():
1384       row = []
1385       for field in self.op.output_fields:
1386         if field == "name":
1387           val = os_name
1388         elif field == "valid":
1389           val = utils.all([osl and osl[0] for osl in os_data.values()])
1390         elif field == "node_status":
1391           val = {}
1392           for node_name, nos_list in os_data.iteritems():
1393             val[node_name] = [(v.status, v.path) for v in nos_list]
1394         else:
1395           raise errors.ParameterError(field)
1396         row.append(val)
1397       output.append(row)
1398
1399     return output
1400
1401
1402 class LURemoveNode(LogicalUnit):
1403   """Logical unit for removing a node.
1404
1405   """
1406   HPATH = "node-remove"
1407   HTYPE = constants.HTYPE_NODE
1408   _OP_REQP = ["node_name"]
1409
1410   def BuildHooksEnv(self):
1411     """Build hooks env.
1412
1413     This doesn't run on the target node in the pre phase as a failed
1414     node would not allows itself to run.
1415
1416     """
1417     env = {
1418       "OP_TARGET": self.op.node_name,
1419       "NODE_NAME": self.op.node_name,
1420       }
1421     all_nodes = self.cfg.GetNodeList()
1422     all_nodes.remove(self.op.node_name)
1423     return env, all_nodes, all_nodes
1424
1425   def CheckPrereq(self):
1426     """Check prerequisites.
1427
1428     This checks:
1429      - the node exists in the configuration
1430      - it does not have primary or secondary instances
1431      - it's not the master
1432
1433     Any errors are signalled by raising errors.OpPrereqError.
1434
1435     """
1436     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1437     if node is None:
1438       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1439
1440     instance_list = self.cfg.GetInstanceList()
1441
1442     masternode = self.sstore.GetMasterNode()
1443     if node.name == masternode:
1444       raise errors.OpPrereqError("Node is the master node,"
1445                                  " you need to failover first.")
1446
1447     for instance_name in instance_list:
1448       instance = self.cfg.GetInstanceInfo(instance_name)
1449       if node.name == instance.primary_node:
1450         raise errors.OpPrereqError("Instance %s still running on the node,"
1451                                    " please remove first." % instance_name)
1452       if node.name in instance.secondary_nodes:
1453         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1454                                    " please remove first." % instance_name)
1455     self.op.node_name = node.name
1456     self.node = node
1457
1458   def Exec(self, feedback_fn):
1459     """Removes the node from the cluster.
1460
1461     """
1462     node = self.node
1463     logger.Info("stopping the node daemon and removing configs from node %s" %
1464                 node.name)
1465
1466     rpc.call_node_leave_cluster(node.name)
1467
1468     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1469
1470     logger.Info("Removing node %s from config" % node.name)
1471
1472     self.cfg.RemoveNode(node.name)
1473
1474     _RemoveHostFromEtcHosts(node.name)
1475
1476
1477 class LUQueryNodes(NoHooksLU):
1478   """Logical unit for querying nodes.
1479
1480   """
1481   _OP_REQP = ["output_fields", "names"]
1482
1483   def CheckPrereq(self):
1484     """Check prerequisites.
1485
1486     This checks that the fields required are valid output fields.
1487
1488     """
1489     self.dynamic_fields = frozenset([
1490       "dtotal", "dfree",
1491       "mtotal", "mnode", "mfree",
1492       "bootid",
1493       "ctotal",
1494       ])
1495
1496     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1497                                "pinst_list", "sinst_list",
1498                                "pip", "sip", "tags"],
1499                        dynamic=self.dynamic_fields,
1500                        selected=self.op.output_fields)
1501
1502     self.wanted = _GetWantedNodes(self, self.op.names)
1503
1504   def Exec(self, feedback_fn):
1505     """Computes the list of nodes and their attributes.
1506
1507     """
1508     nodenames = self.wanted
1509     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1510
1511     # begin data gathering
1512
1513     if self.dynamic_fields.intersection(self.op.output_fields):
1514       live_data = {}
1515       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1516       for name in nodenames:
1517         nodeinfo = node_data.get(name, None)
1518         if nodeinfo:
1519           live_data[name] = {
1520             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1521             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1522             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1523             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1524             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1525             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1526             "bootid": nodeinfo['bootid'],
1527             }
1528         else:
1529           live_data[name] = {}
1530     else:
1531       live_data = dict.fromkeys(nodenames, {})
1532
1533     node_to_primary = dict([(name, set()) for name in nodenames])
1534     node_to_secondary = dict([(name, set()) for name in nodenames])
1535
1536     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1537                              "sinst_cnt", "sinst_list"))
1538     if inst_fields & frozenset(self.op.output_fields):
1539       instancelist = self.cfg.GetInstanceList()
1540
1541       for instance_name in instancelist:
1542         inst = self.cfg.GetInstanceInfo(instance_name)
1543         if inst.primary_node in node_to_primary:
1544           node_to_primary[inst.primary_node].add(inst.name)
1545         for secnode in inst.secondary_nodes:
1546           if secnode in node_to_secondary:
1547             node_to_secondary[secnode].add(inst.name)
1548
1549     # end data gathering
1550
1551     output = []
1552     for node in nodelist:
1553       node_output = []
1554       for field in self.op.output_fields:
1555         if field == "name":
1556           val = node.name
1557         elif field == "pinst_list":
1558           val = list(node_to_primary[node.name])
1559         elif field == "sinst_list":
1560           val = list(node_to_secondary[node.name])
1561         elif field == "pinst_cnt":
1562           val = len(node_to_primary[node.name])
1563         elif field == "sinst_cnt":
1564           val = len(node_to_secondary[node.name])
1565         elif field == "pip":
1566           val = node.primary_ip
1567         elif field == "sip":
1568           val = node.secondary_ip
1569         elif field == "tags":
1570           val = list(node.GetTags())
1571         elif field in self.dynamic_fields:
1572           val = live_data[node.name].get(field, None)
1573         else:
1574           raise errors.ParameterError(field)
1575         node_output.append(val)
1576       output.append(node_output)
1577
1578     return output
1579
1580
1581 class LUQueryNodeVolumes(NoHooksLU):
1582   """Logical unit for getting volumes on node(s).
1583
1584   """
1585   _OP_REQP = ["nodes", "output_fields"]
1586
1587   def CheckPrereq(self):
1588     """Check prerequisites.
1589
1590     This checks that the fields required are valid output fields.
1591
1592     """
1593     self.nodes = _GetWantedNodes(self, self.op.nodes)
1594
1595     _CheckOutputFields(static=["node"],
1596                        dynamic=["phys", "vg", "name", "size", "instance"],
1597                        selected=self.op.output_fields)
1598
1599
1600   def Exec(self, feedback_fn):
1601     """Computes the list of nodes and their attributes.
1602
1603     """
1604     nodenames = self.nodes
1605     volumes = rpc.call_node_volumes(nodenames)
1606
1607     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1608              in self.cfg.GetInstanceList()]
1609
1610     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1611
1612     output = []
1613     for node in nodenames:
1614       if node not in volumes or not volumes[node]:
1615         continue
1616
1617       node_vols = volumes[node][:]
1618       node_vols.sort(key=lambda vol: vol['dev'])
1619
1620       for vol in node_vols:
1621         node_output = []
1622         for field in self.op.output_fields:
1623           if field == "node":
1624             val = node
1625           elif field == "phys":
1626             val = vol['dev']
1627           elif field == "vg":
1628             val = vol['vg']
1629           elif field == "name":
1630             val = vol['name']
1631           elif field == "size":
1632             val = int(float(vol['size']))
1633           elif field == "instance":
1634             for inst in ilist:
1635               if node not in lv_by_node[inst]:
1636                 continue
1637               if vol['name'] in lv_by_node[inst][node]:
1638                 val = inst.name
1639                 break
1640             else:
1641               val = '-'
1642           else:
1643             raise errors.ParameterError(field)
1644           node_output.append(str(val))
1645
1646         output.append(node_output)
1647
1648     return output
1649
1650
1651 class LUAddNode(LogicalUnit):
1652   """Logical unit for adding node to the cluster.
1653
1654   """
1655   HPATH = "node-add"
1656   HTYPE = constants.HTYPE_NODE
1657   _OP_REQP = ["node_name"]
1658
1659   def BuildHooksEnv(self):
1660     """Build hooks env.
1661
1662     This will run on all nodes before, and on all nodes + the new node after.
1663
1664     """
1665     env = {
1666       "OP_TARGET": self.op.node_name,
1667       "NODE_NAME": self.op.node_name,
1668       "NODE_PIP": self.op.primary_ip,
1669       "NODE_SIP": self.op.secondary_ip,
1670       }
1671     nodes_0 = self.cfg.GetNodeList()
1672     nodes_1 = nodes_0 + [self.op.node_name, ]
1673     return env, nodes_0, nodes_1
1674
1675   def CheckPrereq(self):
1676     """Check prerequisites.
1677
1678     This checks:
1679      - the new node is not already in the config
1680      - it is resolvable
1681      - its parameters (single/dual homed) matches the cluster
1682
1683     Any errors are signalled by raising errors.OpPrereqError.
1684
1685     """
1686     node_name = self.op.node_name
1687     cfg = self.cfg
1688
1689     dns_data = utils.HostInfo(node_name)
1690
1691     node = dns_data.name
1692     primary_ip = self.op.primary_ip = dns_data.ip
1693     secondary_ip = getattr(self.op, "secondary_ip", None)
1694     if secondary_ip is None:
1695       secondary_ip = primary_ip
1696     if not utils.IsValidIP(secondary_ip):
1697       raise errors.OpPrereqError("Invalid secondary IP given")
1698     self.op.secondary_ip = secondary_ip
1699
1700     node_list = cfg.GetNodeList()
1701     if not self.op.readd and node in node_list:
1702       raise errors.OpPrereqError("Node %s is already in the configuration" %
1703                                  node)
1704     elif self.op.readd and node not in node_list:
1705       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1706
1707     for existing_node_name in node_list:
1708       existing_node = cfg.GetNodeInfo(existing_node_name)
1709
1710       if self.op.readd and node == existing_node_name:
1711         if (existing_node.primary_ip != primary_ip or
1712             existing_node.secondary_ip != secondary_ip):
1713           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1714                                      " address configuration as before")
1715         continue
1716
1717       if (existing_node.primary_ip == primary_ip or
1718           existing_node.secondary_ip == primary_ip or
1719           existing_node.primary_ip == secondary_ip or
1720           existing_node.secondary_ip == secondary_ip):
1721         raise errors.OpPrereqError("New node ip address(es) conflict with"
1722                                    " existing node %s" % existing_node.name)
1723
1724     # check that the type of the node (single versus dual homed) is the
1725     # same as for the master
1726     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1727     master_singlehomed = myself.secondary_ip == myself.primary_ip
1728     newbie_singlehomed = secondary_ip == primary_ip
1729     if master_singlehomed != newbie_singlehomed:
1730       if master_singlehomed:
1731         raise errors.OpPrereqError("The master has no private ip but the"
1732                                    " new node has one")
1733       else:
1734         raise errors.OpPrereqError("The master has a private ip but the"
1735                                    " new node doesn't have one")
1736
1737     # checks reachablity
1738     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1739       raise errors.OpPrereqError("Node not reachable by ping")
1740
1741     if not newbie_singlehomed:
1742       # check reachability from my secondary ip to newbie's secondary ip
1743       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1744                            source=myself.secondary_ip):
1745         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1746                                    " based ping to noded port")
1747
1748     self.new_node = objects.Node(name=node,
1749                                  primary_ip=primary_ip,
1750                                  secondary_ip=secondary_ip)
1751
1752     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1753       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1754         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1755                                    constants.VNC_PASSWORD_FILE)
1756
1757   def Exec(self, feedback_fn):
1758     """Adds the new node to the cluster.
1759
1760     """
1761     new_node = self.new_node
1762     node = new_node.name
1763
1764     # set up inter-node password and certificate and restarts the node daemon
1765     gntpass = self.sstore.GetNodeDaemonPassword()
1766     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1767       raise errors.OpExecError("ganeti password corruption detected")
1768     f = open(constants.SSL_CERT_FILE)
1769     try:
1770       gntpem = f.read(8192)
1771     finally:
1772       f.close()
1773     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1774     # so we use this to detect an invalid certificate; as long as the
1775     # cert doesn't contain this, the here-document will be correctly
1776     # parsed by the shell sequence below
1777     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1778       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1779     if not gntpem.endswith("\n"):
1780       raise errors.OpExecError("PEM must end with newline")
1781     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1782
1783     # and then connect with ssh to set password and start ganeti-noded
1784     # note that all the below variables are sanitized at this point,
1785     # either by being constants or by the checks above
1786     ss = self.sstore
1787     mycommand = ("umask 077 && "
1788                  "echo '%s' > '%s' && "
1789                  "cat > '%s' << '!EOF.' && \n"
1790                  "%s!EOF.\n%s restart" %
1791                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1792                   constants.SSL_CERT_FILE, gntpem,
1793                   constants.NODE_INITD_SCRIPT))
1794
1795     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1796     if result.failed:
1797       raise errors.OpExecError("Remote command on node %s, error: %s,"
1798                                " output: %s" %
1799                                (node, result.fail_reason, result.output))
1800
1801     # check connectivity
1802     time.sleep(4)
1803
1804     result = rpc.call_version([node])[node]
1805     if result:
1806       if constants.PROTOCOL_VERSION == result:
1807         logger.Info("communication to node %s fine, sw version %s match" %
1808                     (node, result))
1809       else:
1810         raise errors.OpExecError("Version mismatch master version %s,"
1811                                  " node version %s" %
1812                                  (constants.PROTOCOL_VERSION, result))
1813     else:
1814       raise errors.OpExecError("Cannot get version from the new node")
1815
1816     # setup ssh on node
1817     logger.Info("copy ssh key to node %s" % node)
1818     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1819     keyarray = []
1820     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1821                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1822                 priv_key, pub_key]
1823
1824     for i in keyfiles:
1825       f = open(i, 'r')
1826       try:
1827         keyarray.append(f.read())
1828       finally:
1829         f.close()
1830
1831     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1832                                keyarray[3], keyarray[4], keyarray[5])
1833
1834     if not result:
1835       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1836
1837     # Add node to our /etc/hosts, and add key to known_hosts
1838     _AddHostToEtcHosts(new_node.name)
1839
1840     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1841                       self.cfg.GetHostKey())
1842
1843     if new_node.secondary_ip != new_node.primary_ip:
1844       if not rpc.call_node_tcp_ping(new_node.name,
1845                                     constants.LOCALHOST_IP_ADDRESS,
1846                                     new_node.secondary_ip,
1847                                     constants.DEFAULT_NODED_PORT,
1848                                     10, False):
1849         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1850                                  " you gave (%s). Please fix and re-run this"
1851                                  " command." % new_node.secondary_ip)
1852
1853     success, msg = ssh.VerifyNodeHostname(node)
1854     if not success:
1855       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1856                                " than the one the resolver gives: %s."
1857                                " Please fix and re-run this command." %
1858                                (node, msg))
1859
1860     # Distribute updated /etc/hosts and known_hosts to all nodes,
1861     # including the node just added
1862     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1863     dist_nodes = self.cfg.GetNodeList()
1864     if not self.op.readd:
1865       dist_nodes.append(node)
1866     if myself.name in dist_nodes:
1867       dist_nodes.remove(myself.name)
1868
1869     logger.Debug("Copying hosts and known_hosts to all nodes")
1870     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1871       result = rpc.call_upload_file(dist_nodes, fname)
1872       for to_node in dist_nodes:
1873         if not result[to_node]:
1874           logger.Error("copy of file %s to node %s failed" %
1875                        (fname, to_node))
1876
1877     to_copy = ss.GetFileList()
1878     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1879       to_copy.append(constants.VNC_PASSWORD_FILE)
1880     for fname in to_copy:
1881       if not ssh.CopyFileToNode(node, fname):
1882         logger.Error("could not copy file %s to node %s" % (fname, node))
1883
1884     if not self.op.readd:
1885       logger.Info("adding node %s to cluster.conf" % node)
1886       self.cfg.AddNode(new_node)
1887
1888
1889 class LUMasterFailover(LogicalUnit):
1890   """Failover the master node to the current node.
1891
1892   This is a special LU in that it must run on a non-master node.
1893
1894   """
1895   HPATH = "master-failover"
1896   HTYPE = constants.HTYPE_CLUSTER
1897   REQ_MASTER = False
1898   _OP_REQP = []
1899
1900   def BuildHooksEnv(self):
1901     """Build hooks env.
1902
1903     This will run on the new master only in the pre phase, and on all
1904     the nodes in the post phase.
1905
1906     """
1907     env = {
1908       "OP_TARGET": self.new_master,
1909       "NEW_MASTER": self.new_master,
1910       "OLD_MASTER": self.old_master,
1911       }
1912     return env, [self.new_master], self.cfg.GetNodeList()
1913
1914   def CheckPrereq(self):
1915     """Check prerequisites.
1916
1917     This checks that we are not already the master.
1918
1919     """
1920     self.new_master = utils.HostInfo().name
1921     self.old_master = self.sstore.GetMasterNode()
1922
1923     if self.old_master == self.new_master:
1924       raise errors.OpPrereqError("This commands must be run on the node"
1925                                  " where you want the new master to be."
1926                                  " %s is already the master" %
1927                                  self.old_master)
1928
1929   def Exec(self, feedback_fn):
1930     """Failover the master node.
1931
1932     This command, when run on a non-master node, will cause the current
1933     master to cease being master, and the non-master to become new
1934     master.
1935
1936     """
1937     #TODO: do not rely on gethostname returning the FQDN
1938     logger.Info("setting master to %s, old master: %s" %
1939                 (self.new_master, self.old_master))
1940
1941     if not rpc.call_node_stop_master(self.old_master):
1942       logger.Error("could disable the master role on the old master"
1943                    " %s, please disable manually" % self.old_master)
1944
1945     ss = self.sstore
1946     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1947     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1948                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1949       logger.Error("could not distribute the new simple store master file"
1950                    " to the other nodes, please check.")
1951
1952     if not rpc.call_node_start_master(self.new_master):
1953       logger.Error("could not start the master role on the new master"
1954                    " %s, please check" % self.new_master)
1955       feedback_fn("Error in activating the master IP on the new master,"
1956                   " please fix manually.")
1957
1958
1959
1960 class LUQueryClusterInfo(NoHooksLU):
1961   """Query cluster configuration.
1962
1963   """
1964   _OP_REQP = []
1965   REQ_MASTER = False
1966
1967   def CheckPrereq(self):
1968     """No prerequsites needed for this LU.
1969
1970     """
1971     pass
1972
1973   def Exec(self, feedback_fn):
1974     """Return cluster config.
1975
1976     """
1977     result = {
1978       "name": self.sstore.GetClusterName(),
1979       "software_version": constants.RELEASE_VERSION,
1980       "protocol_version": constants.PROTOCOL_VERSION,
1981       "config_version": constants.CONFIG_VERSION,
1982       "os_api_version": constants.OS_API_VERSION,
1983       "export_version": constants.EXPORT_VERSION,
1984       "master": self.sstore.GetMasterNode(),
1985       "architecture": (platform.architecture()[0], platform.machine()),
1986       "hypervisor_type": self.sstore.GetHypervisorType(),
1987       }
1988
1989     return result
1990
1991
1992 class LUClusterCopyFile(NoHooksLU):
1993   """Copy file to cluster.
1994
1995   """
1996   _OP_REQP = ["nodes", "filename"]
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     It should check that the named file exists and that the given list
2002     of nodes is valid.
2003
2004     """
2005     if not os.path.exists(self.op.filename):
2006       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2007
2008     self.nodes = _GetWantedNodes(self, self.op.nodes)
2009
2010   def Exec(self, feedback_fn):
2011     """Copy a file from master to some nodes.
2012
2013     Args:
2014       opts - class with options as members
2015       args - list containing a single element, the file name
2016     Opts used:
2017       nodes - list containing the name of target nodes; if empty, all nodes
2018
2019     """
2020     filename = self.op.filename
2021
2022     myname = utils.HostInfo().name
2023
2024     for node in self.nodes:
2025       if node == myname:
2026         continue
2027       if not ssh.CopyFileToNode(node, filename):
2028         logger.Error("Copy of file %s to node %s failed" % (filename, node))
2029
2030
2031 class LUDumpClusterConfig(NoHooksLU):
2032   """Return a text-representation of the cluster-config.
2033
2034   """
2035   _OP_REQP = []
2036
2037   def CheckPrereq(self):
2038     """No prerequisites.
2039
2040     """
2041     pass
2042
2043   def Exec(self, feedback_fn):
2044     """Dump a representation of the cluster config to the standard output.
2045
2046     """
2047     return self.cfg.DumpConfig()
2048
2049
2050 class LURunClusterCommand(NoHooksLU):
2051   """Run a command on some nodes.
2052
2053   """
2054   _OP_REQP = ["command", "nodes"]
2055
2056   def CheckPrereq(self):
2057     """Check prerequisites.
2058
2059     It checks that the given list of nodes is valid.
2060
2061     """
2062     self.nodes = _GetWantedNodes(self, self.op.nodes)
2063
2064   def Exec(self, feedback_fn):
2065     """Run a command on some nodes.
2066
2067     """
2068     # put the master at the end of the nodes list
2069     master_node = self.sstore.GetMasterNode()
2070     if master_node in self.nodes:
2071       self.nodes.remove(master_node)
2072       self.nodes.append(master_node)
2073
2074     data = []
2075     for node in self.nodes:
2076       result = ssh.SSHCall(node, "root", self.op.command)
2077       data.append((node, result.output, result.exit_code))
2078
2079     return data
2080
2081
2082 class LUActivateInstanceDisks(NoHooksLU):
2083   """Bring up an instance's disks.
2084
2085   """
2086   _OP_REQP = ["instance_name"]
2087
2088   def CheckPrereq(self):
2089     """Check prerequisites.
2090
2091     This checks that the instance is in the cluster.
2092
2093     """
2094     instance = self.cfg.GetInstanceInfo(
2095       self.cfg.ExpandInstanceName(self.op.instance_name))
2096     if instance is None:
2097       raise errors.OpPrereqError("Instance '%s' not known" %
2098                                  self.op.instance_name)
2099     self.instance = instance
2100
2101
2102   def Exec(self, feedback_fn):
2103     """Activate the disks.
2104
2105     """
2106     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2107     if not disks_ok:
2108       raise errors.OpExecError("Cannot activate block devices")
2109
2110     return disks_info
2111
2112
2113 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2114   """Prepare the block devices for an instance.
2115
2116   This sets up the block devices on all nodes.
2117
2118   Args:
2119     instance: a ganeti.objects.Instance object
2120     ignore_secondaries: if true, errors on secondary nodes won't result
2121                         in an error return from the function
2122
2123   Returns:
2124     false if the operation failed
2125     list of (host, instance_visible_name, node_visible_name) if the operation
2126          suceeded with the mapping from node devices to instance devices
2127   """
2128   device_info = []
2129   disks_ok = True
2130   iname = instance.name
2131   # With the two passes mechanism we try to reduce the window of
2132   # opportunity for the race condition of switching DRBD to primary
2133   # before handshaking occured, but we do not eliminate it
2134
2135   # The proper fix would be to wait (with some limits) until the
2136   # connection has been made and drbd transitions from WFConnection
2137   # into any other network-connected state (Connected, SyncTarget,
2138   # SyncSource, etc.)
2139
2140   # 1st pass, assemble on all nodes in secondary mode
2141   for inst_disk in instance.disks:
2142     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2143       cfg.SetDiskID(node_disk, node)
2144       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2145       if not result:
2146         logger.Error("could not prepare block device %s on node %s"
2147                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2148         if not ignore_secondaries:
2149           disks_ok = False
2150
2151   # FIXME: race condition on drbd migration to primary
2152
2153   # 2nd pass, do only the primary node
2154   for inst_disk in instance.disks:
2155     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2156       if node != instance.primary_node:
2157         continue
2158       cfg.SetDiskID(node_disk, node)
2159       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2160       if not result:
2161         logger.Error("could not prepare block device %s on node %s"
2162                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2163         disks_ok = False
2164     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2165
2166   # leave the disks configured for the primary node
2167   # this is a workaround that would be fixed better by
2168   # improving the logical/physical id handling
2169   for disk in instance.disks:
2170     cfg.SetDiskID(disk, instance.primary_node)
2171
2172   return disks_ok, device_info
2173
2174
2175 def _StartInstanceDisks(cfg, instance, force):
2176   """Start the disks of an instance.
2177
2178   """
2179   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2180                                            ignore_secondaries=force)
2181   if not disks_ok:
2182     _ShutdownInstanceDisks(instance, cfg)
2183     if force is not None and not force:
2184       logger.Error("If the message above refers to a secondary node,"
2185                    " you can retry the operation using '--force'.")
2186     raise errors.OpExecError("Disk consistency error")
2187
2188
2189 class LUDeactivateInstanceDisks(NoHooksLU):
2190   """Shutdown an instance's disks.
2191
2192   """
2193   _OP_REQP = ["instance_name"]
2194
2195   def CheckPrereq(self):
2196     """Check prerequisites.
2197
2198     This checks that the instance is in the cluster.
2199
2200     """
2201     instance = self.cfg.GetInstanceInfo(
2202       self.cfg.ExpandInstanceName(self.op.instance_name))
2203     if instance is None:
2204       raise errors.OpPrereqError("Instance '%s' not known" %
2205                                  self.op.instance_name)
2206     self.instance = instance
2207
2208   def Exec(self, feedback_fn):
2209     """Deactivate the disks
2210
2211     """
2212     instance = self.instance
2213     ins_l = rpc.call_instance_list([instance.primary_node])
2214     ins_l = ins_l[instance.primary_node]
2215     if not type(ins_l) is list:
2216       raise errors.OpExecError("Can't contact node '%s'" %
2217                                instance.primary_node)
2218
2219     if self.instance.name in ins_l:
2220       raise errors.OpExecError("Instance is running, can't shutdown"
2221                                " block devices.")
2222
2223     _ShutdownInstanceDisks(instance, self.cfg)
2224
2225
2226 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2227   """Shutdown block devices of an instance.
2228
2229   This does the shutdown on all nodes of the instance.
2230
2231   If the ignore_primary is false, errors on the primary node are
2232   ignored.
2233
2234   """
2235   result = True
2236   for disk in instance.disks:
2237     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2238       cfg.SetDiskID(top_disk, node)
2239       if not rpc.call_blockdev_shutdown(node, top_disk):
2240         logger.Error("could not shutdown block device %s on node %s" %
2241                      (disk.iv_name, node))
2242         if not ignore_primary or node != instance.primary_node:
2243           result = False
2244   return result
2245
2246
2247 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2248   """Checks if a node has enough free memory.
2249
2250   This function check if a given node has the needed amount of free
2251   memory. In case the node has less memory or we cannot get the
2252   information from the node, this function raise an OpPrereqError
2253   exception.
2254
2255   Args:
2256     - cfg: a ConfigWriter instance
2257     - node: the node name
2258     - reason: string to use in the error message
2259     - requested: the amount of memory in MiB
2260
2261   """
2262   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2263   if not nodeinfo or not isinstance(nodeinfo, dict):
2264     raise errors.OpPrereqError("Could not contact node %s for resource"
2265                              " information" % (node,))
2266
2267   free_mem = nodeinfo[node].get('memory_free')
2268   if not isinstance(free_mem, int):
2269     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2270                              " was '%s'" % (node, free_mem))
2271   if requested > free_mem:
2272     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2273                              " needed %s MiB, available %s MiB" %
2274                              (node, reason, requested, free_mem))
2275
2276
2277 class LUStartupInstance(LogicalUnit):
2278   """Starts an instance.
2279
2280   """
2281   HPATH = "instance-start"
2282   HTYPE = constants.HTYPE_INSTANCE
2283   _OP_REQP = ["instance_name", "force"]
2284
2285   def BuildHooksEnv(self):
2286     """Build hooks env.
2287
2288     This runs on master, primary and secondary nodes of the instance.
2289
2290     """
2291     env = {
2292       "FORCE": self.op.force,
2293       }
2294     env.update(_BuildInstanceHookEnvByObject(self.instance))
2295     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2296           list(self.instance.secondary_nodes))
2297     return env, nl, nl
2298
2299   def CheckPrereq(self):
2300     """Check prerequisites.
2301
2302     This checks that the instance is in the cluster.
2303
2304     """
2305     instance = self.cfg.GetInstanceInfo(
2306       self.cfg.ExpandInstanceName(self.op.instance_name))
2307     if instance is None:
2308       raise errors.OpPrereqError("Instance '%s' not known" %
2309                                  self.op.instance_name)
2310
2311     # check bridges existance
2312     _CheckInstanceBridgesExist(instance)
2313
2314     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2315                          "starting instance %s" % instance.name,
2316                          instance.memory)
2317
2318     self.instance = instance
2319     self.op.instance_name = instance.name
2320
2321   def Exec(self, feedback_fn):
2322     """Start the instance.
2323
2324     """
2325     instance = self.instance
2326     force = self.op.force
2327     extra_args = getattr(self.op, "extra_args", "")
2328
2329     self.cfg.MarkInstanceUp(instance.name)
2330
2331     node_current = instance.primary_node
2332
2333     _StartInstanceDisks(self.cfg, instance, force)
2334
2335     if not rpc.call_instance_start(node_current, instance, extra_args):
2336       _ShutdownInstanceDisks(instance, self.cfg)
2337       raise errors.OpExecError("Could not start instance")
2338
2339
2340 class LURebootInstance(LogicalUnit):
2341   """Reboot an instance.
2342
2343   """
2344   HPATH = "instance-reboot"
2345   HTYPE = constants.HTYPE_INSTANCE
2346   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2347
2348   def BuildHooksEnv(self):
2349     """Build hooks env.
2350
2351     This runs on master, primary and secondary nodes of the instance.
2352
2353     """
2354     env = {
2355       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2356       }
2357     env.update(_BuildInstanceHookEnvByObject(self.instance))
2358     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2359           list(self.instance.secondary_nodes))
2360     return env, nl, nl
2361
2362   def CheckPrereq(self):
2363     """Check prerequisites.
2364
2365     This checks that the instance is in the cluster.
2366
2367     """
2368     instance = self.cfg.GetInstanceInfo(
2369       self.cfg.ExpandInstanceName(self.op.instance_name))
2370     if instance is None:
2371       raise errors.OpPrereqError("Instance '%s' not known" %
2372                                  self.op.instance_name)
2373
2374     # check bridges existance
2375     _CheckInstanceBridgesExist(instance)
2376
2377     self.instance = instance
2378     self.op.instance_name = instance.name
2379
2380   def Exec(self, feedback_fn):
2381     """Reboot the instance.
2382
2383     """
2384     instance = self.instance
2385     ignore_secondaries = self.op.ignore_secondaries
2386     reboot_type = self.op.reboot_type
2387     extra_args = getattr(self.op, "extra_args", "")
2388
2389     node_current = instance.primary_node
2390
2391     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2392                            constants.INSTANCE_REBOOT_HARD,
2393                            constants.INSTANCE_REBOOT_FULL]:
2394       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2395                                   (constants.INSTANCE_REBOOT_SOFT,
2396                                    constants.INSTANCE_REBOOT_HARD,
2397                                    constants.INSTANCE_REBOOT_FULL))
2398
2399     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2400                        constants.INSTANCE_REBOOT_HARD]:
2401       if not rpc.call_instance_reboot(node_current, instance,
2402                                       reboot_type, extra_args):
2403         raise errors.OpExecError("Could not reboot instance")
2404     else:
2405       if not rpc.call_instance_shutdown(node_current, instance):
2406         raise errors.OpExecError("could not shutdown instance for full reboot")
2407       _ShutdownInstanceDisks(instance, self.cfg)
2408       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2409       if not rpc.call_instance_start(node_current, instance, extra_args):
2410         _ShutdownInstanceDisks(instance, self.cfg)
2411         raise errors.OpExecError("Could not start instance for full reboot")
2412
2413     self.cfg.MarkInstanceUp(instance.name)
2414
2415
2416 class LUShutdownInstance(LogicalUnit):
2417   """Shutdown an instance.
2418
2419   """
2420   HPATH = "instance-stop"
2421   HTYPE = constants.HTYPE_INSTANCE
2422   _OP_REQP = ["instance_name"]
2423
2424   def BuildHooksEnv(self):
2425     """Build hooks env.
2426
2427     This runs on master, primary and secondary nodes of the instance.
2428
2429     """
2430     env = _BuildInstanceHookEnvByObject(self.instance)
2431     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2432           list(self.instance.secondary_nodes))
2433     return env, nl, nl
2434
2435   def CheckPrereq(self):
2436     """Check prerequisites.
2437
2438     This checks that the instance is in the cluster.
2439
2440     """
2441     instance = self.cfg.GetInstanceInfo(
2442       self.cfg.ExpandInstanceName(self.op.instance_name))
2443     if instance is None:
2444       raise errors.OpPrereqError("Instance '%s' not known" %
2445                                  self.op.instance_name)
2446     self.instance = instance
2447
2448   def Exec(self, feedback_fn):
2449     """Shutdown the instance.
2450
2451     """
2452     instance = self.instance
2453     node_current = instance.primary_node
2454     self.cfg.MarkInstanceDown(instance.name)
2455     if not rpc.call_instance_shutdown(node_current, instance):
2456       logger.Error("could not shutdown instance")
2457
2458     _ShutdownInstanceDisks(instance, self.cfg)
2459
2460
2461 class LUReinstallInstance(LogicalUnit):
2462   """Reinstall an instance.
2463
2464   """
2465   HPATH = "instance-reinstall"
2466   HTYPE = constants.HTYPE_INSTANCE
2467   _OP_REQP = ["instance_name"]
2468
2469   def BuildHooksEnv(self):
2470     """Build hooks env.
2471
2472     This runs on master, primary and secondary nodes of the instance.
2473
2474     """
2475     env = _BuildInstanceHookEnvByObject(self.instance)
2476     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2477           list(self.instance.secondary_nodes))
2478     return env, nl, nl
2479
2480   def CheckPrereq(self):
2481     """Check prerequisites.
2482
2483     This checks that the instance is in the cluster and is not running.
2484
2485     """
2486     instance = self.cfg.GetInstanceInfo(
2487       self.cfg.ExpandInstanceName(self.op.instance_name))
2488     if instance is None:
2489       raise errors.OpPrereqError("Instance '%s' not known" %
2490                                  self.op.instance_name)
2491     if instance.disk_template == constants.DT_DISKLESS:
2492       raise errors.OpPrereqError("Instance '%s' has no disks" %
2493                                  self.op.instance_name)
2494     if instance.status != "down":
2495       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2496                                  self.op.instance_name)
2497     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2498     if remote_info:
2499       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2500                                  (self.op.instance_name,
2501                                   instance.primary_node))
2502
2503     self.op.os_type = getattr(self.op, "os_type", None)
2504     if self.op.os_type is not None:
2505       # OS verification
2506       pnode = self.cfg.GetNodeInfo(
2507         self.cfg.ExpandNodeName(instance.primary_node))
2508       if pnode is None:
2509         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2510                                    self.op.pnode)
2511       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2512       if not os_obj:
2513         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2514                                    " primary node"  % self.op.os_type)
2515
2516     self.instance = instance
2517
2518   def Exec(self, feedback_fn):
2519     """Reinstall the instance.
2520
2521     """
2522     inst = self.instance
2523
2524     if self.op.os_type is not None:
2525       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2526       inst.os = self.op.os_type
2527       self.cfg.AddInstance(inst)
2528
2529     _StartInstanceDisks(self.cfg, inst, None)
2530     try:
2531       feedback_fn("Running the instance OS create scripts...")
2532       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2533         raise errors.OpExecError("Could not install OS for instance %s"
2534                                  " on node %s" %
2535                                  (inst.name, inst.primary_node))
2536     finally:
2537       _ShutdownInstanceDisks(inst, self.cfg)
2538
2539
2540 class LURenameInstance(LogicalUnit):
2541   """Rename an instance.
2542
2543   """
2544   HPATH = "instance-rename"
2545   HTYPE = constants.HTYPE_INSTANCE
2546   _OP_REQP = ["instance_name", "new_name"]
2547
2548   def BuildHooksEnv(self):
2549     """Build hooks env.
2550
2551     This runs on master, primary and secondary nodes of the instance.
2552
2553     """
2554     env = _BuildInstanceHookEnvByObject(self.instance)
2555     env["INSTANCE_NEW_NAME"] = self.op.new_name
2556     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2557           list(self.instance.secondary_nodes))
2558     return env, nl, nl
2559
2560   def CheckPrereq(self):
2561     """Check prerequisites.
2562
2563     This checks that the instance is in the cluster and is not running.
2564
2565     """
2566     instance = self.cfg.GetInstanceInfo(
2567       self.cfg.ExpandInstanceName(self.op.instance_name))
2568     if instance is None:
2569       raise errors.OpPrereqError("Instance '%s' not known" %
2570                                  self.op.instance_name)
2571     if instance.status != "down":
2572       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2573                                  self.op.instance_name)
2574     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2575     if remote_info:
2576       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2577                                  (self.op.instance_name,
2578                                   instance.primary_node))
2579     self.instance = instance
2580
2581     # new name verification
2582     name_info = utils.HostInfo(self.op.new_name)
2583
2584     self.op.new_name = new_name = name_info.name
2585     instance_list = self.cfg.GetInstanceList()
2586     if new_name in instance_list:
2587       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2588                                  new_name)
2589
2590     if not getattr(self.op, "ignore_ip", False):
2591       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2592         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2593                                    (name_info.ip, new_name))
2594
2595
2596   def Exec(self, feedback_fn):
2597     """Reinstall the instance.
2598
2599     """
2600     inst = self.instance
2601     old_name = inst.name
2602
2603     self.cfg.RenameInstance(inst.name, self.op.new_name)
2604
2605     # re-read the instance from the configuration after rename
2606     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2607
2608     _StartInstanceDisks(self.cfg, inst, None)
2609     try:
2610       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2611                                           "sda", "sdb"):
2612         msg = ("Could run OS rename script for instance %s on node %s (but the"
2613                " instance has been renamed in Ganeti)" %
2614                (inst.name, inst.primary_node))
2615         logger.Error(msg)
2616     finally:
2617       _ShutdownInstanceDisks(inst, self.cfg)
2618
2619
2620 class LURemoveInstance(LogicalUnit):
2621   """Remove an instance.
2622
2623   """
2624   HPATH = "instance-remove"
2625   HTYPE = constants.HTYPE_INSTANCE
2626   _OP_REQP = ["instance_name", "ignore_failures"]
2627
2628   def BuildHooksEnv(self):
2629     """Build hooks env.
2630
2631     This runs on master, primary and secondary nodes of the instance.
2632
2633     """
2634     env = _BuildInstanceHookEnvByObject(self.instance)
2635     nl = [self.sstore.GetMasterNode()]
2636     return env, nl, nl
2637
2638   def CheckPrereq(self):
2639     """Check prerequisites.
2640
2641     This checks that the instance is in the cluster.
2642
2643     """
2644     instance = self.cfg.GetInstanceInfo(
2645       self.cfg.ExpandInstanceName(self.op.instance_name))
2646     if instance is None:
2647       raise errors.OpPrereqError("Instance '%s' not known" %
2648                                  self.op.instance_name)
2649     self.instance = instance
2650
2651   def Exec(self, feedback_fn):
2652     """Remove the instance.
2653
2654     """
2655     instance = self.instance
2656     logger.Info("shutting down instance %s on node %s" %
2657                 (instance.name, instance.primary_node))
2658
2659     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2660       if self.op.ignore_failures:
2661         feedback_fn("Warning: can't shutdown instance")
2662       else:
2663         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2664                                  (instance.name, instance.primary_node))
2665
2666     logger.Info("removing block devices for instance %s" % instance.name)
2667
2668     if not _RemoveDisks(instance, self.cfg):
2669       if self.op.ignore_failures:
2670         feedback_fn("Warning: can't remove instance's disks")
2671       else:
2672         raise errors.OpExecError("Can't remove instance's disks")
2673
2674     logger.Info("removing instance %s out of cluster config" % instance.name)
2675
2676     self.cfg.RemoveInstance(instance.name)
2677
2678
2679 class LUQueryInstances(NoHooksLU):
2680   """Logical unit for querying instances.
2681
2682   """
2683   _OP_REQP = ["output_fields", "names"]
2684
2685   def CheckPrereq(self):
2686     """Check prerequisites.
2687
2688     This checks that the fields required are valid output fields.
2689
2690     """
2691     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2692     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2693                                "admin_state", "admin_ram",
2694                                "disk_template", "ip", "mac", "bridge",
2695                                "sda_size", "sdb_size", "vcpus", "tags"],
2696                        dynamic=self.dynamic_fields,
2697                        selected=self.op.output_fields)
2698
2699     self.wanted = _GetWantedInstances(self, self.op.names)
2700
2701   def Exec(self, feedback_fn):
2702     """Computes the list of nodes and their attributes.
2703
2704     """
2705     instance_names = self.wanted
2706     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2707                      in instance_names]
2708
2709     # begin data gathering
2710
2711     nodes = frozenset([inst.primary_node for inst in instance_list])
2712
2713     bad_nodes = []
2714     if self.dynamic_fields.intersection(self.op.output_fields):
2715       live_data = {}
2716       node_data = rpc.call_all_instances_info(nodes)
2717       for name in nodes:
2718         result = node_data[name]
2719         if result:
2720           live_data.update(result)
2721         elif result == False:
2722           bad_nodes.append(name)
2723         # else no instance is alive
2724     else:
2725       live_data = dict([(name, {}) for name in instance_names])
2726
2727     # end data gathering
2728
2729     output = []
2730     for instance in instance_list:
2731       iout = []
2732       for field in self.op.output_fields:
2733         if field == "name":
2734           val = instance.name
2735         elif field == "os":
2736           val = instance.os
2737         elif field == "pnode":
2738           val = instance.primary_node
2739         elif field == "snodes":
2740           val = list(instance.secondary_nodes)
2741         elif field == "admin_state":
2742           val = (instance.status != "down")
2743         elif field == "oper_state":
2744           if instance.primary_node in bad_nodes:
2745             val = None
2746           else:
2747             val = bool(live_data.get(instance.name))
2748         elif field == "status":
2749           if instance.primary_node in bad_nodes:
2750             val = "ERROR_nodedown"
2751           else:
2752             running = bool(live_data.get(instance.name))
2753             if running:
2754               if instance.status != "down":
2755                 val = "running"
2756               else:
2757                 val = "ERROR_up"
2758             else:
2759               if instance.status != "down":
2760                 val = "ERROR_down"
2761               else:
2762                 val = "ADMIN_down"
2763         elif field == "admin_ram":
2764           val = instance.memory
2765         elif field == "oper_ram":
2766           if instance.primary_node in bad_nodes:
2767             val = None
2768           elif instance.name in live_data:
2769             val = live_data[instance.name].get("memory", "?")
2770           else:
2771             val = "-"
2772         elif field == "disk_template":
2773           val = instance.disk_template
2774         elif field == "ip":
2775           val = instance.nics[0].ip
2776         elif field == "bridge":
2777           val = instance.nics[0].bridge
2778         elif field == "mac":
2779           val = instance.nics[0].mac
2780         elif field == "sda_size" or field == "sdb_size":
2781           disk = instance.FindDisk(field[:3])
2782           if disk is None:
2783             val = None
2784           else:
2785             val = disk.size
2786         elif field == "vcpus":
2787           val = instance.vcpus
2788         elif field == "tags":
2789           val = list(instance.GetTags())
2790         else:
2791           raise errors.ParameterError(field)
2792         iout.append(val)
2793       output.append(iout)
2794
2795     return output
2796
2797
2798 class LUFailoverInstance(LogicalUnit):
2799   """Failover an instance.
2800
2801   """
2802   HPATH = "instance-failover"
2803   HTYPE = constants.HTYPE_INSTANCE
2804   _OP_REQP = ["instance_name", "ignore_consistency"]
2805
2806   def BuildHooksEnv(self):
2807     """Build hooks env.
2808
2809     This runs on master, primary and secondary nodes of the instance.
2810
2811     """
2812     env = {
2813       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2814       }
2815     env.update(_BuildInstanceHookEnvByObject(self.instance))
2816     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2817     return env, nl, nl
2818
2819   def CheckPrereq(self):
2820     """Check prerequisites.
2821
2822     This checks that the instance is in the cluster.
2823
2824     """
2825     instance = self.cfg.GetInstanceInfo(
2826       self.cfg.ExpandInstanceName(self.op.instance_name))
2827     if instance is None:
2828       raise errors.OpPrereqError("Instance '%s' not known" %
2829                                  self.op.instance_name)
2830
2831     if instance.disk_template not in constants.DTS_NET_MIRROR:
2832       raise errors.OpPrereqError("Instance's disk layout is not"
2833                                  " network mirrored, cannot failover.")
2834
2835     secondary_nodes = instance.secondary_nodes
2836     if not secondary_nodes:
2837       raise errors.ProgrammerError("no secondary node but using "
2838                                    "DT_REMOTE_RAID1 template")
2839
2840     target_node = secondary_nodes[0]
2841     # check memory requirements on the secondary node
2842     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2843                          instance.name, instance.memory)
2844
2845     # check bridge existance
2846     brlist = [nic.bridge for nic in instance.nics]
2847     if not rpc.call_bridges_exist(target_node, brlist):
2848       raise errors.OpPrereqError("One or more target bridges %s does not"
2849                                  " exist on destination node '%s'" %
2850                                  (brlist, target_node))
2851
2852     self.instance = instance
2853
2854   def Exec(self, feedback_fn):
2855     """Failover an instance.
2856
2857     The failover is done by shutting it down on its present node and
2858     starting it on the secondary.
2859
2860     """
2861     instance = self.instance
2862
2863     source_node = instance.primary_node
2864     target_node = instance.secondary_nodes[0]
2865
2866     feedback_fn("* checking disk consistency between source and target")
2867     for dev in instance.disks:
2868       # for remote_raid1, these are md over drbd
2869       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2870         if instance.status == "up" and not self.op.ignore_consistency:
2871           raise errors.OpExecError("Disk %s is degraded on target node,"
2872                                    " aborting failover." % dev.iv_name)
2873
2874     feedback_fn("* shutting down instance on source node")
2875     logger.Info("Shutting down instance %s on node %s" %
2876                 (instance.name, source_node))
2877
2878     if not rpc.call_instance_shutdown(source_node, instance):
2879       if self.op.ignore_consistency:
2880         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2881                      " anyway. Please make sure node %s is down"  %
2882                      (instance.name, source_node, source_node))
2883       else:
2884         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2885                                  (instance.name, source_node))
2886
2887     feedback_fn("* deactivating the instance's disks on source node")
2888     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2889       raise errors.OpExecError("Can't shut down the instance's disks.")
2890
2891     instance.primary_node = target_node
2892     # distribute new instance config to the other nodes
2893     self.cfg.Update(instance)
2894
2895     # Only start the instance if it's marked as up
2896     if instance.status == "up":
2897       feedback_fn("* activating the instance's disks on target node")
2898       logger.Info("Starting instance %s on node %s" %
2899                   (instance.name, target_node))
2900
2901       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2902                                                ignore_secondaries=True)
2903       if not disks_ok:
2904         _ShutdownInstanceDisks(instance, self.cfg)
2905         raise errors.OpExecError("Can't activate the instance's disks")
2906
2907       feedback_fn("* starting the instance on the target node")
2908       if not rpc.call_instance_start(target_node, instance, None):
2909         _ShutdownInstanceDisks(instance, self.cfg)
2910         raise errors.OpExecError("Could not start instance %s on node %s." %
2911                                  (instance.name, target_node))
2912
2913
2914 class LUMigrateInstance(LogicalUnit):
2915   """Migrate an instance.
2916
2917   This is migration without shutting down, compared to the failover,
2918   which is done with shutdown.
2919
2920   """
2921   HPATH = "instance-migrate"
2922   HTYPE = constants.HTYPE_INSTANCE
2923   _OP_REQP = ["instance_name", "live", "cleanup"]
2924
2925   def BuildHooksEnv(self):
2926     """Build hooks env.
2927
2928     This runs on master, primary and secondary nodes of the instance.
2929
2930     """
2931     env = _BuildInstanceHookEnvByObject(self.instance)
2932     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2933     return env, nl, nl
2934
2935   def CheckPrereq(self):
2936     """Check prerequisites.
2937
2938     This checks that the instance is in the cluster.
2939
2940     """
2941     instance = self.cfg.GetInstanceInfo(
2942       self.cfg.ExpandInstanceName(self.op.instance_name))
2943     if instance is None:
2944       raise errors.OpPrereqError("Instance '%s' not known" %
2945                                  self.op.instance_name)
2946
2947     if instance.disk_template != constants.DT_DRBD8:
2948       raise errors.OpPrereqError("Instance's disk layout is not"
2949                                  " drbd8, cannot migrate.")
2950
2951     secondary_nodes = instance.secondary_nodes
2952     if not secondary_nodes:
2953       raise errors.ProgrammerError("no secondary node but using "
2954                                    "drbd8 disk template")
2955
2956     target_node = secondary_nodes[0]
2957     # check memory requirements on the secondary node
2958     _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
2959                          instance.name, instance.memory)
2960
2961     # check bridge existance
2962     brlist = [nic.bridge for nic in instance.nics]
2963     if not rpc.call_bridges_exist(target_node, brlist):
2964       raise errors.OpPrereqError("One or more target bridges %s does not"
2965                                  " exist on destination node '%s'" %
2966                                  (brlist, target_node))
2967
2968     if not self.op.cleanup:
2969       migratable = rpc.call_instance_migratable(instance.primary_node,
2970                                                 instance)
2971       if not migratable:
2972         raise errors.OpPrereqError("Can't contact node '%s'" %
2973                                    instance.primary_node)
2974       if not migratable[0]:
2975         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
2976                                    migratable[1])
2977
2978     self.instance = instance
2979
2980   def _WaitUntilSync(self):
2981     """Poll with custom rpc for disk sync.
2982
2983     This uses our own step-based rpc call.
2984
2985     """
2986     self.feedback_fn("* wait until resync is done")
2987     all_done = False
2988     while not all_done:
2989       all_done = True
2990       result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
2991                                           self.instance.disks,
2992                                           self.nodes_ip, False,
2993                                           constants.DRBD_RECONF_RPC_WFSYNC)
2994       min_percent = 100
2995       for node in self.all_nodes:
2996         if not result[node] or not result[node][0]:
2997           raise errors.OpExecError("Cannot resync disks on node %s" % (node,))
2998         node_done, node_percent = result[node][1]
2999         all_done = all_done and node_done
3000         if node_percent is not None:
3001           min_percent = min(min_percent, node_percent)
3002       if not all_done:
3003         if min_percent < 100:
3004           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3005         time.sleep(2)
3006
3007   def _EnsureSecondary(self, node):
3008     """Demote a node to secondary.
3009
3010     """
3011     self.feedback_fn("* switching node %s to secondary mode" % node)
3012     result = rpc.call_drbd_reconfig_net([node], self.instance.name,
3013                                         self.instance.disks,
3014                                         self.nodes_ip, False,
3015                                         constants.DRBD_RECONF_RPC_SECONDARY)
3016     if not result[node] or not result[node][0]:
3017         raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3018                                  " error %s" %
3019                                  (node, result[node][1]))
3020
3021   def _GoStandalone(self):
3022     """Disconnect from the network.
3023
3024     """
3025     self.feedback_fn("* changing into standalone mode")
3026     result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3027                                         self.instance.disks,
3028                                         self.nodes_ip, True,
3029                                         constants.DRBD_RECONF_RPC_DISCONNECT)
3030     for node in self.all_nodes:
3031       if not result[node] or not result[node][0]:
3032         raise errors.OpExecError("Cannot disconnect disks node %s,"
3033                                  " error %s" % (node, result[node][1]))
3034
3035   def _GoReconnect(self, multimaster):
3036     """Reconnect to the network.
3037
3038     """
3039     if multimaster:
3040       msg = "dual-master"
3041     else:
3042       msg = "single-master"
3043     self.feedback_fn("* changing disks into %s mode" % msg)
3044     result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3045                                         self.instance.disks,
3046                                         self.nodes_ip,
3047                                         multimaster,
3048                                         constants.DRBD_RECONF_RPC_RECONNECT)
3049     for node in self.all_nodes:
3050       if not result[node] or not result[node][0]:
3051         raise errors.OpExecError("Cannot change disks config on node %s,"
3052                                  " error %s" % (node, result[node][1]))
3053
3054   def _IdentifyDisks(self):
3055     """Start the migration RPC sequence.
3056
3057     """
3058     self.feedback_fn("* identifying disks")
3059     result = rpc.call_drbd_reconfig_net(self.all_nodes,
3060                                         self.instance.name,
3061                                         self.instance.disks,
3062                                         self.nodes_ip, True,
3063                                         constants.DRBD_RECONF_RPC_INIT)
3064     for node in self.all_nodes:
3065       if not result[node] or not result[node][0]:
3066         raise errors.OpExecError("Cannot identify disks node %s,"
3067                                  " error %s" % (node, result[node][1]))
3068
3069   def _ExecCleanup(self):
3070     """Try to cleanup after a failed migration.
3071
3072     The cleanup is done by:
3073       - check that the instance is running only on one node
3074         (and update the config if needed)
3075       - change disks on its secondary node to secondary
3076       - wait until disks are fully synchronized
3077       - disconnect from the network
3078       - change disks into single-master mode
3079       - wait again until disks are fully synchronized
3080
3081     """
3082     instance = self.instance
3083     target_node = self.target_node
3084     source_node = self.source_node
3085
3086     # check running on only one node
3087     self.feedback_fn("* checking where the instance actually runs"
3088                      " (if this hangs, the hypervisor might be in"
3089                      " a bad state)")
3090     ins_l = rpc.call_instance_list(self.all_nodes)
3091     for node in self.all_nodes:
3092       if not type(ins_l[node]) is list:
3093         raise errors.OpExecError("Can't contact node '%s'" % node)
3094
3095     runningon_source = instance.name in ins_l[source_node]
3096     runningon_target = instance.name in ins_l[target_node]
3097
3098     if runningon_source and runningon_target:
3099       raise errors.OpExecError("Instance seems to be running on two nodes,"
3100                                " or the hypervisor is confused. You will have"
3101                                " to ensure manually that it runs only on one"
3102                                " and restart this operation.")
3103
3104     if not (runningon_source or runningon_target):
3105       raise errors.OpExecError("Instance does not seem to be running at all."
3106                                " In this case, it's safer to repair by"
3107                                " running 'gnt-instance stop' to ensure disk"
3108                                " shutdown, and then restarting it.")
3109
3110     if runningon_target:
3111       # the migration has actually succeeded, we need to update the config
3112       self.feedback_fn("* instance running on secondary node (%s),"
3113                        " updating config" % target_node)
3114       instance.primary_node = target_node
3115       self.cfg.Update(instance)
3116       demoted_node = source_node
3117     else:
3118       self.feedback_fn("* instance confirmed to be running on its"
3119                        " primary node (%s)" % source_node)
3120       demoted_node = target_node
3121
3122     self._IdentifyDisks()
3123
3124     self._EnsureSecondary(demoted_node)
3125     self._WaitUntilSync()
3126     self._GoStandalone()
3127     self._GoReconnect(False)
3128     self._WaitUntilSync()
3129
3130     self.feedback_fn("* done")
3131
3132   def _ExecMigration(self):
3133     """Migrate an instance.
3134
3135     The migrate is done by:
3136       - change the disks into dual-master mode
3137       - wait until disks are fully synchronized again
3138       - migrate the instance
3139       - change disks on the new secondary node (the old primary) to secondary
3140       - wait until disks are fully synchronized
3141       - change disks into single-master mode
3142
3143     """
3144     instance = self.instance
3145     target_node = self.target_node
3146     source_node = self.source_node
3147
3148     self.feedback_fn("* checking disk consistency between source and target")
3149     for dev in instance.disks:
3150       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3151         raise errors.OpExecError("Disk %s is degraded or not fully"
3152                                  " synchronized on target node,"
3153                                  " aborting migrate." % dev.iv_name)
3154
3155     self._IdentifyDisks()
3156
3157     self._EnsureSecondary(target_node)
3158     self._GoStandalone()
3159     self._GoReconnect(True)
3160     self._WaitUntilSync()
3161
3162     self.feedback_fn("* migrating instance to %s" % target_node)
3163     time.sleep(2)
3164     result = rpc.call_instance_migrate(source_node, instance,
3165                                        self.nodes_ip[target_node],
3166                                        self.op.live)
3167     if not result or not result[0]:
3168       logger.Error("Instance migration failed, trying to revert disk status")
3169       try:
3170         self._EnsureSecondary(target_node)
3171         self._GoStandalone()
3172         self._GoReconnect(False)
3173         self._WaitUntilSync()
3174       except errors.OpExecError, err:
3175         logger.Error("Can't reconnect the drives: error '%s'\n"
3176                      "Please look and recover the instance status" % str(err))
3177
3178       raise errors.OpExecError("Could not migrate instance %s: %s" %
3179                                (instance.name, result[1]))
3180     time.sleep(2)
3181
3182     instance.primary_node = target_node
3183     # distribute new instance config to the other nodes
3184     self.cfg.Update(instance)
3185
3186     self._EnsureSecondary(source_node)
3187     self._WaitUntilSync()
3188     self._GoStandalone()
3189     self._GoReconnect(False)
3190     self._WaitUntilSync()
3191
3192     self.feedback_fn("* done")
3193
3194   def Exec(self, feedback_fn):
3195     """Perform the migration.
3196
3197     """
3198     self.feedback_fn = feedback_fn
3199
3200     self.source_node = self.instance.primary_node
3201     self.target_node = self.instance.secondary_nodes[0]
3202     self.all_nodes = [self.source_node, self.target_node]
3203     self.nodes_ip = {
3204       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3205       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3206       }
3207     if self.op.cleanup:
3208       return self._ExecCleanup()
3209     else:
3210       return self._ExecMigration()
3211
3212
3213 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3214   """Create a tree of block devices on the primary node.
3215
3216   This always creates all devices.
3217
3218   """
3219   if device.children:
3220     for child in device.children:
3221       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3222         return False
3223
3224   cfg.SetDiskID(device, node)
3225   new_id = rpc.call_blockdev_create(node, device, device.size,
3226                                     instance.name, True, info)
3227   if not new_id:
3228     return False
3229   if device.physical_id is None:
3230     device.physical_id = new_id
3231   return True
3232
3233
3234 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3235   """Create a tree of block devices on a secondary node.
3236
3237   If this device type has to be created on secondaries, create it and
3238   all its children.
3239
3240   If not, just recurse to children keeping the same 'force' value.
3241
3242   """
3243   if device.CreateOnSecondary():
3244     force = True
3245   if device.children:
3246     for child in device.children:
3247       if not _CreateBlockDevOnSecondary(cfg, node, instance,
3248                                         child, force, info):
3249         return False
3250
3251   if not force:
3252     return True
3253   cfg.SetDiskID(device, node)
3254   new_id = rpc.call_blockdev_create(node, device, device.size,
3255                                     instance.name, False, info)
3256   if not new_id:
3257     return False
3258   if device.physical_id is None:
3259     device.physical_id = new_id
3260   return True
3261
3262
3263 def _GenerateUniqueNames(cfg, exts):
3264   """Generate a suitable LV name.
3265
3266   This will generate a logical volume name for the given instance.
3267
3268   """
3269   results = []
3270   for val in exts:
3271     new_id = cfg.GenerateUniqueID()
3272     results.append("%s%s" % (new_id, val))
3273   return results
3274
3275
3276 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3277   """Generate a drbd device complete with its children.
3278
3279   """
3280   port = cfg.AllocatePort()
3281   vgname = cfg.GetVGName()
3282   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3283                           logical_id=(vgname, names[0]))
3284   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3285                           logical_id=(vgname, names[1]))
3286   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3287                           logical_id = (primary, secondary, port),
3288                           children = [dev_data, dev_meta])
3289   return drbd_dev
3290
3291
3292 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3293   """Generate a drbd8 device complete with its children.
3294
3295   """
3296   port = cfg.AllocatePort()
3297   vgname = cfg.GetVGName()
3298   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3299                           logical_id=(vgname, names[0]))
3300   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3301                           logical_id=(vgname, names[1]))
3302   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3303                           logical_id = (primary, secondary, port),
3304                           children = [dev_data, dev_meta],
3305                           iv_name=iv_name)
3306   return drbd_dev
3307
3308 def _GenerateDiskTemplate(cfg, template_name,
3309                           instance_name, primary_node,
3310                           secondary_nodes, disk_sz, swap_sz):
3311   """Generate the entire disk layout for a given template type.
3312
3313   """
3314   #TODO: compute space requirements
3315
3316   vgname = cfg.GetVGName()
3317   if template_name == constants.DT_DISKLESS:
3318     disks = []
3319   elif template_name == constants.DT_PLAIN:
3320     if len(secondary_nodes) != 0:
3321       raise errors.ProgrammerError("Wrong template configuration")
3322
3323     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3324     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3325                            logical_id=(vgname, names[0]),
3326                            iv_name = "sda")
3327     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3328                            logical_id=(vgname, names[1]),
3329                            iv_name = "sdb")
3330     disks = [sda_dev, sdb_dev]
3331   elif template_name == constants.DT_LOCAL_RAID1:
3332     if len(secondary_nodes) != 0:
3333       raise errors.ProgrammerError("Wrong template configuration")
3334
3335
3336     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3337                                        ".sdb_m1", ".sdb_m2"])
3338     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3339                               logical_id=(vgname, names[0]))
3340     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3341                               logical_id=(vgname, names[1]))
3342     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3343                               size=disk_sz,
3344                               children = [sda_dev_m1, sda_dev_m2])
3345     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3346                               logical_id=(vgname, names[2]))
3347     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3348                               logical_id=(vgname, names[3]))
3349     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3350                               size=swap_sz,
3351                               children = [sdb_dev_m1, sdb_dev_m2])
3352     disks = [md_sda_dev, md_sdb_dev]
3353   elif template_name == constants.DT_REMOTE_RAID1:
3354     if len(secondary_nodes) != 1:
3355       raise errors.ProgrammerError("Wrong template configuration")
3356     remote_node = secondary_nodes[0]
3357     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3358                                        ".sdb_data", ".sdb_meta"])
3359     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3360                                          disk_sz, names[0:2])
3361     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3362                               children = [drbd_sda_dev], size=disk_sz)
3363     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3364                                          swap_sz, names[2:4])
3365     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3366                               children = [drbd_sdb_dev], size=swap_sz)
3367     disks = [md_sda_dev, md_sdb_dev]
3368   elif template_name == constants.DT_DRBD8:
3369     if len(secondary_nodes) != 1:
3370       raise errors.ProgrammerError("Wrong template configuration")
3371     remote_node = secondary_nodes[0]
3372     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3373                                        ".sdb_data", ".sdb_meta"])
3374     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3375                                          disk_sz, names[0:2], "sda")
3376     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3377                                          swap_sz, names[2:4], "sdb")
3378     disks = [drbd_sda_dev, drbd_sdb_dev]
3379   else:
3380     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3381   return disks
3382
3383
3384 def _GetInstanceInfoText(instance):
3385   """Compute that text that should be added to the disk's metadata.
3386
3387   """
3388   return "originstname+%s" % instance.name
3389
3390
3391 def _CreateDisks(cfg, instance):
3392   """Create all disks for an instance.
3393
3394   This abstracts away some work from AddInstance.
3395
3396   Args:
3397     instance: the instance object
3398
3399   Returns:
3400     True or False showing the success of the creation process
3401
3402   """
3403   info = _GetInstanceInfoText(instance)
3404
3405   for device in instance.disks:
3406     logger.Info("creating volume %s for instance %s" %
3407               (device.iv_name, instance.name))
3408     #HARDCODE
3409     for secondary_node in instance.secondary_nodes:
3410       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3411                                         device, False, info):
3412         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3413                      (device.iv_name, device, secondary_node))
3414         return False
3415     #HARDCODE
3416     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3417                                     instance, device, info):
3418       logger.Error("failed to create volume %s on primary!" %
3419                    device.iv_name)
3420       return False
3421   return True
3422
3423
3424 def _RemoveDisks(instance, cfg):
3425   """Remove all disks for an instance.
3426
3427   This abstracts away some work from `AddInstance()` and
3428   `RemoveInstance()`. Note that in case some of the devices couldn't
3429   be removed, the removal will continue with the other ones (compare
3430   with `_CreateDisks()`).
3431
3432   Args:
3433     instance: the instance object
3434
3435   Returns:
3436     True or False showing the success of the removal proces
3437
3438   """
3439   logger.Info("removing block devices for instance %s" % instance.name)
3440
3441   result = True
3442   for device in instance.disks:
3443     for node, disk in device.ComputeNodeTree(instance.primary_node):
3444       cfg.SetDiskID(disk, node)
3445       if not rpc.call_blockdev_remove(node, disk):
3446         logger.Error("could not remove block device %s on node %s,"
3447                      " continuing anyway" %
3448                      (device.iv_name, node))
3449         result = False
3450   return result
3451
3452
3453 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3454   """Compute disk size requirements in the volume group
3455
3456   This is currently hard-coded for the two-drive layout.
3457
3458   """
3459   # Required free disk space as a function of disk and swap space
3460   req_size_dict = {
3461     constants.DT_DISKLESS: None,
3462     constants.DT_PLAIN: disk_size + swap_size,
3463     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3464     # 256 MB are added for drbd metadata, 128MB for each drbd device
3465     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3466     constants.DT_DRBD8: disk_size + swap_size + 256,
3467   }
3468
3469   if disk_template not in req_size_dict:
3470     raise errors.ProgrammerError("Disk template '%s' size requirement"
3471                                  " is unknown" %  disk_template)
3472
3473   return req_size_dict[disk_template]
3474
3475
3476 class LUCreateInstance(LogicalUnit):
3477   """Create an instance.
3478
3479   """
3480   HPATH = "instance-add"
3481   HTYPE = constants.HTYPE_INSTANCE
3482   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3483               "disk_template", "swap_size", "mode", "start", "vcpus",
3484               "wait_for_sync", "ip_check", "mac"]
3485
3486   def _RunAllocator(self):
3487     """Run the allocator based on input opcode.
3488
3489     """
3490     disks = [{"size": self.op.disk_size, "mode": "w"},
3491              {"size": self.op.swap_size, "mode": "w"}]
3492     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3493              "bridge": self.op.bridge}]
3494     ial = IAllocator(self.cfg, self.sstore,
3495                      mode=constants.IALLOCATOR_MODE_ALLOC,
3496                      name=self.op.instance_name,
3497                      disk_template=self.op.disk_template,
3498                      tags=[],
3499                      os=self.op.os_type,
3500                      vcpus=self.op.vcpus,
3501                      mem_size=self.op.mem_size,
3502                      disks=disks,
3503                      nics=nics,
3504                      )
3505
3506     ial.Run(self.op.iallocator)
3507
3508     if not ial.success:
3509       raise errors.OpPrereqError("Can't compute nodes using"
3510                                  " iallocator '%s': %s" % (self.op.iallocator,
3511                                                            ial.info))
3512     if len(ial.nodes) != ial.required_nodes:
3513       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3514                                  " of nodes (%s), required %s" %
3515                                  (len(ial.nodes), ial.required_nodes))
3516     self.op.pnode = ial.nodes[0]
3517     logger.ToStdout("Selected nodes for the instance: %s" %
3518                     (", ".join(ial.nodes),))
3519     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3520                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3521     if ial.required_nodes == 2:
3522       self.op.snode = ial.nodes[1]
3523
3524   def BuildHooksEnv(self):
3525     """Build hooks env.
3526
3527     This runs on master, primary and secondary nodes of the instance.
3528
3529     """
3530     env = {
3531       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3532       "INSTANCE_DISK_SIZE": self.op.disk_size,
3533       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3534       "INSTANCE_ADD_MODE": self.op.mode,
3535       }
3536     if self.op.mode == constants.INSTANCE_IMPORT:
3537       env["INSTANCE_SRC_NODE"] = self.op.src_node
3538       env["INSTANCE_SRC_PATH"] = self.op.src_path
3539       env["INSTANCE_SRC_IMAGE"] = self.src_image
3540
3541     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3542       primary_node=self.op.pnode,
3543       secondary_nodes=self.secondaries,
3544       status=self.instance_status,
3545       os_type=self.op.os_type,
3546       memory=self.op.mem_size,
3547       vcpus=self.op.vcpus,
3548       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3549     ))
3550
3551     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3552           self.secondaries)
3553     return env, nl, nl
3554
3555
3556   def CheckPrereq(self):
3557     """Check prerequisites.
3558
3559     """
3560     # set optional parameters to none if they don't exist
3561     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3562                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3563                  "vnc_bind_address"]:
3564       if not hasattr(self.op, attr):
3565         setattr(self.op, attr, None)
3566
3567     if self.op.mode not in (constants.INSTANCE_CREATE,
3568                             constants.INSTANCE_IMPORT):
3569       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3570                                  self.op.mode)
3571
3572     if self.op.mode == constants.INSTANCE_IMPORT:
3573       src_node = getattr(self.op, "src_node", None)
3574       src_path = getattr(self.op, "src_path", None)
3575       if src_node is None or src_path is None:
3576         raise errors.OpPrereqError("Importing an instance requires source"
3577                                    " node and path options")
3578       src_node_full = self.cfg.ExpandNodeName(src_node)
3579       if src_node_full is None:
3580         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3581       self.op.src_node = src_node = src_node_full
3582
3583       if not os.path.isabs(src_path):
3584         raise errors.OpPrereqError("The source path must be absolute")
3585
3586       export_info = rpc.call_export_info(src_node, src_path)
3587
3588       if not export_info:
3589         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3590
3591       if not export_info.has_section(constants.INISECT_EXP):
3592         raise errors.ProgrammerError("Corrupted export config")
3593
3594       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3595       if (int(ei_version) != constants.EXPORT_VERSION):
3596         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3597                                    (ei_version, constants.EXPORT_VERSION))
3598
3599       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3600         raise errors.OpPrereqError("Can't import instance with more than"
3601                                    " one data disk")
3602
3603       # FIXME: are the old os-es, disk sizes, etc. useful?
3604       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3605       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3606                                                          'disk0_dump'))
3607       self.src_image = diskimage
3608     else: # INSTANCE_CREATE
3609       if getattr(self.op, "os_type", None) is None:
3610         raise errors.OpPrereqError("No guest OS specified")
3611
3612     #### instance parameters check
3613
3614     # disk template and mirror node verification
3615     if self.op.disk_template not in constants.DISK_TEMPLATES:
3616       raise errors.OpPrereqError("Invalid disk template name")
3617
3618     # instance name verification
3619     hostname1 = utils.HostInfo(self.op.instance_name)
3620
3621     self.op.instance_name = instance_name = hostname1.name
3622     instance_list = self.cfg.GetInstanceList()
3623     if instance_name in instance_list:
3624       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3625                                  instance_name)
3626
3627     # ip validity checks
3628     ip = getattr(self.op, "ip", None)
3629     if ip is None or ip.lower() == "none":
3630       inst_ip = None
3631     elif ip.lower() == "auto":
3632       inst_ip = hostname1.ip
3633     else:
3634       if not utils.IsValidIP(ip):
3635         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3636                                    " like a valid IP" % ip)
3637       inst_ip = ip
3638     self.inst_ip = self.op.ip = inst_ip
3639
3640     if self.op.start and not self.op.ip_check:
3641       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3642                                  " adding an instance in start mode")
3643
3644     if self.op.ip_check:
3645       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3646         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3647                                    (hostname1.ip, instance_name))
3648
3649     # MAC address verification
3650     if self.op.mac != "auto":
3651       if not utils.IsValidMac(self.op.mac.lower()):
3652         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3653                                    self.op.mac)
3654
3655     # bridge verification
3656     bridge = getattr(self.op, "bridge", None)
3657     if bridge is None:
3658       self.op.bridge = self.cfg.GetDefBridge()
3659     else:
3660       self.op.bridge = bridge
3661
3662     # boot order verification
3663     if self.op.hvm_boot_order is not None:
3664       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3665         raise errors.OpPrereqError("invalid boot order specified,"
3666                                    " must be one or more of [acdn]")
3667     #### allocator run
3668
3669     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3670       raise errors.OpPrereqError("One and only one of iallocator and primary"
3671                                  " node must be given")
3672
3673     if self.op.iallocator is not None:
3674       self._RunAllocator()
3675
3676     #### node related checks
3677
3678     # check primary node
3679     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3680     if pnode is None:
3681       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3682                                  self.op.pnode)
3683     self.op.pnode = pnode.name
3684     self.pnode = pnode
3685     self.secondaries = []
3686
3687     # mirror node verification
3688     if self.op.disk_template in constants.DTS_NET_MIRROR:
3689       if getattr(self.op, "snode", None) is None:
3690         raise errors.OpPrereqError("The networked disk templates need"
3691                                    " a mirror node")
3692
3693       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3694       if snode_name is None:
3695         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3696                                    self.op.snode)
3697       elif snode_name == pnode.name:
3698         raise errors.OpPrereqError("The secondary node cannot be"
3699                                    " the primary node.")
3700       self.secondaries.append(snode_name)
3701
3702     req_size = _ComputeDiskSize(self.op.disk_template,
3703                                 self.op.disk_size, self.op.swap_size)
3704
3705     # Check lv size requirements
3706     if req_size is not None:
3707       nodenames = [pnode.name] + self.secondaries
3708       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3709       for node in nodenames:
3710         info = nodeinfo.get(node, None)
3711         if not info:
3712           raise errors.OpPrereqError("Cannot get current information"
3713                                      " from node '%s'" % node)
3714         vg_free = info.get('vg_free', None)
3715         if not isinstance(vg_free, int):
3716           raise errors.OpPrereqError("Can't compute free disk space on"
3717                                      " node %s" % node)
3718         if req_size > info['vg_free']:
3719           raise errors.OpPrereqError("Not enough disk space on target node %s."
3720                                      " %d MB available, %d MB required" %
3721                                      (node, info['vg_free'], req_size))
3722
3723     # os verification
3724     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3725     if not os_obj:
3726       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3727                                  " primary node"  % self.op.os_type)
3728
3729     if self.op.kernel_path == constants.VALUE_NONE:
3730       raise errors.OpPrereqError("Can't set instance kernel to none")
3731
3732
3733     # bridge check on primary node
3734     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3735       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3736                                  " destination node '%s'" %
3737                                  (self.op.bridge, pnode.name))
3738
3739     # memory check on primary node
3740     if self.op.start:
3741       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3742                            "creating instance %s" % self.op.instance_name,
3743                            self.op.mem_size)
3744
3745     # hvm_cdrom_image_path verification
3746     if self.op.hvm_cdrom_image_path is not None:
3747       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3748         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3749                                    " be an absolute path or None, not %s" %
3750                                    self.op.hvm_cdrom_image_path)
3751       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3752         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3753                                    " regular file or a symlink pointing to"
3754                                    " an existing regular file, not %s" %
3755                                    self.op.hvm_cdrom_image_path)
3756
3757     # vnc_bind_address verification
3758     if self.op.vnc_bind_address is not None:
3759       if not utils.IsValidIP(self.op.vnc_bind_address):
3760         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3761                                    " like a valid IP address" %
3762                                    self.op.vnc_bind_address)
3763
3764     if self.op.start:
3765       self.instance_status = 'up'
3766     else:
3767       self.instance_status = 'down'
3768
3769   def Exec(self, feedback_fn):
3770     """Create and add the instance to the cluster.
3771
3772     """
3773     instance = self.op.instance_name
3774     pnode_name = self.pnode.name
3775
3776     if self.op.mac == "auto":
3777       mac_address = self.cfg.GenerateMAC()
3778     else:
3779       mac_address = self.op.mac
3780
3781     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3782     if self.inst_ip is not None:
3783       nic.ip = self.inst_ip
3784
3785     ht_kind = self.sstore.GetHypervisorType()
3786     if ht_kind in constants.HTS_REQ_PORT:
3787       network_port = self.cfg.AllocatePort()
3788     else:
3789       network_port = None
3790
3791     if self.op.vnc_bind_address is None:
3792       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3793
3794     disks = _GenerateDiskTemplate(self.cfg,
3795                                   self.op.disk_template,
3796                                   instance, pnode_name,
3797                                   self.secondaries, self.op.disk_size,
3798                                   self.op.swap_size)
3799
3800     iobj = objects.Instance(name=instance, os=self.op.os_type,
3801                             primary_node=pnode_name,
3802                             memory=self.op.mem_size,
3803                             vcpus=self.op.vcpus,
3804                             nics=[nic], disks=disks,
3805                             disk_template=self.op.disk_template,
3806                             status=self.instance_status,
3807                             network_port=network_port,
3808                             kernel_path=self.op.kernel_path,
3809                             initrd_path=self.op.initrd_path,
3810                             hvm_boot_order=self.op.hvm_boot_order,
3811                             hvm_acpi=self.op.hvm_acpi,
3812                             hvm_pae=self.op.hvm_pae,
3813                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3814                             vnc_bind_address=self.op.vnc_bind_address,
3815                             )
3816
3817     feedback_fn("* creating instance disks...")
3818     if not _CreateDisks(self.cfg, iobj):
3819       _RemoveDisks(iobj, self.cfg)
3820       raise errors.OpExecError("Device creation failed, reverting...")
3821
3822     feedback_fn("adding instance %s to cluster config" % instance)
3823
3824     self.cfg.AddInstance(iobj)
3825
3826     if self.op.wait_for_sync:
3827       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3828     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3829       # make sure the disks are not degraded (still sync-ing is ok)
3830       time.sleep(15)
3831       feedback_fn("* checking mirrors status")
3832       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3833     else:
3834       disk_abort = False
3835
3836     if disk_abort:
3837       _RemoveDisks(iobj, self.cfg)
3838       self.cfg.RemoveInstance(iobj.name)
3839       raise errors.OpExecError("There are some degraded disks for"
3840                                " this instance")
3841
3842     feedback_fn("creating os for instance %s on node %s" %
3843                 (instance, pnode_name))
3844
3845     if iobj.disk_template != constants.DT_DISKLESS:
3846       if self.op.mode == constants.INSTANCE_CREATE:
3847         feedback_fn("* running the instance OS create scripts...")
3848         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3849           raise errors.OpExecError("could not add os for instance %s"
3850                                    " on node %s" %
3851                                    (instance, pnode_name))
3852
3853       elif self.op.mode == constants.INSTANCE_IMPORT:
3854         feedback_fn("* running the instance OS import scripts...")
3855         src_node = self.op.src_node
3856         src_image = self.src_image
3857         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3858                                                 src_node, src_image):
3859           raise errors.OpExecError("Could not import os for instance"
3860                                    " %s on node %s" %
3861                                    (instance, pnode_name))
3862       else:
3863         # also checked in the prereq part
3864         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3865                                      % self.op.mode)
3866
3867     if self.op.start:
3868       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3869       feedback_fn("* starting instance...")
3870       if not rpc.call_instance_start(pnode_name, iobj, None):
3871         raise errors.OpExecError("Could not start instance")
3872
3873
3874 class LUConnectConsole(NoHooksLU):
3875   """Connect to an instance's console.
3876
3877   This is somewhat special in that it returns the command line that
3878   you need to run on the master node in order to connect to the
3879   console.
3880
3881   """
3882   _OP_REQP = ["instance_name"]
3883
3884   def CheckPrereq(self):
3885     """Check prerequisites.
3886
3887     This checks that the instance is in the cluster.
3888
3889     """
3890     instance = self.cfg.GetInstanceInfo(
3891       self.cfg.ExpandInstanceName(self.op.instance_name))
3892     if instance is None:
3893       raise errors.OpPrereqError("Instance '%s' not known" %
3894                                  self.op.instance_name)
3895     self.instance = instance
3896
3897   def Exec(self, feedback_fn):
3898     """Connect to the console of an instance
3899
3900     """
3901     instance = self.instance
3902     node = instance.primary_node
3903
3904     node_insts = rpc.call_instance_list([node])[node]
3905     if node_insts is False:
3906       raise errors.OpExecError("Can't connect to node %s." % node)
3907
3908     if instance.name not in node_insts:
3909       raise errors.OpExecError("Instance %s is not running." % instance.name)
3910
3911     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3912
3913     hyper = hypervisor.GetHypervisor()
3914     console_cmd = hyper.GetShellCommandForConsole(instance)
3915     # build ssh cmdline
3916     argv = ["ssh", "-q", "-t"]
3917     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3918     argv.extend(ssh.BATCH_MODE_OPTS)
3919     argv.append(node)
3920     argv.append(console_cmd)
3921     return "ssh", argv
3922
3923
3924 class LUAddMDDRBDComponent(LogicalUnit):
3925   """Adda new mirror member to an instance's disk.
3926
3927   """
3928   HPATH = "mirror-add"
3929   HTYPE = constants.HTYPE_INSTANCE
3930   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3931
3932   def BuildHooksEnv(self):
3933     """Build hooks env.
3934
3935     This runs on the master, the primary and all the secondaries.
3936
3937     """
3938     env = {
3939       "NEW_SECONDARY": self.op.remote_node,
3940       "DISK_NAME": self.op.disk_name,
3941       }
3942     env.update(_BuildInstanceHookEnvByObject(self.instance))
3943     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3944           self.op.remote_node,] + list(self.instance.secondary_nodes)
3945     return env, nl, nl
3946
3947   def CheckPrereq(self):
3948     """Check prerequisites.
3949
3950     This checks that the instance is in the cluster.
3951
3952     """
3953     instance = self.cfg.GetInstanceInfo(
3954       self.cfg.ExpandInstanceName(self.op.instance_name))
3955     if instance is None:
3956       raise errors.OpPrereqError("Instance '%s' not known" %
3957                                  self.op.instance_name)
3958     self.instance = instance
3959
3960     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3961     if remote_node is None:
3962       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3963     self.remote_node = remote_node
3964
3965     if remote_node == instance.primary_node:
3966       raise errors.OpPrereqError("The specified node is the primary node of"
3967                                  " the instance.")
3968
3969     if instance.disk_template != constants.DT_REMOTE_RAID1:
3970       raise errors.OpPrereqError("Instance's disk layout is not"
3971                                  " remote_raid1.")
3972     for disk in instance.disks:
3973       if disk.iv_name == self.op.disk_name:
3974         break
3975     else:
3976       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3977                                  " instance." % self.op.disk_name)
3978     if len(disk.children) > 1:
3979       raise errors.OpPrereqError("The device already has two slave devices."
3980                                  " This would create a 3-disk raid1 which we"
3981                                  " don't allow.")
3982     self.disk = disk
3983
3984   def Exec(self, feedback_fn):
3985     """Add the mirror component
3986
3987     """
3988     disk = self.disk
3989     instance = self.instance
3990
3991     remote_node = self.remote_node
3992     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3993     names = _GenerateUniqueNames(self.cfg, lv_names)
3994     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3995                                      remote_node, disk.size, names)
3996
3997     logger.Info("adding new mirror component on secondary")
3998     #HARDCODE
3999     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
4000                                       new_drbd, False,
4001                                       _GetInstanceInfoText(instance)):
4002       raise errors.OpExecError("Failed to create new component on secondary"
4003                                " node %s" % remote_node)
4004
4005     logger.Info("adding new mirror component on primary")
4006     #HARDCODE
4007     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
4008                                     instance, new_drbd,
4009                                     _GetInstanceInfoText(instance)):
4010       # remove secondary dev
4011       self.cfg.SetDiskID(new_drbd, remote_node)
4012       rpc.call_blockdev_remove(remote_node, new_drbd)
4013       raise errors.OpExecError("Failed to create volume on primary")
4014
4015     # the device exists now
4016     # call the primary node to add the mirror to md
4017     logger.Info("adding new mirror component to md")
4018     if not rpc.call_blockdev_addchildren(instance.primary_node,
4019                                          disk, [new_drbd]):
4020       logger.Error("Can't add mirror compoment to md!")
4021       self.cfg.SetDiskID(new_drbd, remote_node)
4022       if not rpc.call_blockdev_remove(remote_node, new_drbd):
4023         logger.Error("Can't rollback on secondary")
4024       self.cfg.SetDiskID(new_drbd, instance.primary_node)
4025       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4026         logger.Error("Can't rollback on primary")
4027       raise errors.OpExecError("Can't add mirror component to md array")
4028
4029     disk.children.append(new_drbd)
4030
4031     self.cfg.AddInstance(instance)
4032
4033     _WaitForSync(self.cfg, instance, self.proc)
4034
4035     return 0
4036
4037
4038 class LURemoveMDDRBDComponent(LogicalUnit):
4039   """Remove a component from a remote_raid1 disk.
4040
4041   """
4042   HPATH = "mirror-remove"
4043   HTYPE = constants.HTYPE_INSTANCE
4044   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
4045
4046   def BuildHooksEnv(self):
4047     """Build hooks env.
4048
4049     This runs on the master, the primary and all the secondaries.
4050
4051     """
4052     env = {
4053       "DISK_NAME": self.op.disk_name,
4054       "DISK_ID": self.op.disk_id,
4055       "OLD_SECONDARY": self.old_secondary,
4056       }
4057     env.update(_BuildInstanceHookEnvByObject(self.instance))
4058     nl = [self.sstore.GetMasterNode(),
4059           self.instance.primary_node] + list(self.instance.secondary_nodes)
4060     return env, nl, nl
4061
4062   def CheckPrereq(self):
4063     """Check prerequisites.
4064
4065     This checks that the instance is in the cluster.
4066
4067     """
4068     instance = self.cfg.GetInstanceInfo(
4069       self.cfg.ExpandInstanceName(self.op.instance_name))
4070     if instance is None:
4071       raise errors.OpPrereqError("Instance '%s' not known" %
4072                                  self.op.instance_name)
4073     self.instance = instance
4074
4075     if instance.disk_template != constants.DT_REMOTE_RAID1:
4076       raise errors.OpPrereqError("Instance's disk layout is not"
4077                                  " remote_raid1.")
4078     for disk in instance.disks:
4079       if disk.iv_name == self.op.disk_name:
4080         break
4081     else:
4082       raise errors.OpPrereqError("Can't find this device ('%s') in the"
4083                                  " instance." % self.op.disk_name)
4084     for child in disk.children:
4085       if (child.dev_type == constants.LD_DRBD7 and
4086           child.logical_id[2] == self.op.disk_id):
4087         break
4088     else:
4089       raise errors.OpPrereqError("Can't find the device with this port.")
4090
4091     if len(disk.children) < 2:
4092       raise errors.OpPrereqError("Cannot remove the last component from"
4093                                  " a mirror.")
4094     self.disk = disk
4095     self.child = child
4096     if self.child.logical_id[0] == instance.primary_node:
4097       oid = 1
4098     else:
4099       oid = 0
4100     self.old_secondary = self.child.logical_id[oid]
4101
4102   def Exec(self, feedback_fn):
4103     """Remove the mirror component
4104
4105     """
4106     instance = self.instance
4107     disk = self.disk
4108     child = self.child
4109     logger.Info("remove mirror component")
4110     self.cfg.SetDiskID(disk, instance.primary_node)
4111     if not rpc.call_blockdev_removechildren(instance.primary_node,
4112                                             disk, [child]):
4113       raise errors.OpExecError("Can't remove child from mirror.")
4114
4115     for node in child.logical_id[:2]:
4116       self.cfg.SetDiskID(child, node)
4117       if not rpc.call_blockdev_remove(node, child):
4118         logger.Error("Warning: failed to remove device from node %s,"
4119                      " continuing operation." % node)
4120
4121     disk.children.remove(child)
4122     self.cfg.AddInstance(instance)
4123
4124
4125 class LUReplaceDisks(LogicalUnit):
4126   """Replace the disks of an instance.
4127
4128   """
4129   HPATH = "mirrors-replace"
4130   HTYPE = constants.HTYPE_INSTANCE
4131   _OP_REQP = ["instance_name", "mode", "disks"]
4132
4133   def _RunAllocator(self):
4134     """Compute a new secondary node using an IAllocator.
4135
4136     """
4137     ial = IAllocator(self.cfg, self.sstore,
4138                      mode=constants.IALLOCATOR_MODE_RELOC,
4139                      name=self.op.instance_name,
4140                      relocate_from=[self.sec_node])
4141
4142     ial.Run(self.op.iallocator)
4143
4144     if not ial.success:
4145       raise errors.OpPrereqError("Can't compute nodes using"
4146                                  " iallocator '%s': %s" % (self.op.iallocator,
4147                                                            ial.info))
4148     if len(ial.nodes) != ial.required_nodes:
4149       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4150                                  " of nodes (%s), required %s" %
4151                                  (len(ial.nodes), ial.required_nodes))
4152     self.op.remote_node = ial.nodes[0]
4153     logger.ToStdout("Selected new secondary for the instance: %s" %
4154                     self.op.remote_node)
4155
4156   def BuildHooksEnv(self):
4157     """Build hooks env.
4158
4159     This runs on the master, the primary and all the secondaries.
4160
4161     """
4162     env = {
4163       "MODE": self.op.mode,
4164       "NEW_SECONDARY": self.op.remote_node,
4165       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4166       }
4167     env.update(_BuildInstanceHookEnvByObject(self.instance))
4168     nl = [
4169       self.sstore.GetMasterNode(),
4170       self.instance.primary_node,
4171       ]
4172     if self.op.remote_node is not None:
4173       nl.append(self.op.remote_node)
4174     return env, nl, nl
4175
4176   def CheckPrereq(self):
4177     """Check prerequisites.
4178
4179     This checks that the instance is in the cluster.
4180
4181     """
4182     if not hasattr(self.op, "remote_node"):
4183       self.op.remote_node = None
4184
4185     instance = self.cfg.GetInstanceInfo(
4186       self.cfg.ExpandInstanceName(self.op.instance_name))
4187     if instance is None:
4188       raise errors.OpPrereqError("Instance '%s' not known" %
4189                                  self.op.instance_name)
4190     self.instance = instance
4191     self.op.instance_name = instance.name
4192
4193     if instance.disk_template not in constants.DTS_NET_MIRROR:
4194       raise errors.OpPrereqError("Instance's disk layout is not"
4195                                  " network mirrored.")
4196
4197     if len(instance.secondary_nodes) != 1:
4198       raise errors.OpPrereqError("The instance has a strange layout,"
4199                                  " expected one secondary but found %d" %
4200                                  len(instance.secondary_nodes))
4201
4202     self.sec_node = instance.secondary_nodes[0]
4203
4204     ia_name = getattr(self.op, "iallocator", None)
4205     if ia_name is not None:
4206       if self.op.remote_node is not None:
4207         raise errors.OpPrereqError("Give either the iallocator or the new"
4208                                    " secondary, not both")
4209       self.op.remote_node = self._RunAllocator()
4210
4211     remote_node = self.op.remote_node
4212     if remote_node is not None:
4213       remote_node = self.cfg.ExpandNodeName(remote_node)
4214       if remote_node is None:
4215         raise errors.OpPrereqError("Node '%s' not known" %
4216                                    self.op.remote_node)
4217       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4218     else:
4219       self.remote_node_info = None
4220     if remote_node == instance.primary_node:
4221       raise errors.OpPrereqError("The specified node is the primary node of"
4222                                  " the instance.")
4223     elif remote_node == self.sec_node:
4224       if self.op.mode == constants.REPLACE_DISK_SEC:
4225         # this is for DRBD8, where we can't execute the same mode of
4226         # replacement as for drbd7 (no different port allocated)
4227         raise errors.OpPrereqError("Same secondary given, cannot execute"
4228                                    " replacement")
4229       # the user gave the current secondary, switch to
4230       # 'no-replace-secondary' mode for drbd7
4231       remote_node = None
4232     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4233         self.op.mode != constants.REPLACE_DISK_ALL):
4234       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4235                                  " disks replacement, not individual ones")
4236     if instance.disk_template == constants.DT_DRBD8:
4237       if (self.op.mode == constants.REPLACE_DISK_ALL and
4238           remote_node is not None):
4239         # switch to replace secondary mode
4240         self.op.mode = constants.REPLACE_DISK_SEC
4241
4242       if self.op.mode == constants.REPLACE_DISK_ALL:
4243         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4244                                    " secondary disk replacement, not"
4245                                    " both at once")
4246       elif self.op.mode == constants.REPLACE_DISK_PRI:
4247         if remote_node is not None:
4248           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4249                                      " the secondary while doing a primary"
4250                                      " node disk replacement")
4251         self.tgt_node = instance.primary_node
4252         self.oth_node = instance.secondary_nodes[0]
4253       elif self.op.mode == constants.REPLACE_DISK_SEC:
4254         self.new_node = remote_node # this can be None, in which case
4255                                     # we don't change the secondary
4256         self.tgt_node = instance.secondary_nodes[0]
4257         self.oth_node = instance.primary_node
4258       else:
4259         raise errors.ProgrammerError("Unhandled disk replace mode")
4260
4261     for name in self.op.disks:
4262       if instance.FindDisk(name) is None:
4263         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4264                                    (name, instance.name))
4265     self.op.remote_node = remote_node
4266
4267   def _ExecRR1(self, feedback_fn):
4268     """Replace the disks of an instance.
4269
4270     """
4271     instance = self.instance
4272     iv_names = {}
4273     # start of work
4274     if self.op.remote_node is None:
4275       remote_node = self.sec_node
4276     else:
4277       remote_node = self.op.remote_node
4278     cfg = self.cfg
4279     for dev in instance.disks:
4280       size = dev.size
4281       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4282       names = _GenerateUniqueNames(cfg, lv_names)
4283       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4284                                        remote_node, size, names)
4285       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4286       logger.Info("adding new mirror component on secondary for %s" %
4287                   dev.iv_name)
4288       #HARDCODE
4289       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4290                                         new_drbd, False,
4291                                         _GetInstanceInfoText(instance)):
4292         raise errors.OpExecError("Failed to create new component on secondary"
4293                                  " node %s. Full abort, cleanup manually!" %
4294                                  remote_node)
4295
4296       logger.Info("adding new mirror component on primary")
4297       #HARDCODE
4298       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4299                                       instance, new_drbd,
4300                                       _GetInstanceInfoText(instance)):
4301         # remove secondary dev
4302         cfg.SetDiskID(new_drbd, remote_node)
4303         rpc.call_blockdev_remove(remote_node, new_drbd)
4304         raise errors.OpExecError("Failed to create volume on primary!"
4305                                  " Full abort, cleanup manually!!")
4306
4307       # the device exists now
4308       # call the primary node to add the mirror to md
4309       logger.Info("adding new mirror component to md")
4310       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4311                                            [new_drbd]):
4312         logger.Error("Can't add mirror compoment to md!")
4313         cfg.SetDiskID(new_drbd, remote_node)
4314         if not rpc.call_blockdev_remove(remote_node, new_drbd):
4315           logger.Error("Can't rollback on secondary")
4316         cfg.SetDiskID(new_drbd, instance.primary_node)
4317         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4318           logger.Error("Can't rollback on primary")
4319         raise errors.OpExecError("Full abort, cleanup manually!!")
4320
4321       dev.children.append(new_drbd)
4322       cfg.AddInstance(instance)
4323
4324     # this can fail as the old devices are degraded and _WaitForSync
4325     # does a combined result over all disks, so we don't check its
4326     # return value
4327     _WaitForSync(cfg, instance, self.proc, unlock=True)
4328
4329     # so check manually all the devices
4330     for name in iv_names:
4331       dev, child, new_drbd = iv_names[name]
4332       cfg.SetDiskID(dev, instance.primary_node)
4333       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4334       if is_degr:
4335         raise errors.OpExecError("MD device %s is degraded!" % name)
4336       cfg.SetDiskID(new_drbd, instance.primary_node)
4337       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4338       if is_degr:
4339         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4340
4341     for name in iv_names:
4342       dev, child, new_drbd = iv_names[name]
4343       logger.Info("remove mirror %s component" % name)
4344       cfg.SetDiskID(dev, instance.primary_node)
4345       if not rpc.call_blockdev_removechildren(instance.primary_node,
4346                                               dev, [child]):
4347         logger.Error("Can't remove child from mirror, aborting"
4348                      " *this device cleanup*.\nYou need to cleanup manually!!")
4349         continue
4350
4351       for node in child.logical_id[:2]:
4352         logger.Info("remove child device on %s" % node)
4353         cfg.SetDiskID(child, node)
4354         if not rpc.call_blockdev_remove(node, child):
4355           logger.Error("Warning: failed to remove device from node %s,"
4356                        " continuing operation." % node)
4357
4358       dev.children.remove(child)
4359
4360       cfg.AddInstance(instance)
4361
4362   def _ExecD8DiskOnly(self, feedback_fn):
4363     """Replace a disk on the primary or secondary for dbrd8.
4364
4365     The algorithm for replace is quite complicated:
4366       - for each disk to be replaced:
4367         - create new LVs on the target node with unique names
4368         - detach old LVs from the drbd device
4369         - rename old LVs to name_replaced.<time_t>
4370         - rename new LVs to old LVs
4371         - attach the new LVs (with the old names now) to the drbd device
4372       - wait for sync across all devices
4373       - for each modified disk:
4374         - remove old LVs (which have the name name_replaces.<time_t>)
4375
4376     Failures are not very well handled.
4377
4378     """
4379     steps_total = 6
4380     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4381     instance = self.instance
4382     iv_names = {}
4383     vgname = self.cfg.GetVGName()
4384     # start of work
4385     cfg = self.cfg
4386     tgt_node = self.tgt_node
4387     oth_node = self.oth_node
4388
4389     # Step: check device activation
4390     self.proc.LogStep(1, steps_total, "check device existence")
4391     info("checking volume groups")
4392     my_vg = cfg.GetVGName()
4393     results = rpc.call_vg_list([oth_node, tgt_node])
4394     if not results:
4395       raise errors.OpExecError("Can't list volume groups on the nodes")
4396     for node in oth_node, tgt_node:
4397       res = results.get(node, False)
4398       if not res or my_vg not in res:
4399         raise errors.OpExecError("Volume group '%s' not found on %s" %
4400                                  (my_vg, node))
4401     for dev in instance.disks:
4402       if not dev.iv_name in self.op.disks:
4403         continue
4404       for node in tgt_node, oth_node:
4405         info("checking %s on %s" % (dev.iv_name, node))
4406         cfg.SetDiskID(dev, node)
4407         if not rpc.call_blockdev_find(node, dev):
4408           raise errors.OpExecError("Can't find device %s on node %s" %
4409                                    (dev.iv_name, node))
4410
4411     # Step: check other node consistency
4412     self.proc.LogStep(2, steps_total, "check peer consistency")
4413     for dev in instance.disks:
4414       if not dev.iv_name in self.op.disks:
4415         continue
4416       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4417       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4418                                    oth_node==instance.primary_node):
4419         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4420                                  " to replace disks on this node (%s)" %
4421                                  (oth_node, tgt_node))
4422
4423     # Step: create new storage
4424     self.proc.LogStep(3, steps_total, "allocate new storage")
4425     for dev in instance.disks:
4426       if not dev.iv_name in self.op.disks:
4427         continue
4428       size = dev.size
4429       cfg.SetDiskID(dev, tgt_node)
4430       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4431       names = _GenerateUniqueNames(cfg, lv_names)
4432       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4433                              logical_id=(vgname, names[0]))
4434       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4435                              logical_id=(vgname, names[1]))
4436       new_lvs = [lv_data, lv_meta]
4437       old_lvs = dev.children
4438       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4439       info("creating new local storage on %s for %s" %
4440            (tgt_node, dev.iv_name))
4441       # since we *always* want to create this LV, we use the
4442       # _Create...OnPrimary (which forces the creation), even if we
4443       # are talking about the secondary node
4444       for new_lv in new_lvs:
4445         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4446                                         _GetInstanceInfoText(instance)):
4447           raise errors.OpExecError("Failed to create new LV named '%s' on"
4448                                    " node '%s'" %
4449                                    (new_lv.logical_id[1], tgt_node))
4450
4451     # Step: for each lv, detach+rename*2+attach
4452     self.proc.LogStep(4, steps_total, "change drbd configuration")
4453     for dev, old_lvs, new_lvs in iv_names.itervalues():
4454       info("detaching %s drbd from local storage" % dev.iv_name)
4455       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4456         raise errors.OpExecError("Can't detach drbd from local storage on node"
4457                                  " %s for device %s" % (tgt_node, dev.iv_name))
4458       #dev.children = []
4459       #cfg.Update(instance)
4460
4461       # ok, we created the new LVs, so now we know we have the needed
4462       # storage; as such, we proceed on the target node to rename
4463       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4464       # using the assumption that logical_id == physical_id (which in
4465       # turn is the unique_id on that node)
4466
4467       # FIXME(iustin): use a better name for the replaced LVs
4468       temp_suffix = int(time.time())
4469       ren_fn = lambda d, suff: (d.physical_id[0],
4470                                 d.physical_id[1] + "_replaced-%s" % suff)
4471       # build the rename list based on what LVs exist on the node
4472       rlist = []
4473       for to_ren in old_lvs:
4474         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4475         if find_res is not None: # device exists
4476           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4477
4478       info("renaming the old LVs on the target node")
4479       if not rpc.call_blockdev_rename(tgt_node, rlist):
4480         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4481       # now we rename the new LVs to the old LVs
4482       info("renaming the new LVs on the target node")
4483       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4484       if not rpc.call_blockdev_rename(tgt_node, rlist):
4485         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4486
4487       for old, new in zip(old_lvs, new_lvs):
4488         new.logical_id = old.logical_id
4489         cfg.SetDiskID(new, tgt_node)
4490
4491       for disk in old_lvs:
4492         disk.logical_id = ren_fn(disk, temp_suffix)
4493         cfg.SetDiskID(disk, tgt_node)
4494
4495       # now that the new lvs have the old name, we can add them to the device
4496       info("adding new mirror component on %s" % tgt_node)
4497       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4498         for new_lv in new_lvs:
4499           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4500             warning("Can't rollback device %s", hint="manually cleanup unused"
4501                     " logical volumes")
4502         raise errors.OpExecError("Can't add local storage to drbd")
4503
4504       dev.children = new_lvs
4505       cfg.Update(instance)
4506
4507     # Step: wait for sync
4508
4509     # this can fail as the old devices are degraded and _WaitForSync
4510     # does a combined result over all disks, so we don't check its
4511     # return value
4512     self.proc.LogStep(5, steps_total, "sync devices")
4513     _WaitForSync(cfg, instance, self.proc, unlock=True)
4514
4515     # so check manually all the devices
4516     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4517       cfg.SetDiskID(dev, instance.primary_node)
4518       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4519       if is_degr:
4520         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4521
4522     # Step: remove old storage
4523     self.proc.LogStep(6, steps_total, "removing old storage")
4524     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4525       info("remove logical volumes for %s" % name)
4526       for lv in old_lvs:
4527         cfg.SetDiskID(lv, tgt_node)
4528         if not rpc.call_blockdev_remove(tgt_node, lv):
4529           warning("Can't remove old LV", hint="manually remove unused LVs")
4530           continue
4531
4532   def _ExecD8Secondary(self, feedback_fn):
4533     """Replace the secondary node for drbd8.
4534
4535     The algorithm for replace is quite complicated:
4536       - for all disks of the instance:
4537         - create new LVs on the new node with same names
4538         - shutdown the drbd device on the old secondary
4539         - disconnect the drbd network on the primary
4540         - create the drbd device on the new secondary
4541         - network attach the drbd on the primary, using an artifice:
4542           the drbd code for Attach() will connect to the network if it
4543           finds a device which is connected to the good local disks but
4544           not network enabled
4545       - wait for sync across all devices
4546       - remove all disks from the old secondary
4547
4548     Failures are not very well handled.
4549
4550     """
4551     steps_total = 6
4552     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4553     instance = self.instance
4554     iv_names = {}
4555     vgname = self.cfg.GetVGName()
4556     # start of work
4557     cfg = self.cfg
4558     old_node = self.tgt_node
4559     new_node = self.new_node
4560     pri_node = instance.primary_node
4561
4562     # Step: check device activation
4563     self.proc.LogStep(1, steps_total, "check device existence")
4564     info("checking volume groups")
4565     my_vg = cfg.GetVGName()
4566     results = rpc.call_vg_list([pri_node, new_node])
4567     if not results:
4568       raise errors.OpExecError("Can't list volume groups on the nodes")
4569     for node in pri_node, new_node:
4570       res = results.get(node, False)
4571       if not res or my_vg not in res:
4572         raise errors.OpExecError("Volume group '%s' not found on %s" %
4573                                  (my_vg, node))
4574     for dev in instance.disks:
4575       if not dev.iv_name in self.op.disks:
4576         continue
4577       info("checking %s on %s" % (dev.iv_name, pri_node))
4578       cfg.SetDiskID(dev, pri_node)
4579       if not rpc.call_blockdev_find(pri_node, dev):
4580         raise errors.OpExecError("Can't find device %s on node %s" %
4581                                  (dev.iv_name, pri_node))
4582
4583     # Step: check other node consistency
4584     self.proc.LogStep(2, steps_total, "check peer consistency")
4585     for dev in instance.disks:
4586       if not dev.iv_name in self.op.disks:
4587         continue
4588       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4589       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4590         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4591                                  " unsafe to replace the secondary" %
4592                                  pri_node)
4593
4594     # Step: create new storage
4595     self.proc.LogStep(3, steps_total, "allocate new storage")
4596     for dev in instance.disks:
4597       size = dev.size
4598       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4599       # since we *always* want to create this LV, we use the
4600       # _Create...OnPrimary (which forces the creation), even if we
4601       # are talking about the secondary node
4602       for new_lv in dev.children:
4603         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4604                                         _GetInstanceInfoText(instance)):
4605           raise errors.OpExecError("Failed to create new LV named '%s' on"
4606                                    " node '%s'" %
4607                                    (new_lv.logical_id[1], new_node))
4608
4609       iv_names[dev.iv_name] = (dev, dev.children)
4610
4611     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4612     for dev in instance.disks:
4613       size = dev.size
4614       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4615       # create new devices on new_node
4616       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4617                               logical_id=(pri_node, new_node,
4618                                           dev.logical_id[2]),
4619                               children=dev.children)
4620       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4621                                         new_drbd, False,
4622                                       _GetInstanceInfoText(instance)):
4623         raise errors.OpExecError("Failed to create new DRBD on"
4624                                  " node '%s'" % new_node)
4625
4626     for dev in instance.disks:
4627       # we have new devices, shutdown the drbd on the old secondary
4628       info("shutting down drbd for %s on old node" % dev.iv_name)
4629       cfg.SetDiskID(dev, old_node)
4630       if not rpc.call_blockdev_shutdown(old_node, dev):
4631         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4632                 hint="Please cleanup this device manually as soon as possible")
4633
4634     info("detaching primary drbds from the network (=> standalone)")
4635     done = 0
4636     for dev in instance.disks:
4637       cfg.SetDiskID(dev, pri_node)
4638       # set the physical (unique in bdev terms) id to None, meaning
4639       # detach from network
4640       dev.physical_id = (None,) * len(dev.physical_id)
4641       # and 'find' the device, which will 'fix' it to match the
4642       # standalone state
4643       if rpc.call_blockdev_find(pri_node, dev):
4644         done += 1
4645       else:
4646         warning("Failed to detach drbd %s from network, unusual case" %
4647                 dev.iv_name)
4648
4649     if not done:
4650       # no detaches succeeded (very unlikely)
4651       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4652
4653     # if we managed to detach at least one, we update all the disks of
4654     # the instance to point to the new secondary
4655     info("updating instance configuration")
4656     for dev in instance.disks:
4657       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4658       cfg.SetDiskID(dev, pri_node)
4659     cfg.Update(instance)
4660
4661     # and now perform the drbd attach
4662     info("attaching primary drbds to new secondary (standalone => connected)")
4663     failures = []
4664     for dev in instance.disks:
4665       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4666       # since the attach is smart, it's enough to 'find' the device,
4667       # it will automatically activate the network, if the physical_id
4668       # is correct
4669       cfg.SetDiskID(dev, pri_node)
4670       if not rpc.call_blockdev_find(pri_node, dev):
4671         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4672                 "please do a gnt-instance info to see the status of disks")
4673
4674     # this can fail as the old devices are degraded and _WaitForSync
4675     # does a combined result over all disks, so we don't check its
4676     # return value
4677     self.proc.LogStep(5, steps_total, "sync devices")
4678     _WaitForSync(cfg, instance, self.proc, unlock=True)
4679
4680     # so check manually all the devices
4681     for name, (dev, old_lvs) in iv_names.iteritems():
4682       cfg.SetDiskID(dev, pri_node)
4683       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4684       if is_degr:
4685         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4686
4687     self.proc.LogStep(6, steps_total, "removing old storage")
4688     for name, (dev, old_lvs) in iv_names.iteritems():
4689       info("remove logical volumes for %s" % name)
4690       for lv in old_lvs:
4691         cfg.SetDiskID(lv, old_node)
4692         if not rpc.call_blockdev_remove(old_node, lv):
4693           warning("Can't remove LV on old secondary",
4694                   hint="Cleanup stale volumes by hand")
4695
4696   def Exec(self, feedback_fn):
4697     """Execute disk replacement.
4698
4699     This dispatches the disk replacement to the appropriate handler.
4700
4701     """
4702     instance = self.instance
4703
4704     # Activate the instance disks if we're replacing them on a down instance
4705     if instance.status == "down":
4706       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4707       self.proc.ChainOpCode(op)
4708
4709     if instance.disk_template == constants.DT_REMOTE_RAID1:
4710       fn = self._ExecRR1
4711     elif instance.disk_template == constants.DT_DRBD8:
4712       if self.op.remote_node is None:
4713         fn = self._ExecD8DiskOnly
4714       else:
4715         fn = self._ExecD8Secondary
4716     else:
4717       raise errors.ProgrammerError("Unhandled disk replacement case")
4718
4719     ret = fn(feedback_fn)
4720
4721     # Deactivate the instance disks if we're replacing them on a down instance
4722     if instance.status == "down":
4723       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4724       self.proc.ChainOpCode(op)
4725
4726     return ret
4727
4728
4729 class LUGrowDisk(LogicalUnit):
4730   """Grow a disk of an instance.
4731
4732   """
4733   HPATH = "disk-grow"
4734   HTYPE = constants.HTYPE_INSTANCE
4735   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4736
4737   def BuildHooksEnv(self):
4738     """Build hooks env.
4739
4740     This runs on the master, the primary and all the secondaries.
4741
4742     """
4743     env = {
4744       "DISK": self.op.disk,
4745       "AMOUNT": self.op.amount,
4746       }
4747     env.update(_BuildInstanceHookEnvByObject(self.instance))
4748     nl = [
4749       self.sstore.GetMasterNode(),
4750       self.instance.primary_node,
4751       ]
4752     return env, nl, nl
4753
4754   def CheckPrereq(self):
4755     """Check prerequisites.
4756
4757     This checks that the instance is in the cluster.
4758
4759     """
4760     instance = self.cfg.GetInstanceInfo(
4761       self.cfg.ExpandInstanceName(self.op.instance_name))
4762     if instance is None:
4763       raise errors.OpPrereqError("Instance '%s' not known" %
4764                                  self.op.instance_name)
4765
4766     if self.op.amount <= 0:
4767       raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4768
4769     self.instance = instance
4770     self.op.instance_name = instance.name
4771
4772     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4773       raise errors.OpPrereqError("Instance's disk layout does not support"
4774                                  " growing.")
4775
4776     self.disk = instance.FindDisk(self.op.disk)
4777     if self.disk is None:
4778       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4779                                  (self.op.disk, instance.name))
4780
4781     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4782     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4783     for node in nodenames:
4784       info = nodeinfo.get(node, None)
4785       if not info:
4786         raise errors.OpPrereqError("Cannot get current information"
4787                                    " from node '%s'" % node)
4788       vg_free = info.get('vg_free', None)
4789       if not isinstance(vg_free, int):
4790         raise errors.OpPrereqError("Can't compute free disk space on"
4791                                    " node %s" % node)
4792       if self.op.amount > info['vg_free']:
4793         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4794                                    " %d MiB available, %d MiB required" %
4795                                    (node, info['vg_free'], self.op.amount))
4796       is_primary = (node == instance.primary_node)
4797       if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4798         raise errors.OpPrereqError("Disk %s is degraded or not fully"
4799                                  " synchronized on node %s,"
4800                                  " aborting grow." % (self.op.disk, node))
4801
4802   def Exec(self, feedback_fn):
4803     """Execute disk grow.
4804
4805     """
4806     instance = self.instance
4807     disk = self.disk
4808     for node in (instance.secondary_nodes + (instance.primary_node,)):
4809       self.cfg.SetDiskID(disk, node)
4810       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4811       if not result or not isinstance(result, tuple) or len(result) != 2:
4812         raise errors.OpExecError("grow request failed to node %s" % node)
4813       elif not result[0]:
4814         raise errors.OpExecError("grow request failed to node %s: %s" %
4815                                  (node, result[1]))
4816     disk.RecordGrow(self.op.amount)
4817     self.cfg.Update(instance)
4818     if self.op.wait_for_sync:
4819       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4820       if disk_abort:
4821         logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4822                      " Please check the instance.")
4823
4824
4825 class LUQueryInstanceData(NoHooksLU):
4826   """Query runtime instance data.
4827
4828   """
4829   _OP_REQP = ["instances"]
4830
4831   def CheckPrereq(self):
4832     """Check prerequisites.
4833
4834     This only checks the optional instance list against the existing names.
4835
4836     """
4837     if not isinstance(self.op.instances, list):
4838       raise errors.OpPrereqError("Invalid argument type 'instances'")
4839     if self.op.instances:
4840       self.wanted_instances = []
4841       names = self.op.instances
4842       for name in names:
4843         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4844         if instance is None:
4845           raise errors.OpPrereqError("No such instance name '%s'" % name)
4846         self.wanted_instances.append(instance)
4847     else:
4848       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4849                                in self.cfg.GetInstanceList()]
4850     return
4851
4852
4853   def _ComputeDiskStatus(self, instance, snode, dev):
4854     """Compute block device status.
4855
4856     """
4857     self.cfg.SetDiskID(dev, instance.primary_node)
4858     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4859     if dev.dev_type in constants.LDS_DRBD:
4860       # we change the snode then (otherwise we use the one passed in)
4861       if dev.logical_id[0] == instance.primary_node:
4862         snode = dev.logical_id[1]
4863       else:
4864         snode = dev.logical_id[0]
4865
4866     if snode:
4867       self.cfg.SetDiskID(dev, snode)
4868       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4869     else:
4870       dev_sstatus = None
4871
4872     if dev.children:
4873       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4874                       for child in dev.children]
4875     else:
4876       dev_children = []
4877
4878     data = {
4879       "iv_name": dev.iv_name,
4880       "dev_type": dev.dev_type,
4881       "logical_id": dev.logical_id,
4882       "physical_id": dev.physical_id,
4883       "pstatus": dev_pstatus,
4884       "sstatus": dev_sstatus,
4885       "children": dev_children,
4886       }
4887
4888     return data
4889
4890   def Exec(self, feedback_fn):
4891     """Gather and return data"""
4892     result = {}
4893     for instance in self.wanted_instances:
4894       remote_info = rpc.call_instance_info(instance.primary_node,
4895                                                 instance.name)
4896       if remote_info and "state" in remote_info:
4897         remote_state = "up"
4898       else:
4899         remote_state = "down"
4900       if instance.status == "down":
4901         config_state = "down"
4902       else:
4903         config_state = "up"
4904
4905       disks = [self._ComputeDiskStatus(instance, None, device)
4906                for device in instance.disks]
4907
4908       idict = {
4909         "name": instance.name,
4910         "config_state": config_state,
4911         "run_state": remote_state,
4912         "pnode": instance.primary_node,
4913         "snodes": instance.secondary_nodes,
4914         "os": instance.os,
4915         "memory": instance.memory,
4916         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4917         "disks": disks,
4918         "vcpus": instance.vcpus,
4919         }
4920
4921       htkind = self.sstore.GetHypervisorType()
4922       if htkind == constants.HT_XEN_PVM30:
4923         idict["kernel_path"] = instance.kernel_path
4924         idict["initrd_path"] = instance.initrd_path
4925
4926       if htkind == constants.HT_XEN_HVM31:
4927         idict["hvm_boot_order"] = instance.hvm_boot_order
4928         idict["hvm_acpi"] = instance.hvm_acpi
4929         idict["hvm_pae"] = instance.hvm_pae
4930         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4931
4932       if htkind in constants.HTS_REQ_PORT:
4933         idict["vnc_bind_address"] = instance.vnc_bind_address
4934         idict["network_port"] = instance.network_port
4935
4936       result[instance.name] = idict
4937
4938     return result
4939
4940
4941 class LUSetInstanceParms(LogicalUnit):
4942   """Modifies an instances's parameters.
4943
4944   """
4945   HPATH = "instance-modify"
4946   HTYPE = constants.HTYPE_INSTANCE
4947   _OP_REQP = ["instance_name"]
4948
4949   def BuildHooksEnv(self):
4950     """Build hooks env.
4951
4952     This runs on the master, primary and secondaries.
4953
4954     """
4955     args = dict()
4956     if self.mem:
4957       args['memory'] = self.mem
4958     if self.vcpus:
4959       args['vcpus'] = self.vcpus
4960     if self.do_ip or self.do_bridge or self.mac:
4961       if self.do_ip:
4962         ip = self.ip
4963       else:
4964         ip = self.instance.nics[0].ip
4965       if self.bridge:
4966         bridge = self.bridge
4967       else:
4968         bridge = self.instance.nics[0].bridge
4969       if self.mac:
4970         mac = self.mac
4971       else:
4972         mac = self.instance.nics[0].mac
4973       args['nics'] = [(ip, bridge, mac)]
4974     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4975     nl = [self.sstore.GetMasterNode(),
4976           self.instance.primary_node] + list(self.instance.secondary_nodes)
4977     return env, nl, nl
4978
4979   def CheckPrereq(self):
4980     """Check prerequisites.
4981
4982     This only checks the instance list against the existing names.
4983
4984     """
4985     self.mem = getattr(self.op, "mem", None)
4986     self.vcpus = getattr(self.op, "vcpus", None)
4987     self.ip = getattr(self.op, "ip", None)
4988     self.mac = getattr(self.op, "mac", None)
4989     self.bridge = getattr(self.op, "bridge", None)
4990     self.kernel_path = getattr(self.op, "kernel_path", None)
4991     self.initrd_path = getattr(self.op, "initrd_path", None)
4992     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4993     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4994     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4995     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4996     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4997     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4998                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4999                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
5000                  self.vnc_bind_address]
5001     if all_parms.count(None) == len(all_parms):
5002       raise errors.OpPrereqError("No changes submitted")
5003     if self.mem is not None:
5004       try:
5005         self.mem = int(self.mem)
5006       except ValueError, err:
5007         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
5008     if self.vcpus is not None:
5009       try:
5010         self.vcpus = int(self.vcpus)
5011       except ValueError, err:
5012         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
5013     if self.ip is not None:
5014       self.do_ip = True
5015       if self.ip.lower() == "none":
5016         self.ip = None
5017       else:
5018         if not utils.IsValidIP(self.ip):
5019           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
5020     else:
5021       self.do_ip = False
5022     self.do_bridge = (self.bridge is not None)
5023     if self.mac is not None:
5024       if self.cfg.IsMacInUse(self.mac):
5025         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
5026                                    self.mac)
5027       if not utils.IsValidMac(self.mac):
5028         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
5029
5030     if self.kernel_path is not None:
5031       self.do_kernel_path = True
5032       if self.kernel_path == constants.VALUE_NONE:
5033         raise errors.OpPrereqError("Can't set instance to no kernel")
5034
5035       if self.kernel_path != constants.VALUE_DEFAULT:
5036         if not os.path.isabs(self.kernel_path):
5037           raise errors.OpPrereqError("The kernel path must be an absolute"
5038                                     " filename")
5039     else:
5040       self.do_kernel_path = False
5041
5042     if self.initrd_path is not None:
5043       self.do_initrd_path = True
5044       if self.initrd_path not in (constants.VALUE_NONE,
5045                                   constants.VALUE_DEFAULT):
5046         if not os.path.isabs(self.initrd_path):
5047           raise errors.OpPrereqError("The initrd path must be an absolute"
5048                                     " filename")
5049     else:
5050       self.do_initrd_path = False
5051
5052     # boot order verification
5053     if self.hvm_boot_order is not None:
5054       if self.hvm_boot_order != constants.VALUE_DEFAULT:
5055         if len(self.hvm_boot_order.strip("acdn")) != 0:
5056           raise errors.OpPrereqError("invalid boot order specified,"
5057                                      " must be one or more of [acdn]"
5058                                      " or 'default'")
5059
5060     # hvm_cdrom_image_path verification
5061     if self.op.hvm_cdrom_image_path is not None:
5062       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
5063               self.op.hvm_cdrom_image_path.lower() == "none"):
5064         raise errors.OpPrereqError("The path to the HVM CDROM image must"
5065                                    " be an absolute path or None, not %s" %
5066                                    self.op.hvm_cdrom_image_path)
5067       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
5068               self.op.hvm_cdrom_image_path.lower() == "none"):
5069         raise errors.OpPrereqError("The HVM CDROM image must either be a"
5070                                    " regular file or a symlink pointing to"
5071                                    " an existing regular file, not %s" %
5072                                    self.op.hvm_cdrom_image_path)
5073
5074     # vnc_bind_address verification
5075     if self.op.vnc_bind_address is not None:
5076       if not utils.IsValidIP(self.op.vnc_bind_address):
5077         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
5078                                    " like a valid IP address" %
5079                                    self.op.vnc_bind_address)
5080
5081     instance = self.cfg.GetInstanceInfo(
5082       self.cfg.ExpandInstanceName(self.op.instance_name))
5083     if instance is None:
5084       raise errors.OpPrereqError("No such instance name '%s'" %
5085                                  self.op.instance_name)
5086     self.op.instance_name = instance.name
5087     self.instance = instance
5088     return
5089
5090   def Exec(self, feedback_fn):
5091     """Modifies an instance.
5092
5093     All parameters take effect only at the next restart of the instance.
5094     """
5095     result = []
5096     instance = self.instance
5097     if self.mem:
5098       instance.memory = self.mem
5099       result.append(("mem", self.mem))
5100     if self.vcpus:
5101       instance.vcpus = self.vcpus
5102       result.append(("vcpus",  self.vcpus))
5103     if self.do_ip:
5104       instance.nics[0].ip = self.ip
5105       result.append(("ip", self.ip))
5106     if self.bridge:
5107       instance.nics[0].bridge = self.bridge
5108       result.append(("bridge", self.bridge))
5109     if self.mac:
5110       instance.nics[0].mac = self.mac
5111       result.append(("mac", self.mac))
5112     if self.do_kernel_path:
5113       instance.kernel_path = self.kernel_path
5114       result.append(("kernel_path", self.kernel_path))
5115     if self.do_initrd_path:
5116       instance.initrd_path = self.initrd_path
5117       result.append(("initrd_path", self.initrd_path))
5118     if self.hvm_boot_order:
5119       if self.hvm_boot_order == constants.VALUE_DEFAULT:
5120         instance.hvm_boot_order = None
5121       else:
5122         instance.hvm_boot_order = self.hvm_boot_order
5123       result.append(("hvm_boot_order", self.hvm_boot_order))
5124     if self.hvm_acpi is not None:
5125       instance.hvm_acpi = self.hvm_acpi
5126       result.append(("hvm_acpi", self.hvm_acpi))
5127     if self.hvm_pae is not None:
5128       instance.hvm_pae = self.hvm_pae
5129       result.append(("hvm_pae", self.hvm_pae))
5130     if self.hvm_cdrom_image_path:
5131       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
5132         instance.hvm_cdrom_image_path = None
5133       else:
5134         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
5135       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
5136     if self.vnc_bind_address:
5137       instance.vnc_bind_address = self.vnc_bind_address
5138       result.append(("vnc_bind_address", self.vnc_bind_address))
5139
5140     self.cfg.AddInstance(instance)
5141
5142     return result
5143
5144
5145 class LUQueryExports(NoHooksLU):
5146   """Query the exports list
5147
5148   """
5149   _OP_REQP = []
5150
5151   def CheckPrereq(self):
5152     """Check that the nodelist contains only existing nodes.
5153
5154     """
5155     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5156
5157   def Exec(self, feedback_fn):
5158     """Compute the list of all the exported system images.
5159
5160     Returns:
5161       a dictionary with the structure node->(export-list)
5162       where export-list is a list of the instances exported on
5163       that node.
5164
5165     """
5166     return rpc.call_export_list(self.nodes)
5167
5168
5169 class LUExportInstance(LogicalUnit):
5170   """Export an instance to an image in the cluster.
5171
5172   """
5173   HPATH = "instance-export"
5174   HTYPE = constants.HTYPE_INSTANCE
5175   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5176
5177   def BuildHooksEnv(self):
5178     """Build hooks env.
5179
5180     This will run on the master, primary node and target node.
5181
5182     """
5183     env = {
5184       "EXPORT_NODE": self.op.target_node,
5185       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5186       }
5187     env.update(_BuildInstanceHookEnvByObject(self.instance))
5188     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5189           self.op.target_node]
5190     return env, nl, nl
5191
5192   def CheckPrereq(self):
5193     """Check prerequisites.
5194
5195     This checks that the instance and node names are valid.
5196
5197     """
5198     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5199     self.instance = self.cfg.GetInstanceInfo(instance_name)
5200     if self.instance is None:
5201       raise errors.OpPrereqError("Instance '%s' not found" %
5202                                  self.op.instance_name)
5203
5204     # node verification
5205     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5206     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5207
5208     if self.dst_node is None:
5209       raise errors.OpPrereqError("Destination node '%s' is unknown." %
5210                                  self.op.target_node)
5211     self.op.target_node = self.dst_node.name
5212
5213   def Exec(self, feedback_fn):
5214     """Export an instance to an image in the cluster.
5215
5216     """
5217     instance = self.instance
5218     dst_node = self.dst_node
5219     src_node = instance.primary_node
5220     if self.op.shutdown:
5221       # shutdown the instance, but not the disks
5222       if not rpc.call_instance_shutdown(src_node, instance):
5223         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5224                                  (instance.name, src_node))
5225
5226     vgname = self.cfg.GetVGName()
5227
5228     snap_disks = []
5229
5230     try:
5231       for disk in instance.disks:
5232         if disk.iv_name == "sda":
5233           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5234           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5235
5236           if not new_dev_name:
5237             logger.Error("could not snapshot block device %s on node %s" %
5238                          (disk.logical_id[1], src_node))
5239           else:
5240             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5241                                       logical_id=(vgname, new_dev_name),
5242                                       physical_id=(vgname, new_dev_name),
5243                                       iv_name=disk.iv_name)
5244             snap_disks.append(new_dev)
5245
5246     finally:
5247       if self.op.shutdown and instance.status == "up":
5248         if not rpc.call_instance_start(src_node, instance, None):
5249           _ShutdownInstanceDisks(instance, self.cfg)
5250           raise errors.OpExecError("Could not start instance")
5251
5252     # TODO: check for size
5253
5254     for dev in snap_disks:
5255       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5256                                            instance):
5257         logger.Error("could not export block device %s from node"
5258                      " %s to node %s" %
5259                      (dev.logical_id[1], src_node, dst_node.name))
5260       if not rpc.call_blockdev_remove(src_node, dev):
5261         logger.Error("could not remove snapshot block device %s from"
5262                      " node %s" % (dev.logical_id[1], src_node))
5263
5264     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5265       logger.Error("could not finalize export for instance %s on node %s" %
5266                    (instance.name, dst_node.name))
5267
5268     nodelist = self.cfg.GetNodeList()
5269     nodelist.remove(dst_node.name)
5270
5271     # on one-node clusters nodelist will be empty after the removal
5272     # if we proceed the backup would be removed because OpQueryExports
5273     # substitutes an empty list with the full cluster node list.
5274     if nodelist:
5275       op = opcodes.OpQueryExports(nodes=nodelist)
5276       exportlist = self.proc.ChainOpCode(op)
5277       for node in exportlist:
5278         if instance.name in exportlist[node]:
5279           if not rpc.call_export_remove(node, instance.name):
5280             logger.Error("could not remove older export for instance %s"
5281                          " on node %s" % (instance.name, node))
5282
5283
5284 class LURemoveExport(NoHooksLU):
5285   """Remove exports related to the named instance.
5286
5287   """
5288   _OP_REQP = ["instance_name"]
5289
5290   def CheckPrereq(self):
5291     """Check prerequisites.
5292     """
5293     pass
5294
5295   def Exec(self, feedback_fn):
5296     """Remove any export.
5297
5298     """
5299     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5300     # If the instance was not found we'll try with the name that was passed in.
5301     # This will only work if it was an FQDN, though.
5302     fqdn_warn = False
5303     if not instance_name:
5304       fqdn_warn = True
5305       instance_name = self.op.instance_name
5306
5307     op = opcodes.OpQueryExports(nodes=[])
5308     exportlist = self.proc.ChainOpCode(op)
5309     found = False
5310     for node in exportlist:
5311       if instance_name in exportlist[node]:
5312         found = True
5313         if not rpc.call_export_remove(node, instance_name):
5314           logger.Error("could not remove export for instance %s"
5315                        " on node %s" % (instance_name, node))
5316
5317     if fqdn_warn and not found:
5318       feedback_fn("Export not found. If trying to remove an export belonging"
5319                   " to a deleted instance please use its Fully Qualified"
5320                   " Domain Name.")
5321
5322
5323 class TagsLU(NoHooksLU):
5324   """Generic tags LU.
5325
5326   This is an abstract class which is the parent of all the other tags LUs.
5327
5328   """
5329   def CheckPrereq(self):
5330     """Check prerequisites.
5331
5332     """
5333     if self.op.kind == constants.TAG_CLUSTER:
5334       self.target = self.cfg.GetClusterInfo()
5335     elif self.op.kind == constants.TAG_NODE:
5336       name = self.cfg.ExpandNodeName(self.op.name)
5337       if name is None:
5338         raise errors.OpPrereqError("Invalid node name (%s)" %
5339                                    (self.op.name,))
5340       self.op.name = name
5341       self.target = self.cfg.GetNodeInfo(name)
5342     elif self.op.kind == constants.TAG_INSTANCE:
5343       name = self.cfg.ExpandInstanceName(self.op.name)
5344       if name is None:
5345         raise errors.OpPrereqError("Invalid instance name (%s)" %
5346                                    (self.op.name,))
5347       self.op.name = name
5348       self.target = self.cfg.GetInstanceInfo(name)
5349     else:
5350       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5351                                  str(self.op.kind))
5352
5353
5354 class LUGetTags(TagsLU):
5355   """Returns the tags of a given object.
5356
5357   """
5358   _OP_REQP = ["kind", "name"]
5359
5360   def Exec(self, feedback_fn):
5361     """Returns the tag list.
5362
5363     """
5364     return self.target.GetTags()
5365
5366
5367 class LUSearchTags(NoHooksLU):
5368   """Searches the tags for a given pattern.
5369
5370   """
5371   _OP_REQP = ["pattern"]
5372
5373   def CheckPrereq(self):
5374     """Check prerequisites.
5375
5376     This checks the pattern passed for validity by compiling it.
5377
5378     """
5379     try:
5380       self.re = re.compile(self.op.pattern)
5381     except re.error, err:
5382       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5383                                  (self.op.pattern, err))
5384
5385   def Exec(self, feedback_fn):
5386     """Returns the tag list.
5387
5388     """
5389     cfg = self.cfg
5390     tgts = [("/cluster", cfg.GetClusterInfo())]
5391     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5392     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5393     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5394     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5395     results = []
5396     for path, target in tgts:
5397       for tag in target.GetTags():
5398         if self.re.search(tag):
5399           results.append((path, tag))
5400     return results
5401
5402
5403 class LUAddTags(TagsLU):
5404   """Sets a tag on a given object.
5405
5406   """
5407   _OP_REQP = ["kind", "name", "tags"]
5408
5409   def CheckPrereq(self):
5410     """Check prerequisites.
5411
5412     This checks the type and length of the tag name and value.
5413
5414     """
5415     TagsLU.CheckPrereq(self)
5416     for tag in self.op.tags:
5417       objects.TaggableObject.ValidateTag(tag)
5418
5419   def Exec(self, feedback_fn):
5420     """Sets the tag.
5421
5422     """
5423     try:
5424       for tag in self.op.tags:
5425         self.target.AddTag(tag)
5426     except errors.TagError, err:
5427       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5428     try:
5429       self.cfg.Update(self.target)
5430     except errors.ConfigurationError:
5431       raise errors.OpRetryError("There has been a modification to the"
5432                                 " config file and the operation has been"
5433                                 " aborted. Please retry.")
5434
5435
5436 class LUDelTags(TagsLU):
5437   """Delete a list of tags from a given object.
5438
5439   """
5440   _OP_REQP = ["kind", "name", "tags"]
5441
5442   def CheckPrereq(self):
5443     """Check prerequisites.
5444
5445     This checks that we have the given tag.
5446
5447     """
5448     TagsLU.CheckPrereq(self)
5449     for tag in self.op.tags:
5450       objects.TaggableObject.ValidateTag(tag, removal=True)
5451     del_tags = frozenset(self.op.tags)
5452     cur_tags = self.target.GetTags()
5453     if not del_tags <= cur_tags:
5454       diff_tags = del_tags - cur_tags
5455       diff_names = ["'%s'" % tag for tag in diff_tags]
5456       diff_names.sort()
5457       raise errors.OpPrereqError("Tag(s) %s not found" %
5458                                  (",".join(diff_names)))
5459
5460   def Exec(self, feedback_fn):
5461     """Remove the tag from the object.
5462
5463     """
5464     for tag in self.op.tags:
5465       self.target.RemoveTag(tag)
5466     try:
5467       self.cfg.Update(self.target)
5468     except errors.ConfigurationError:
5469       raise errors.OpRetryError("There has been a modification to the"
5470                                 " config file and the operation has been"
5471                                 " aborted. Please retry.")
5472
5473 class LUTestDelay(NoHooksLU):
5474   """Sleep for a specified amount of time.
5475
5476   This LU sleeps on the master and/or nodes for a specified amoutn of
5477   time.
5478
5479   """
5480   _OP_REQP = ["duration", "on_master", "on_nodes"]
5481
5482   def CheckPrereq(self):
5483     """Check prerequisites.
5484
5485     This checks that we have a good list of nodes and/or the duration
5486     is valid.
5487
5488     """
5489
5490     if self.op.on_nodes:
5491       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5492
5493   def Exec(self, feedback_fn):
5494     """Do the actual sleep.
5495
5496     """
5497     if self.op.on_master:
5498       if not utils.TestDelay(self.op.duration):
5499         raise errors.OpExecError("Error during master delay test")
5500     if self.op.on_nodes:
5501       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5502       if not result:
5503         raise errors.OpExecError("Complete failure from rpc call")
5504       for node, node_result in result.items():
5505         if not node_result:
5506           raise errors.OpExecError("Failure during rpc call to node %s,"
5507                                    " result: %s" % (node, node_result))
5508
5509
5510 class IAllocator(object):
5511   """IAllocator framework.
5512
5513   An IAllocator instance has three sets of attributes:
5514     - cfg/sstore that are needed to query the cluster
5515     - input data (all members of the _KEYS class attribute are required)
5516     - four buffer attributes (in|out_data|text), that represent the
5517       input (to the external script) in text and data structure format,
5518       and the output from it, again in two formats
5519     - the result variables from the script (success, info, nodes) for
5520       easy usage
5521
5522   """
5523   _ALLO_KEYS = [
5524     "mem_size", "disks", "disk_template",
5525     "os", "tags", "nics", "vcpus",
5526     ]
5527   _RELO_KEYS = [
5528     "relocate_from",
5529     ]
5530
5531   def __init__(self, cfg, sstore, mode, name, **kwargs):
5532     self.cfg = cfg
5533     self.sstore = sstore
5534     # init buffer variables
5535     self.in_text = self.out_text = self.in_data = self.out_data = None
5536     # init all input fields so that pylint is happy
5537     self.mode = mode
5538     self.name = name
5539     self.mem_size = self.disks = self.disk_template = None
5540     self.os = self.tags = self.nics = self.vcpus = None
5541     self.relocate_from = None
5542     # computed fields
5543     self.required_nodes = None
5544     # init result fields
5545     self.success = self.info = self.nodes = None
5546     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5547       keyset = self._ALLO_KEYS
5548     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5549       keyset = self._RELO_KEYS
5550     else:
5551       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5552                                    " IAllocator" % self.mode)
5553     for key in kwargs:
5554       if key not in keyset:
5555         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5556                                      " IAllocator" % key)
5557       setattr(self, key, kwargs[key])
5558     for key in keyset:
5559       if key not in kwargs:
5560         raise errors.ProgrammerError("Missing input parameter '%s' to"
5561                                      " IAllocator" % key)
5562     self._BuildInputData()
5563
5564   def _ComputeClusterData(self):
5565     """Compute the generic allocator input data.
5566
5567     This is the data that is independent of the actual operation.
5568
5569     """
5570     cfg = self.cfg
5571     # cluster data
5572     data = {
5573       "version": 1,
5574       "cluster_name": self.sstore.GetClusterName(),
5575       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5576       "hypervisor_type": self.sstore.GetHypervisorType(),
5577       # we don't have job IDs
5578       }
5579
5580     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5581
5582     # node data
5583     node_results = {}
5584     node_list = cfg.GetNodeList()
5585     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5586     for nname in node_list:
5587       ninfo = cfg.GetNodeInfo(nname)
5588       if nname not in node_data or not isinstance(node_data[nname], dict):
5589         raise errors.OpExecError("Can't get data for node %s" % nname)
5590       remote_info = node_data[nname]
5591       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5592                    'vg_size', 'vg_free', 'cpu_total']:
5593         if attr not in remote_info:
5594           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5595                                    (nname, attr))
5596         try:
5597           remote_info[attr] = int(remote_info[attr])
5598         except ValueError, err:
5599           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5600                                    " %s" % (nname, attr, str(err)))
5601       # compute memory used by primary instances
5602       i_p_mem = i_p_up_mem = 0
5603       for iinfo in i_list:
5604         if iinfo.primary_node == nname:
5605           i_p_mem += iinfo.memory
5606           if iinfo.status == "up":
5607             i_p_up_mem += iinfo.memory
5608
5609       # compute memory used by instances
5610       pnr = {
5611         "tags": list(ninfo.GetTags()),
5612         "total_memory": remote_info['memory_total'],
5613         "reserved_memory": remote_info['memory_dom0'],
5614         "free_memory": remote_info['memory_free'],
5615         "i_pri_memory": i_p_mem,
5616         "i_pri_up_memory": i_p_up_mem,
5617         "total_disk": remote_info['vg_size'],
5618         "free_disk": remote_info['vg_free'],
5619         "primary_ip": ninfo.primary_ip,
5620         "secondary_ip": ninfo.secondary_ip,
5621         "total_cpus": remote_info['cpu_total'],
5622         }
5623       node_results[nname] = pnr
5624     data["nodes"] = node_results
5625
5626     # instance data
5627     instance_data = {}
5628     for iinfo in i_list:
5629       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5630                   for n in iinfo.nics]
5631       pir = {
5632         "tags": list(iinfo.GetTags()),
5633         "should_run": iinfo.status == "up",
5634         "vcpus": iinfo.vcpus,
5635         "memory": iinfo.memory,
5636         "os": iinfo.os,
5637         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5638         "nics": nic_data,
5639         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5640         "disk_template": iinfo.disk_template,
5641         }
5642       instance_data[iinfo.name] = pir
5643
5644     data["instances"] = instance_data
5645
5646     self.in_data = data
5647
5648   def _AddNewInstance(self):
5649     """Add new instance data to allocator structure.
5650
5651     This in combination with _AllocatorGetClusterData will create the
5652     correct structure needed as input for the allocator.
5653
5654     The checks for the completeness of the opcode must have already been
5655     done.
5656
5657     """
5658     data = self.in_data
5659     if len(self.disks) != 2:
5660       raise errors.OpExecError("Only two-disk configurations supported")
5661
5662     disk_space = _ComputeDiskSize(self.disk_template,
5663                                   self.disks[0]["size"], self.disks[1]["size"])
5664
5665     if self.disk_template in constants.DTS_NET_MIRROR:
5666       self.required_nodes = 2
5667     else:
5668       self.required_nodes = 1
5669     request = {
5670       "type": "allocate",
5671       "name": self.name,
5672       "disk_template": self.disk_template,
5673       "tags": self.tags,
5674       "os": self.os,
5675       "vcpus": self.vcpus,
5676       "memory": self.mem_size,
5677       "disks": self.disks,
5678       "disk_space_total": disk_space,
5679       "nics": self.nics,
5680       "required_nodes": self.required_nodes,
5681       }
5682     data["request"] = request
5683
5684   def _AddRelocateInstance(self):
5685     """Add relocate instance data to allocator structure.
5686
5687     This in combination with _IAllocatorGetClusterData will create the
5688     correct structure needed as input for the allocator.
5689
5690     The checks for the completeness of the opcode must have already been
5691     done.
5692
5693     """
5694     instance = self.cfg.GetInstanceInfo(self.name)
5695     if instance is None:
5696       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5697                                    " IAllocator" % self.name)
5698
5699     if instance.disk_template not in constants.DTS_NET_MIRROR:
5700       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5701
5702     if len(instance.secondary_nodes) != 1:
5703       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5704
5705     self.required_nodes = 1
5706
5707     disk_space = _ComputeDiskSize(instance.disk_template,
5708                                   instance.disks[0].size,
5709                                   instance.disks[1].size)
5710
5711     request = {
5712       "type": "relocate",
5713       "name": self.name,
5714       "disk_space_total": disk_space,
5715       "required_nodes": self.required_nodes,
5716       "relocate_from": self.relocate_from,
5717       }
5718     self.in_data["request"] = request
5719
5720   def _BuildInputData(self):
5721     """Build input data structures.
5722
5723     """
5724     self._ComputeClusterData()
5725
5726     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5727       self._AddNewInstance()
5728     else:
5729       self._AddRelocateInstance()
5730
5731     self.in_text = serializer.Dump(self.in_data)
5732
5733   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5734     """Run an instance allocator and return the results.
5735
5736     """
5737     data = self.in_text
5738
5739     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5740
5741     if not isinstance(result, tuple) or len(result) != 4:
5742       raise errors.OpExecError("Invalid result from master iallocator runner")
5743
5744     rcode, stdout, stderr, fail = result
5745
5746     if rcode == constants.IARUN_NOTFOUND:
5747       raise errors.OpExecError("Can't find allocator '%s'" % name)
5748     elif rcode == constants.IARUN_FAILURE:
5749         raise errors.OpExecError("Instance allocator call failed: %s,"
5750                                  " output: %s" %
5751                                  (fail, stdout+stderr))
5752     self.out_text = stdout
5753     if validate:
5754       self._ValidateResult()
5755
5756   def _ValidateResult(self):
5757     """Process the allocator results.
5758
5759     This will process and if successful save the result in
5760     self.out_data and the other parameters.
5761
5762     """
5763     try:
5764       rdict = serializer.Load(self.out_text)
5765     except Exception, err:
5766       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5767
5768     if not isinstance(rdict, dict):
5769       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5770
5771     for key in "success", "info", "nodes":
5772       if key not in rdict:
5773         raise errors.OpExecError("Can't parse iallocator results:"
5774                                  " missing key '%s'" % key)
5775       setattr(self, key, rdict[key])
5776
5777     if not isinstance(rdict["nodes"], list):
5778       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5779                                " is not a list")
5780     self.out_data = rdict
5781
5782
5783 class LUTestAllocator(NoHooksLU):
5784   """Run allocator tests.
5785
5786   This LU runs the allocator tests
5787
5788   """
5789   _OP_REQP = ["direction", "mode", "name"]
5790
5791   def CheckPrereq(self):
5792     """Check prerequisites.
5793
5794     This checks the opcode parameters depending on the director and mode test.
5795
5796     """
5797     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5798       for attr in ["name", "mem_size", "disks", "disk_template",
5799                    "os", "tags", "nics", "vcpus"]:
5800         if not hasattr(self.op, attr):
5801           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5802                                      attr)
5803       iname = self.cfg.ExpandInstanceName(self.op.name)
5804       if iname is not None:
5805         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5806                                    iname)
5807       if not isinstance(self.op.nics, list):
5808         raise errors.OpPrereqError("Invalid parameter 'nics'")
5809       for row in self.op.nics:
5810         if (not isinstance(row, dict) or
5811             "mac" not in row or
5812             "ip" not in row or
5813             "bridge" not in row):
5814           raise errors.OpPrereqError("Invalid contents of the"
5815                                      " 'nics' parameter")
5816       if not isinstance(self.op.disks, list):
5817         raise errors.OpPrereqError("Invalid parameter 'disks'")
5818       if len(self.op.disks) != 2:
5819         raise errors.OpPrereqError("Only two-disk configurations supported")
5820       for row in self.op.disks:
5821         if (not isinstance(row, dict) or
5822             "size" not in row or
5823             not isinstance(row["size"], int) or
5824             "mode" not in row or
5825             row["mode"] not in ['r', 'w']):
5826           raise errors.OpPrereqError("Invalid contents of the"
5827                                      " 'disks' parameter")
5828     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5829       if not hasattr(self.op, "name"):
5830         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5831       fname = self.cfg.ExpandInstanceName(self.op.name)
5832       if fname is None:
5833         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5834                                    self.op.name)
5835       self.op.name = fname
5836       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5837     else:
5838       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5839                                  self.op.mode)
5840
5841     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5842       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5843         raise errors.OpPrereqError("Missing allocator name")
5844     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5845       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5846                                  self.op.direction)
5847
5848   def Exec(self, feedback_fn):
5849     """Run the allocator test.
5850
5851     """
5852     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5853       ial = IAllocator(self.cfg, self.sstore,
5854                        mode=self.op.mode,
5855                        name=self.op.name,
5856                        mem_size=self.op.mem_size,
5857                        disks=self.op.disks,
5858                        disk_template=self.op.disk_template,
5859                        os=self.op.os,
5860                        tags=self.op.tags,
5861                        nics=self.op.nics,
5862                        vcpus=self.op.vcpus,
5863                        )
5864     else:
5865       ial = IAllocator(self.cfg, self.sstore,
5866                        mode=self.op.mode,
5867                        name=self.op.name,
5868                        relocate_from=list(self.relocate_from),
5869                        )
5870
5871     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5872       result = ial.in_text
5873     else:
5874       ial.Run(self.op.allocator, validate=False)
5875       result = ial.out_text
5876     return result